Module diem.jsonrpc.client
Expand source code
# Copyright (c) The Diem Core Contributors
# SPDX-License-Identifier: Apache-2.0
import time
import copy
import dataclasses
import google.protobuf.json_format as parser
import requests
import threading
import typing
import random
from diem.__VERSION__ import VERSION
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
from logging import Logger, getLogger
from .. import diem_types, utils
from . import jsonrpc_pb2 as rpc
from . import constants
# Default HTTP timeouts. Client.__init__ combines them into the
# (DEFAULT_CONNECT_TIMEOUT_SECS, DEFAULT_TIMEOUT_SECS) tuple that is passed as
# `timeout=` to `requests.Session.post` when the caller gives no timeout.
DEFAULT_CONNECT_TIMEOUT_SECS: float = 5.0
DEFAULT_TIMEOUT_SECS: float = 30.0
# Defaults for the Retry policy Client builds for StaleResponseError:
# DEFAULT_MAX_RETRIES is passed as Retry.max_retries and DEFAULT_RETRY_DELAY as
# Retry.delay_secs (the base delay that Retry multiplies by the attempt number).
DEFAULT_MAX_RETRIES: int = 15
DEFAULT_RETRY_DELAY: float = 0.2
# Defaults used by Client.wait_for_transaction2: overall polling deadline and
# the sleep between two polls, both in seconds.
DEFAULT_WAIT_FOR_TRANSACTION_TIMEOUT_SECS: float = 30.0
DEFAULT_WAIT_FOR_TRANSACTION_WAIT_DURATION_SECS: float = 0.2
# Value of the "User-Agent" header Client installs on its requests session.
USER_AGENT_HTTP_HEADER: str = "diem-client-sdk-python / %s" % VERSION
class JsonRpcError(Exception):
    """Raised when the JSON-RPC response body contains an "error" object."""


class NetworkError(Exception):
    """Raised when sending the HTTP request fails with a `requests` error."""


class InvalidServerResponse(Exception):
    """Raised when the server response cannot be used.

    Covers a non-JSON body, a response with neither "result" nor "error",
    a result that fails to parse, and a chain id that differs from the last
    known one.
    """


class StaleResponseError(Exception):
    """Raised when a response reports an older ledger version or timestamp
    than the last known server state."""


class TransactionHashMismatchError(Exception):
    """Raised when the transaction found by account address and sequence
    number has a different hash than the expected one."""


class TransactionExecutionFailed(Exception):
    """Raised when the awaited transaction was found but its VM status is not
    "executed"."""


class TransactionExpired(Exception):
    """Raised when the last known ledger timestamp has reached the awaited
    transaction's expiration time."""


class WaitForTransactionTimeout(Exception):
    """Raised when the awaited transaction did not show up before the wait
    deadline."""


class AccountNotFoundError(ValueError):
    """Raised when no on-chain account exists for the given address."""
@dataclasses.dataclass
class State:
    """Ledger state reported by a server response.

    Client keeps the latest one as its "last known server state" and uses it
    to detect stale responses. Client initializes all three fields to -1
    before any response has been received.
    """

    # chain id taken from the response field "diem_chain_id"
    chain_id: int
    # ledger version taken from the response field "diem_ledger_version"
    version: int
    # ledger timestamp (microseconds) taken from "diem_ledger_timestampusec"
    timestamp_usecs: int
@dataclasses.dataclass
class Retry:
    """Retry policy with a linear backoff.

    Attributes are documented inline; note that `max_retries` is the total
    number of attempts (including the first call), not the number of
    additional retries.
    """

    # total number of times `fn` may be called
    max_retries: int
    # base delay; the n-th failed attempt sleeps `delay_secs * n` seconds
    delay_secs: float
    # the only exception type that triggers another attempt
    exception: typing.Type[Exception]

    def execute(self, fn: typing.Callable):  # pyre-ignore
        """Call `fn` until it succeeds or the attempts are used up.

        Returns whatever `fn` returns. Exceptions that are not instances of
        `self.exception` propagate immediately. When the last allowed attempt
        fails with `self.exception`, that exception is re-raised.
        Returns None without calling `fn` when `max_retries` is not positive.
        """
        for attempt in range(1, self.max_retries + 1):
            try:
                return fn()
            except self.exception:
                if attempt >= self.max_retries:
                    # out of attempts: surface the last failure to the caller
                    raise
                # simplest backoff strategy: attempt number * delay
                time.sleep(self.delay_secs * attempt)
        return None
class RequestStrategy:
    """RequestStrategy base class

    It implements the simplest strategy: send the http request directly to
    the client's own server url.
    """

    def send_request(
        self, client: "Client", request: typing.Dict[str, typing.Any], ignore_stale_response: bool
    ) -> typing.Dict[str, typing.Any]:
        """Send `request` to the client's server url and return the decoded response body."""
        target_url = client._url
        response_body = client._send_http_request(target_url, request, ignore_stale_response)
        return response_body
class RequestWithBackups(RequestStrategy):
    """RequestWithBackups implements strategies for primary-backup model.

    First we send same request to primary and one of random picked backup urls in parallel.
    Then we have 2 different strategies for how we handle responses:

    1. first success: return first completed success response.
    2. fallback: wait for primary response completed, if it failed, fallback to backup response.

    Default is first success strategy, passing fallback=True in constructor to enable fallback strategy.

    Errors cause failures:

    1. http request error
    2. http response error
    3. response body is not json
    4. StaleResponseError: this is included for making sure we always prefer to pick non-stale response.

    If the backups list is empty, the request is sent to the primary only.

    Initialize Client:

    ```python
    from concurrent.futures import ThreadPoolExecutor
    from diem import jsonrpc

    # This controls how many concurrent requests we can send. It is shared for all jsonrpc.Client requests.
    executor = ThreadPoolExecutor(5)
    jsonrpc.Client(
        <primary-json-rpc-server-url>,
        rs=jsonrpc.RequestWithBackups(backups=[<backup-json-rpc-server-url>...], executor=executor),
    )
    ```
    """

    def __init__(
        self,
        backups: typing.List[str],
        executor: ThreadPoolExecutor,
        fallback: bool = False,
    ) -> None:
        """Store the strategy configuration.

        backups: urls of backup JSON-RPC servers; one is picked at random per request.
        executor: thread pool used to run the primary and backup requests in parallel.
        fallback: True selects the "fallback" strategy, False (default) "first success".
        """
        self._backups = backups
        self._executor = executor
        self._fallback = fallback

    def send_request(
        self, client: "Client", request: typing.Dict[str, typing.Any], ignore_stale_response: bool
    ) -> typing.Dict[str, typing.Any]:
        """Send `request` to the primary and one random backup, return one response.

        Raises whatever the chosen strategy's last inspected request raised
        when neither request succeeded.
        """
        if not self._backups:
            # random.choice raises IndexError on an empty sequence; with no
            # backup available the only sensible behavior is primary-only.
            return super().send_request(client, request, ignore_stale_response)
        # Pick the backup before submitting anything, so a failure here can
        # never leave an orphaned in-flight primary request behind.
        backup_url = random.choice(self._backups)
        primary = self._executor.submit(client._send_http_request, client._url, request, ignore_stale_response)
        backup = self._executor.submit(client._send_http_request, backup_url, request, ignore_stale_response)
        if self._fallback:
            return self._fallback_to_backup(primary, backup)
        return self._first_success(primary, backup)

    def _fallback_to_backup(self, primary: Future, backup: Future) -> typing.Dict[str, typing.Any]:
        """Return the primary response; use the backup response only if the primary failed."""
        try:
            return primary.result()
        except Exception:
            # any primary failure (network, invalid or stale response) falls
            # back to the backup; a backup failure propagates to the caller
            return backup.result()

    def _first_success(self, primary: Future, backup: Future) -> typing.Dict[str, typing.Any]:
        """Return the response of whichever request completes successfully first."""
        futures = as_completed({primary, backup})
        first = next(futures)
        try:
            return first.result()
        except Exception:
            # first completed request failed: wait for the other one and
            # return its response (or raise its error)
            return next(futures).result()
class Client:
    """Diem JSON-RPC API client

    [SPEC](https://github.com/diem/diem/blob/master/json-rpc/json-rpc-spec.md)

    The client remembers the ledger state (chain id, version, timestamp) of the
    latest response it has seen and rejects responses that are older than it.
    """

    def __init__(
        self,
        server_url: str,
        session: typing.Optional[requests.Session] = None,
        timeout: typing.Optional[typing.Tuple[float, float]] = None,
        retry: typing.Optional[Retry] = None,
        rs: typing.Optional[RequestStrategy] = None,
        logger: typing.Optional[Logger] = None,
    ) -> None:
        """Create a client for the JSON-RPC server at `server_url`.

        session: requests session to use; its "User-Agent" header is overwritten.
        timeout: timeout tuple passed to `session.post`; defaults to
            (DEFAULT_CONNECT_TIMEOUT_SECS, DEFAULT_TIMEOUT_SECS).
        retry: retry policy used by `execute`; defaults to retrying StaleResponseError.
        rs: request strategy; defaults to sending directly to `server_url`.
        logger: logger for request / response debug output.
        """
        self._url: str = server_url
        self._session: requests.Session = session or requests.Session()
        self._session.headers.update({"User-Agent": USER_AGENT_HTTP_HEADER})
        self._timeout: typing.Tuple[float, float] = timeout or (DEFAULT_CONNECT_TIMEOUT_SECS, DEFAULT_TIMEOUT_SECS)
        self._last_known_server_state: State = State(chain_id=-1, version=-1, timestamp_usecs=-1)
        # guards reads and writes of _last_known_server_state
        self._lock = threading.Lock()
        self._retry: Retry = retry or Retry(DEFAULT_MAX_RETRIES, DEFAULT_RETRY_DELAY, StaleResponseError)
        self._rs: RequestStrategy = rs or RequestStrategy()
        self._logger: Logger = logger or getLogger(__name__)

    # high level functions

    def get_parent_vasp_account(
        self, vasp_account_address: typing.Union[diem_types.AccountAddress, str]
    ) -> rpc.Account:
        """get parent_vasp account

        accepts child/parent vasp account address, returns parent vasp account

        raise ValueError if given account address is not ChildVASP or ParentVASP account
        address

        raise AccountNotFoundError if no account found by given account address, or
        could not find the account by the parent_vasp_address found in ChildVASP account.
        """
        account = self.must_get_account(vasp_account_address)
        if account.role.type == constants.ACCOUNT_ROLE_PARENT_VASP:
            return account
        if account.role.type == constants.ACCOUNT_ROLE_CHILD_VASP:
            return self.get_parent_vasp_account(account.role.parent_vasp_address)
        address_hex = utils.account_address_hex(vasp_account_address)
        raise ValueError(f"given account address({address_hex}) is not a VASP account: {account}")

    def get_base_url_and_compliance_key(
        self, account_address: typing.Union[diem_types.AccountAddress, str]
    ) -> typing.Tuple[str, Ed25519PublicKey]:
        """get base_url and compliance key

        ParentVASP or Designated Dealer account role has base_url and compliance key setup, which
        are used for offchain API communication.

        If the account has no base_url / compliance key but has a parent_vasp_address,
        the parent account is looked up instead.

        Raises AccountNotFoundError if an account on the lookup path does not exist.
        Raises ValueError if neither the account nor its parent provides both values.
        """
        account = self.must_get_account(account_address)
        if account.role.compliance_key and account.role.base_url:
            key = Ed25519PublicKey.from_public_bytes(bytes.fromhex(account.role.compliance_key))
            return (account.role.base_url, key)
        if account.role.parent_vasp_address:
            return self.get_base_url_and_compliance_key(account.role.parent_vasp_address)
        raise ValueError(f"could not find base_url and compliance_key from account: {account}")

    def must_get_account(self, account_address: typing.Union[diem_types.AccountAddress, str]) -> rpc.Account:
        """must_get_account raises AccountNotFoundError if account could not be found by given address"""
        account = self.get_account(account_address)
        if account is None:
            address_hex = utils.account_address_hex(account_address)
            raise AccountNotFoundError(f"account not found by address: {address_hex}")
        return account

    def get_account_sequence(self, account_address: typing.Union[diem_types.AccountAddress, str]) -> int:
        """get on-chain account sequence number

        Calls get_account to find on-chain account information and return its sequence.

        Raises AccountNotFoundError if get_account returns None
        """
        # must_get_account performs the lookup and raises the same
        # AccountNotFoundError (same message) when the account is missing
        return int(self.must_get_account(account_address).sequence_number)

    # low level functions

    def get_last_known_state(self) -> State:
        """get last known server state

        All JSON-RPC service response contains chain_id, latest ledger state version and
        ledger state timestamp usecs.
        Returns a state with all -1 values if the client never called server after initialized.
        Last known state is used for tracking server response, making sure we won't hit stale
        server.

        The returned object is a copy; mutating it does not affect the client.
        """
        with self._lock:
            return copy.copy(self._last_known_server_state)

    def update_last_known_state(self, chain_id: int, version: int, timestamp_usecs: int) -> None:
        """update last known server state

        Raises InvalidServerResponse if given chain_id mismatches with previous value
        Raises StaleResponseError if version or timestamp_usecs is less than previous values
        """
        with self._lock:
            curr = self._last_known_server_state
            # -1 means no response has been recorded yet, so any chain id is accepted
            if curr.chain_id != -1 and curr.chain_id != chain_id:
                raise InvalidServerResponse(f"last known chain id {curr.chain_id}, but got {chain_id}")
            if curr.version > version:
                raise StaleResponseError(f"last known version {curr.version} > {version}")
            if curr.timestamp_usecs > timestamp_usecs:
                raise StaleResponseError(f"last known timestamp_usecs {curr.timestamp_usecs} > {timestamp_usecs}")
            self._last_known_server_state = State(
                chain_id=chain_id,
                version=version,
                timestamp_usecs=timestamp_usecs,
            )

    def get_metadata(
        self,
        version: typing.Optional[int] = None,
    ) -> rpc.Metadata:
        """get block metadata

        Passing no version (None) sends an empty params list; any other value,
        including 0, is sent as the requested version.

        See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_metadata.md)
        """
        # compare against None explicitly: version 0 is a valid ledger version
        # and must not be treated as "no version given"
        params = [int(version)] if version is not None else []
        return self.execute("get_metadata", params, _parse_obj(lambda: rpc.Metadata()))

    def get_currencies(self) -> typing.List[rpc.CurrencyInfo]:
        """get currencies

        See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_currencies.md)
        """
        return self.execute("get_currencies", [], _parse_list(lambda: rpc.CurrencyInfo()))

    def get_account(
        self, account_address: typing.Union[diem_types.AccountAddress, str]
    ) -> typing.Optional[rpc.Account]:
        """get on-chain account information

        Returns None if account not found

        See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_account.md)
        """
        address = utils.account_address_hex(account_address)
        return self.execute("get_account", [address], _parse_obj(lambda: rpc.Account()))

    def get_account_transaction(
        self,
        account_address: typing.Union[diem_types.AccountAddress, str],
        sequence: int,
        include_events: typing.Optional[bool] = None,
    ) -> typing.Optional[rpc.Transaction]:
        """get on-chain account transaction by sequence number

        Returns None if transaction is not found

        See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_account_transaction.md)
        """
        address = utils.account_address_hex(account_address)
        # include_events=None is sent as False
        params = [address, int(sequence), bool(include_events)]
        return self.execute("get_account_transaction", params, _parse_obj(lambda: rpc.Transaction()))

    def get_account_transactions(
        self,
        account_address: typing.Union[diem_types.AccountAddress, str],
        sequence: int,
        limit: int,
        include_events: typing.Optional[bool] = None,
    ) -> typing.List[rpc.Transaction]:
        """get on-chain account transactions by start sequence number and limit size

        Returns empty list if no transactions found

        See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_account_transactions.md)
        """
        address = utils.account_address_hex(account_address)
        params = [address, int(sequence), int(limit), bool(include_events)]
        return self.execute("get_account_transactions", params, _parse_list(lambda: rpc.Transaction()))

    def get_transactions(
        self,
        start_version: int,
        limit: int,
        include_events: typing.Optional[bool] = None,
    ) -> typing.List[rpc.Transaction]:
        """get transactions

        Returns empty list if no transactions found

        See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_transactions.md)
        """
        params = [int(start_version), int(limit), bool(include_events)]
        return self.execute("get_transactions", params, _parse_list(lambda: rpc.Transaction()))

    def get_events(self, event_stream_key: str, start: int, limit: int) -> typing.List[rpc.Event]:
        """get events

        Returns empty list if no events found

        See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_events.md)
        """
        params = [event_stream_key, int(start), int(limit)]
        return self.execute("get_events", params, _parse_list(lambda: rpc.Event()))

    def get_state_proof(self, version: int) -> rpc.StateProof:
        """call the `get_state_proof` method with the given version and parse the result as rpc.StateProof"""
        params = [int(version)]
        return self.execute("get_state_proof", params, _parse_obj(lambda: rpc.StateProof()))

    def get_account_state_with_proof(
        self,
        account_address: diem_types.AccountAddress,
        version: typing.Optional[int] = None,
        ledger_version: typing.Optional[int] = None,
    ) -> rpc.AccountStateWithProof:
        """call the `get_account_state_with_proof` method and parse the result as rpc.AccountStateWithProof

        `version` and `ledger_version` are forwarded as given; None values stay
        in the params list as-is.
        """
        address = utils.account_address_hex(account_address)
        params = [address, version, ledger_version]
        return self.execute("get_account_state_with_proof", params, _parse_obj(lambda: rpc.AccountStateWithProof()))

    def submit(
        self,
        txn: typing.Union[diem_types.SignedTransaction, str],
    ) -> None:
        """submit signed transaction

        See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_submit.md)

        This method ignores StaleResponseError and does not retry on any submit errors, because re-submit any transaction
        may get JsonRpcError `SEQUENCE_NUMBER_TOO_OLD` in multi-threads environment, for example:

        1. thread-1: submit transaction.
        2. server: receive submit transaction. The latest ledger version is X.
        3. server: receive a new state sync. The latest ledger version becomes X+1.
        4. thread-2: get_events (can be any get API).
        5. server: receive get_events. The latest ledger version is X+1.
        6. server: respond to get_events with latest ledger version == X+1.
        7. thread-2: receive get_events response.
        8. thread-2: update latest known ledger version X+1.
        9. server: respond to submit transaction with latest ledger version == X.
        10. thread-1: receive submit transaction response. Found response ledger version X < known version X+1.
        11. thread-1: triggers StaleResponseError.
        12. if we retry on StaleResponseError, the submitted transaction may end with JsonRpcError `SEQUENCE_NUMBER_TOO_OLD`
            if the following events happened.
        13. server: execute transaction, and the transaction sender account sequence number +1.
        14. thread-1: submit the transaction again.
        15. server: validate transaction sender account sequence number, and response SEQUENCE_NUMBER_TOO_OLD error.

        However, the transaction was executed successfully, thus raising SEQUENCE_NUMBER_TOO_OLD error may cause
        a re-submit with new account sequence number if client handled it improperly (without checking whether
        the transaction is executed).
        """
        if isinstance(txn, diem_types.SignedTransaction):
            # the server expects the hex-encoded BCS bytes of the signed transaction
            return self.submit(txn.bcs_serialize().hex())
        self.execute_without_retry("submit", [txn], result_parser=None, ignore_stale_response=True)

    def wait_for_transaction(
        self, txn: typing.Union[diem_types.SignedTransaction, str], timeout_secs: typing.Optional[float] = None
    ) -> rpc.Transaction:
        """wait for transaction executed

        Accepts a SignedTransaction or its hex-encoded BCS bytes.

        Raises WaitForTransactionTimeout if waited timeout_secs and no expected transaction found.

        Raises TransactionExpired if server responses new block timestamp is after signed transaction
        expiration_timestamp_secs.

        Raises TransactionExecutionFailed if found transaction and its vm_status (execution result)
        is not success.

        Raises TransactionHashMismatchError if found transaction by account address and sequence
        number, but the transaction hash does not match the transaction hash given in parameter.
        This means the executed transaction is from another process (which submitted transaction
        with same account address and sequence).
        """
        if isinstance(txn, str):
            txn_obj = diem_types.SignedTransaction.bcs_deserialize(bytes.fromhex(txn))
            return self.wait_for_transaction(txn_obj, timeout_secs)
        return self.wait_for_transaction2(
            txn.raw_txn.sender,
            txn.raw_txn.sequence_number,
            txn.raw_txn.expiration_timestamp_secs,
            utils.transaction_hash(txn),
            timeout_secs,
        )

    def wait_for_transaction2(
        self,
        address: diem_types.AccountAddress,
        seq: int,
        expiration_time_secs: int,
        txn_hash: str,
        timeout_secs: typing.Optional[float] = None,
        wait_duration_secs: typing.Optional[float] = None,
    ) -> rpc.Transaction:
        """wait for transaction executed

        Polls get_account_transaction for the given account address and sequence
        number, sleeping wait_duration_secs between polls. A falsy timeout_secs
        or wait_duration_secs selects the module default.

        Raises WaitForTransactionTimeout if waited timeout_secs and no expected transaction found.

        Raises TransactionExpired if server responses new block timestamp is after signed transaction
        expiration_timestamp_secs.

        Raises TransactionExecutionFailed if found transaction and its vm_status (execution result)
        is not success.

        Raises TransactionHashMismatchError if found transaction by account address and sequence
        number, but the transaction hash does not match the transaction hash given in parameter.
        This means the executed transaction is from another process (which submitted transaction
        with same account address and sequence).
        """
        max_wait = time.time() + (timeout_secs or DEFAULT_WAIT_FOR_TRANSACTION_TIMEOUT_SECS)
        while time.time() < max_wait:
            txn = self.get_account_transaction(address, seq, True)
            if txn is not None:
                if txn.hash != txn_hash:
                    raise TransactionHashMismatchError(f"expected hash {txn_hash}, but got {txn.hash}")
                if txn.vm_status.type != constants.VM_STATUS_EXECUTED:
                    raise TransactionExecutionFailed(f"VM status: {txn.vm_status}")
                return txn
            # the poll above refreshed the last known state; compare the
            # expiration (seconds) with the ledger timestamp (microseconds)
            state = self.get_last_known_state()
            if expiration_time_secs * 1_000_000 <= state.timestamp_usecs:
                raise TransactionExpired(
                    f"latest server ledger timestamp_usecs {state.timestamp_usecs}, "
                    f"transaction expires at {expiration_time_secs}"
                )
            time.sleep(wait_duration_secs or DEFAULT_WAIT_FOR_TRANSACTION_WAIT_DURATION_SECS)
        raise WaitForTransactionTimeout()

    # pyre-ignore
    def execute(
        self,
        method: str,
        params: typing.List[typing.Any],  # pyre-ignore
        result_parser: typing.Optional[typing.Callable] = None,  # pyre-ignore
        ignore_stale_response: typing.Optional[bool] = None,
    ):
        """execute JSON-RPC method call

        This method handles StaleResponseError with retry (using the client's
        retry policy).
        Should only be called by get methods.
        """
        return self._retry.execute(
            lambda: self.execute_without_retry(method, params, result_parser, ignore_stale_response)
        )

    # pyre-ignore
    def execute_without_retry(
        self,
        method: str,
        params: typing.List[typing.Any],  # pyre-ignore
        result_parser: typing.Optional[typing.Callable] = None,  # pyre-ignore
        ignore_stale_response: typing.Optional[bool] = None,
    ):
        """execute JSON-RPC method call without retry any error.

        Returns the parsed result when result_parser is given, otherwise None.

        Raises InvalidServerResponse if server response does not match
        [JSON-RPC SPEC 2.0](https://www.jsonrpc.org/specification), or response result can't be parsed.

        Raises StaleResponseError if the response is stale, unless ignore_stale_response is True,
        in which case the stale response is accepted and processing continues.

        Raises JsonRpcError if server JSON-RPC response with error object.

        Raises NetworkError if send http request failed, or received server response status is not 200.
        """
        request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params or [],
        }
        try:
            resp = self._rs.send_request(self, request, ignore_stale_response or False)
            if "error" in resp:
                err = resp["error"]
                raise JsonRpcError(f"{err}")
            if "result" in resp:
                if result_parser:
                    return result_parser(resp["result"])
                return
            raise InvalidServerResponse(f"No error or result in response: {resp}")
        except requests.RequestException as e:
            raise NetworkError(f"Error in connecting to server: {e}\nPlease retry...") from e
        except parser.ParseError as e:
            # ParseError can only come from result_parser, so `resp` is bound here
            raise InvalidServerResponse(f"Parse result failed: {e}, response: {resp}") from e

    def _send_http_request(
        self,
        url: str,
        request: typing.Dict[str, typing.Any],
        ignore_stale_response: bool,
    ) -> typing.Dict[str, typing.Any]:
        """POST `request` as JSON to `url` and return the decoded response body.

        Also records the ledger state carried by the response as the last
        known server state.

        Raises a `requests` exception if sending fails or the http status is an error status.
        Raises InvalidServerResponse if the body is not a JSON object, lacks the
        ledger state fields, or reports a different chain id than last known.
        Raises StaleResponseError if the response is stale and ignore_stale_response is False.
        """
        self._logger.debug("http request body: %s", request)
        response = self._session.post(url, json=request, timeout=self._timeout)
        self._logger.debug("http response body: %s", response.text)
        response.raise_for_status()
        try:
            body = response.json()
        except ValueError as e:
            raise InvalidServerResponse(f"Parse response as json failed: {e}, response: {response.text}") from e
        if not isinstance(body, dict):
            # valid JSON that is not an object has no ledger state or result to read
            raise InvalidServerResponse(f"Response is not a json object: {response.text}")
        chain_id = body.get("diem_chain_id")
        version = body.get("diem_ledger_version")
        timestamp_usecs = body.get("diem_ledger_timestampusec")
        if chain_id is None or version is None or timestamp_usecs is None:
            # comparing the last known state with None would raise TypeError;
            # report the malformed response with the documented exception instead
            raise InvalidServerResponse(f"Missing ledger state in response: {body}")
        # check stale response before check jsonrpc error
        try:
            self.update_last_known_state(chain_id, version, timestamp_usecs)
        except StaleResponseError:
            if not ignore_stale_response:
                raise
        return body
def _parse_obj(factory): # pyre-ignore
return lambda result: parser.ParseDict(result, factory(), ignore_unknown_fields=True) if result else None
def _parse_list(factory):  # pyre-ignore
    """Build a parser that converts a JSON-RPC result list into a list of messages.

    Every item is parsed with the single-object parser built from `factory`.
    The returned parser yields an empty list for a falsy result.
    """
    parse_item = _parse_obj(factory)

    def parse_all(result):
        if not result:
            return []
        return [parse_item(item) for item in result]

    return parse_all
Classes
class AccountNotFoundError (*args, **kwargs)
-
Inappropriate argument value (of correct type).
Expand source code
class AccountNotFoundError(ValueError):
    """Raised when no on-chain account exists for the given address."""
    pass
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException
class Client (server_url: str, session: Optional[requests.sessions.Session] = None, timeout: Optional[Tuple[float, float]] = None, retry: Optional[Retry] = None, rs: Optional[RequestStrategy] = None, logger: Optional[logging.Logger] = None)
-
Diem JSON-RPC API client
Expand source code
class Client: """Diem JSON-RPC API client [SPEC](https://github.com/diem/diem/blob/master/json-rpc/json-rpc-spec.md) """ def __init__( self, server_url: str, session: typing.Optional[requests.Session] = None, timeout: typing.Optional[typing.Tuple[float, float]] = None, retry: typing.Optional[Retry] = None, rs: typing.Optional[RequestStrategy] = None, logger: typing.Optional[Logger] = None, ) -> None: self._url: str = server_url self._session: requests.Session = session or requests.Session() self._session.headers.update({"User-Agent": USER_AGENT_HTTP_HEADER}) self._timeout: typing.Tuple[float, float] = timeout or (DEFAULT_CONNECT_TIMEOUT_SECS, DEFAULT_TIMEOUT_SECS) self._last_known_server_state: State = State(chain_id=-1, version=-1, timestamp_usecs=-1) self._lock = threading.Lock() self._retry: Retry = retry or Retry(DEFAULT_MAX_RETRIES, DEFAULT_RETRY_DELAY, StaleResponseError) self._rs: RequestStrategy = rs or RequestStrategy() self._logger: Logger = logger or getLogger(__name__) # high level functions def get_parent_vasp_account( self, vasp_account_address: typing.Union[diem_types.AccountAddress, str] ) -> rpc.Account: """get parent_vasp account accepts child/parent vasp account address, returns parent vasp account raise ValueError if given account address is not ChildVASP or ParentVASP account address raise AccountNotFoundError if no account found by given account address, or could not find the account by the parent_vasp_address found in ChildVASP account. 
""" account = self.must_get_account(vasp_account_address) if account.role.type == constants.ACCOUNT_ROLE_PARENT_VASP: return account if account.role.type == constants.ACCOUNT_ROLE_CHILD_VASP: return self.get_parent_vasp_account(account.role.parent_vasp_address) hex = utils.account_address_hex(vasp_account_address) raise ValueError(f"given account address({hex}) is not a VASP account: {account}") def get_base_url_and_compliance_key( self, account_address: typing.Union[diem_types.AccountAddress, str] ) -> typing.Tuple[str, Ed25519PublicKey]: """get base_url and compliance key ParentVASP or Designated Dealer account role has base_url and compliance key setup, which are used for offchain API communication. """ account = self.must_get_account(account_address) if account.role.compliance_key and account.role.base_url: key = Ed25519PublicKey.from_public_bytes(bytes.fromhex(account.role.compliance_key)) return (account.role.base_url, key) if account.role.parent_vasp_address: return self.get_base_url_and_compliance_key(account.role.parent_vasp_address) raise ValueError(f"could not find base_url and compliance_key from account: {account}") def must_get_account(self, account_address: typing.Union[diem_types.AccountAddress, str]) -> rpc.Account: """must_get_account raises AccountNotFoundError if account could not be found by given address""" account = self.get_account(account_address) if account is None: hex = utils.account_address_hex(account_address) raise AccountNotFoundError(f"account not found by address: {hex}") return account def get_account_sequence(self, account_address: typing.Union[diem_types.AccountAddress, str]) -> int: """get on-chain account sequence number Calls get_account to find on-chain account information and return it's sequence. 
Raises AccountNotFoundError if get_account returns None """ account = self.get_account(account_address) if account is None: hex = utils.account_address_hex(account_address) raise AccountNotFoundError(f"account not found by address: {hex}") return int(account.sequence_number) # low level functions def get_last_known_state(self) -> State: """get last known server state All JSON-RPC service response contains chain_id, latest ledger state version and ledger state timestamp usecs. Returns a state with all -1 values if the client never called server after initialized. Last known state is used for tracking server response, making sure we won't hit stale server. """ with self._lock: return copy.copy(self._last_known_server_state) def update_last_known_state(self, chain_id: int, version: int, timestamp_usecs: int) -> None: """update last known server state Raises InvalidServerResponse if given chain_id mismatches with previous value Raises StaleResponseError if version or timestamp_usecs is less than previous values """ with self._lock: curr = self._last_known_server_state if curr.chain_id != -1 and curr.chain_id != chain_id: raise InvalidServerResponse(f"last known chain id {curr.chain_id}, " f"but got {chain_id}") if curr.version > version: raise StaleResponseError(f"last known version {curr.version} > {version}") if curr.timestamp_usecs > timestamp_usecs: raise StaleResponseError(f"last known timestamp_usecs {curr.timestamp_usecs} > {timestamp_usecs}") self._last_known_server_state = State( chain_id=chain_id, version=version, timestamp_usecs=timestamp_usecs, ) def get_metadata( self, version: typing.Optional[int] = None, ) -> rpc.Metadata: """get block metadata See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_metadata.md) """ params = [int(version)] if version else [] return self.execute("get_metadata", params, _parse_obj(lambda: rpc.Metadata())) def get_currencies(self) -> typing.List[rpc.CurrencyInfo]: """get currencies See 
[JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_currencies.md) """ return self.execute("get_currencies", [], _parse_list(lambda: rpc.CurrencyInfo())) def get_account( self, account_address: typing.Union[diem_types.AccountAddress, str] ) -> typing.Optional[rpc.Account]: """get on-chain account information Returns None if account not found See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_account.md) """ address = utils.account_address_hex(account_address) return self.execute("get_account", [address], _parse_obj(lambda: rpc.Account())) def get_account_transaction( self, account_address: typing.Union[diem_types.AccountAddress, str], sequence: int, include_events: typing.Optional[bool] = None, ) -> typing.Optional[rpc.Transaction]: """get on-chain account transaction by sequence number Returns None if transaction is not found See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_account_transaction.md) """ address = utils.account_address_hex(account_address) params = [address, int(sequence), bool(include_events)] return self.execute("get_account_transaction", params, _parse_obj(lambda: rpc.Transaction())) def get_account_transactions( self, account_address: typing.Union[diem_types.AccountAddress, str], sequence: int, limit: int, include_events: typing.Optional[bool] = None, ) -> typing.List[rpc.Transaction]: """get on-chain account transactions by start sequence number and limit size Returns empty list if no transactions found See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_account_transactions.md) """ address = utils.account_address_hex(account_address) params = [address, int(sequence), int(limit), bool(include_events)] return self.execute("get_account_transactions", params, _parse_list(lambda: rpc.Transaction())) def get_transactions( self, start_version: int, limit: int, include_events: typing.Optional[bool] = None, 
) -> typing.List[rpc.Transaction]: """get transactions Returns empty list if no transactions found See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_transactions.md) """ params = [int(start_version), int(limit), bool(include_events)] return self.execute("get_transactions", params, _parse_list(lambda: rpc.Transaction())) def get_events(self, event_stream_key: str, start: int, limit: int) -> typing.List[rpc.Event]: """get events Returns empty list if no events found See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_events.md) """ params = [event_stream_key, int(start), int(limit)] return self.execute("get_events", params, _parse_list(lambda: rpc.Event())) def get_state_proof(self, version: int) -> rpc.StateProof: params = [int(version)] return self.execute("get_state_proof", params, _parse_obj(lambda: rpc.StateProof())) def get_account_state_with_proof( self, account_address: diem_types.AccountAddress, version: typing.Optional[int] = None, ledger_version: typing.Optional[int] = None, ) -> rpc.AccountStateWithProof: address = utils.account_address_hex(account_address) params = [address, version, ledger_version] return self.execute("get_account_state_with_proof", params, _parse_obj(lambda: rpc.AccountStateWithProof())) def submit( self, txn: typing.Union[diem_types.SignedTransaction, str], ) -> None: """submit signed transaction See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_submit.md) This method ignores StaleResponseError and does not retry on any submit errors, because re-submit any transaction may get JsonRpcError `SEQUENCE_NUMBER_TOO_OLD` in multi-threads environment, for example: 1. thread-1: submit transaction. 2. server: receive submit transaction. The latest ledger version is X. 3. server: receive a new state sync. The latest ledger version becomes X+1. 4. thread-2: get_events (can be any get API). 5. server: receive get_events. 
The latest ledger version is X+1. 6. server: respond to get_events with latest ledger version == X+1. 7. thread-2: receive get_events response. 8. thread-2: update latest known ledger version X+1. 9. server: respond to submit transaction with latest ledger version == X. 10. thread-1: receive submit transaction response. Found response ledger version X < known version X+1. 11. thread-1: triggers StaleResponseError. 12. if we retry on StaleResponseError, the submitted transaction may end with JsonRpcError `SEQUENCE_NUMBER_TOO_OLD` if the following events happened. 13. server: execute transaction, and the transaction sender account sequence number +1. 14. thread-1: submit the transaction again. 15. server: validate transaction sender account sequence number, and response SEQUENCE_NUMBER_TOO_OLD error. However, the transaction was executed successfully, thus raising SEQUENCE_NUMBER_TOO_OLD error may cause a re-submit with new account sequence number if client handled it improperly (without checking whether the transaction is executed). """ if isinstance(txn, diem_types.SignedTransaction): return self.submit(txn.bcs_serialize().hex()) self.execute_without_retry("submit", [txn], result_parser=None, ignore_stale_response=True) def wait_for_transaction( self, txn: typing.Union[diem_types.SignedTransaction, str], timeout_secs: typing.Optional[float] = None ) -> rpc.Transaction: """wait for transaction executed Raises WaitForTransactionTimeout if waited timeout_secs and no expected transaction found. Raises TransactionExpired if server responses new block timestamp is after signed transaction expiration_timestamp_secs. Raises TransactionExecutionFailed if found transaction and it's vm_status (execution result) is not success. Raises TransactionHashMismatchError if found transaction by account address and sequence number, but the transaction hash does not match the transactoin hash given in parameter. 
This means the executed transaction is from another process (which submitted transaction with same account address and sequence). """ if isinstance(txn, str): txn_obj = diem_types.SignedTransaction.bcs_deserialize(bytes.fromhex(txn)) return self.wait_for_transaction(txn_obj, timeout_secs) return self.wait_for_transaction2( txn.raw_txn.sender, txn.raw_txn.sequence_number, txn.raw_txn.expiration_timestamp_secs, utils.transaction_hash(txn), timeout_secs, ) def wait_for_transaction2( self, address: diem_types.AccountAddress, seq: int, expiration_time_secs: int, txn_hash: str, timeout_secs: typing.Optional[float] = None, wait_duration_secs: typing.Optional[float] = None, ) -> rpc.Transaction: """wait for transaction executed Raises WaitForTransactionTimeout if waited timeout_secs and no expected transaction found. Raises TransactionExpired if server responses new block timestamp is after signed transaction expiration_timestamp_secs. Raises TransactionExecutionFailed if found transaction and it's vm_status (execution result) is not success. Raises TransactionHashMismatchError if found transaction by account address and sequence number, but the transaction hash does not match the transactoin hash given in parameter. This means the executed transaction is from another process (which submitted transaction with same account address and sequence). 
""" max_wait = time.time() + (timeout_secs or DEFAULT_WAIT_FOR_TRANSACTION_TIMEOUT_SECS) while time.time() < max_wait: txn = self.get_account_transaction(address, seq, True) if txn is not None: if txn.hash != txn_hash: raise TransactionHashMismatchError(f"expected hash {txn_hash}, but got {txn.hash}") if txn.vm_status.type != constants.VM_STATUS_EXECUTED: raise TransactionExecutionFailed(f"VM status: {txn.vm_status}") return txn state = self.get_last_known_state() if expiration_time_secs * 1_000_000 <= state.timestamp_usecs: raise TransactionExpired( f"latest server ledger timestamp_usecs {state.timestamp_usecs}, " f"transaction expires at {expiration_time_secs}" ) time.sleep(wait_duration_secs or DEFAULT_WAIT_FOR_TRANSACTION_WAIT_DURATION_SECS) raise WaitForTransactionTimeout() # pyre-ignore def execute( self, method: str, params: typing.List[typing.Any], # pyre-ignore result_parser: typing.Optional[typing.Callable] = None, # pyre-ignore ignore_stale_response: typing.Optional[bool] = None, ): """execute JSON-RPC method call This method handles StableResponseError with retry. Should only be called by get methods. """ return self._retry.execute( lambda: self.execute_without_retry(method, params, result_parser, ignore_stale_response) ) # pyre-ignore def execute_without_retry( self, method: str, params: typing.List[typing.Any], # pyre-ignore result_parser: typing.Optional[typing.Callable] = None, # pyre-ignore ignore_stale_response: typing.Optional[bool] = None, ): """execute JSON-RPC method call without retry any error. Raises InvalidServerResponse if server response does not match [JSON-RPC SPEC 2.0](https://www.jsonrpc.org/specification), or response result can't be parsed. Raises StaleResponseError if ignore_stale_response is True, otherwise ignores it and continue. Raises JsonRpcError if server JSON-RPC response with error object. Raises NetworkError if send http request failed, or received server response status is not 200. 
""" request = { "jsonrpc": "2.0", "id": 1, "method": method, "params": params or [], } try: json = self._rs.send_request(self, request, ignore_stale_response or False) if "error" in json: err = json["error"] raise JsonRpcError(f"{err}") if "result" in json: if result_parser: return result_parser(json["result"]) return raise InvalidServerResponse(f"No error or result in response: {json}") except requests.RequestException as e: raise NetworkError(f"Error in connecting to server: {e}\nPlease retry...") except parser.ParseError as e: raise InvalidServerResponse(f"Parse result failed: {e}, response: {json}") def _send_http_request( self, url: str, request: typing.Dict[str, typing.Any], ignore_stale_response: bool, ) -> typing.Dict[str, typing.Any]: self._logger.debug("http request body: %s", request) response = self._session.post(url, json=request, timeout=self._timeout) self._logger.debug("http response body: %s", response.text) response.raise_for_status() try: json = response.json() except ValueError as e: raise InvalidServerResponse(f"Parse response as json failed: {e}, response: {response.text}") # check stable response before check jsonrpc error try: self.update_last_known_state( json.get("diem_chain_id"), json.get("diem_ledger_version"), json.get("diem_ledger_timestampusec"), ) except StaleResponseError as e: if not ignore_stale_response: raise e return json
Methods
def execute(self, method: str, params: List[Any], result_parser: Optional[Callable] = None, ignore_stale_response: Optional[bool] = None)
-
execute JSON-RPC method call
This method handles StaleResponseError with retry. Should only be called by get methods.
Expand source code
def execute(
    self,
    method: str,
    params: typing.List[typing.Any],  # pyre-ignore
    result_parser: typing.Optional[typing.Callable] = None,  # pyre-ignore
    ignore_stale_response: typing.Optional[bool] = None,
):
    """Run a JSON-RPC method call through the client's retry policy.

    Every attempt is delegated to `execute_without_retry` with the same
    arguments; `self._retry` decides whether a failed attempt is repeated
    (this is how StaleResponseError is handled with retry).

    Should only be called by get methods.
    """

    def attempt():
        # One un-retried call; the retry policy invokes this as often as needed.
        return self.execute_without_retry(method, params, result_parser, ignore_stale_response)

    return self._retry.execute(attempt)
def execute_without_retry(self, method: str, params: List[Any], result_parser: Optional[Callable] = None, ignore_stale_response: Optional[bool] = None)
-
execute JSON-RPC method call without retry any error.
Raises InvalidServerResponse if server response does not match JSON-RPC SPEC 2.0, or response result can't be parsed.
Raises StaleResponseError if ignore_stale_response is False (the default); when it is True, a stale response is ignored and processing continues.
Raises JsonRpcError if server JSON-RPC response with error object.
Raises NetworkError if send http request failed, or received server response status is not 200.
Expand source code
def execute_without_retry(
    self,
    method: str,
    params: typing.List[typing.Any],  # pyre-ignore
    result_parser: typing.Optional[typing.Callable] = None,  # pyre-ignore
    ignore_stale_response: typing.Optional[bool] = None,
):
    """Execute a JSON-RPC method call once, without retrying on any error.

    Returns `result_parser(result)` when a parser is given, otherwise None.

    Raises InvalidServerResponse if server response does not match
    [JSON-RPC SPEC 2.0](https://www.jsonrpc.org/specification), or response result can't be parsed.

    Raises StaleResponseError if the response is stale and ignore_stale_response is
    False or None; when ignore_stale_response is True the stale response is accepted.

    Raises JsonRpcError if server JSON-RPC response with error object.

    Raises NetworkError if send http request failed, or received server response status is not 200.
    """
    request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": params or [],
    }
    try:
        # Named `response` rather than `json` so the local never shadows the json module.
        response = self._rs.send_request(self, request, ignore_stale_response or False)
        if "error" in response:
            err = response["error"]
            raise JsonRpcError(f"{err}")
        if "result" in response:
            if result_parser:
                return result_parser(response["result"])
            return None
        raise InvalidServerResponse(f"No error or result in response: {response}")
    except requests.RequestException as e:
        # Chain explicitly so the transport failure is recorded as the cause.
        raise NetworkError(f"Error in connecting to server: {e}\nPlease retry...") from e
    except parser.ParseError as e:
        raise InvalidServerResponse(f"Parse result failed: {e}, response: {response}") from e
def get_account(self, account_address: Union[AccountAddress, str]) ‑> Optional[jsonrpc_pb2.Account]
-
get on-chain account information
Returns None if account not found See JSON-RPC API Doc
Expand source code
def get_account(
    self, account_address: typing.Union[diem_types.AccountAddress, str]
) -> typing.Optional[rpc.Account]:
    """Fetch on-chain account information for the given address.

    Returns None if account not found.

    See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_account.md)
    """
    rpc_params = [utils.account_address_hex(account_address)]
    parse_result = _parse_obj(lambda: rpc.Account())
    return self.execute("get_account", rpc_params, parse_result)
def get_account_sequence(self, account_address: Union[AccountAddress, str]) ‑> int
-
get on-chain account sequence number
Calls get_account to find on-chain account information and returns its sequence number. Raises AccountNotFoundError if get_account returns None.
Expand source code
def get_account_sequence(self, account_address: typing.Union[diem_types.AccountAddress, str]) -> int:
    """Return the on-chain sequence number of the given account.

    The account is looked up through `must_get_account`, so the not-found
    handling lives in one place instead of being duplicated here.

    Raises AccountNotFoundError if the account could not be found.
    """
    account = self.must_get_account(account_address)
    return int(account.sequence_number)
def get_account_state_with_proof(self, account_address: AccountAddress, version: Optional[int] = None, ledger_version: Optional[int] = None) ‑> jsonrpc_pb2.AccountStateWithProof
-
Expand source code
def get_account_state_with_proof(
    self,
    account_address: diem_types.AccountAddress,
    version: typing.Optional[int] = None,
    ledger_version: typing.Optional[int] = None,
) -> rpc.AccountStateWithProof:
    """Call the `get_account_state_with_proof` JSON-RPC method.

    The address is sent in hex form; `version` and `ledger_version` are
    forwarded unchanged (None when omitted).
    """
    address_hex = utils.account_address_hex(account_address)
    rpc_params = [address_hex, version, ledger_version]
    parse_result = _parse_obj(lambda: rpc.AccountStateWithProof())
    return self.execute("get_account_state_with_proof", rpc_params, parse_result)
def get_account_transaction(self, account_address: Union[AccountAddress, str], sequence: int, include_events: Optional[bool] = None) ‑> Optional[jsonrpc_pb2.Transaction]
-
get on-chain account transaction by sequence number
Returns None if transaction is not found
See JSON-RPC API Doc
Expand source code
def get_account_transaction(
    self,
    account_address: typing.Union[diem_types.AccountAddress, str],
    sequence: int,
    include_events: typing.Optional[bool] = None,
) -> typing.Optional[rpc.Transaction]:
    """Look up one on-chain transaction by sender account and sequence number.

    Returns None if transaction is not found.
    `include_events` is coerced with `bool()`, so None is sent as False.

    See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_account_transaction.md)
    """
    sender_hex = utils.account_address_hex(account_address)
    rpc_params = [sender_hex, int(sequence), bool(include_events)]
    parse_result = _parse_obj(lambda: rpc.Transaction())
    return self.execute("get_account_transaction", rpc_params, parse_result)
def get_account_transactions(self, account_address: Union[AccountAddress, str], sequence: int, limit: int, include_events: Optional[bool] = None) ‑> List[jsonrpc_pb2.Transaction]
-
get on-chain account transactions by start sequence number and limit size
Returns empty list if no transactions found
See JSON-RPC API Doc
Expand source code
def get_account_transactions(
    self,
    account_address: typing.Union[diem_types.AccountAddress, str],
    sequence: int,
    limit: int,
    include_events: typing.Optional[bool] = None,
) -> typing.List[rpc.Transaction]:
    """List an account's on-chain transactions from a start sequence number, up to `limit`.

    Returns empty list if no transactions found.
    `include_events` is coerced with `bool()`, so None is sent as False.

    See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_account_transactions.md)
    """
    sender_hex = utils.account_address_hex(account_address)
    rpc_params = [sender_hex, int(sequence), int(limit), bool(include_events)]
    parse_result = _parse_list(lambda: rpc.Transaction())
    return self.execute("get_account_transactions", rpc_params, parse_result)
def get_base_url_and_compliance_key(self, account_address: Union[AccountAddress, str]) ‑> Tuple[str, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey]
-
get base_url and compliance key
ParentVASP or Designated Dealer account role has base_url and compliance key setup, which are used for offchain API communication.
Expand source code
def get_base_url_and_compliance_key(
    self, account_address: typing.Union[diem_types.AccountAddress, str]
) -> typing.Tuple[str, Ed25519PublicKey]:
    """Resolve the off-chain base_url and compliance key for an account.

    ParentVASP or Designated Dealer account role has base_url and compliance key setup,
    which are used for offchain API communication. When the account's role carries
    both values they are returned directly; otherwise, if the role names a
    parent_vasp_address, the lookup is repeated on that parent account.

    Raises ValueError if neither the values nor a parent address are present.
    Raises AccountNotFoundError (from `must_get_account`) if the account does not exist.
    """
    account = self.must_get_account(account_address)
    role = account.role

    has_both = role.compliance_key and role.base_url
    if has_both:
        key_bytes = bytes.fromhex(role.compliance_key)
        return (role.base_url, Ed25519PublicKey.from_public_bytes(key_bytes))

    if not role.parent_vasp_address:
        raise ValueError(f"could not find base_url and compliance_key from account: {account}")

    # Child accounts inherit the settings from their parent VASP account.
    return self.get_base_url_and_compliance_key(role.parent_vasp_address)
def get_currencies(self) ‑> List[jsonrpc_pb2.CurrencyInfo]
-
get currencies
See JSON-RPC API Doc
Expand source code
def get_currencies(self) -> typing.List[rpc.CurrencyInfo]:
    """Fetch the list of currencies via the `get_currencies` JSON-RPC method.

    See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_currencies.md)
    """
    parse_result = _parse_list(lambda: rpc.CurrencyInfo())
    return self.execute("get_currencies", [], parse_result)
def get_events(self, event_stream_key: str, start: int, limit: int) ‑> List[jsonrpc_pb2.Event]
-
Expand source code
def get_events(self, event_stream_key: str, start: int, limit: int) -> typing.List[rpc.Event]:
    """Fetch events of one event stream, beginning at `start`, at most `limit` entries.

    Returns empty list if no events found.

    See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_events.md)
    """
    rpc_params = [event_stream_key, int(start), int(limit)]
    parse_result = _parse_list(lambda: rpc.Event())
    return self.execute("get_events", rpc_params, parse_result)
def get_last_known_state(self) ‑> State
-
get last known server state
Every JSON-RPC service response contains the chain_id, the latest ledger state version and the ledger state timestamp in microseconds. Returns a state with all -1 values if the client has never called the server since it was initialized. The last known state is used for tracking server responses, making sure we won't accept a response from a stale server.
Expand source code
def get_last_known_state(self) -> State:
    """Return a snapshot of the last server state this client has seen.

    All JSON-RPC service response contains chain_id, latest ledger state version and
    ledger state timestamp usecs; the client records them to detect stale servers.
    Returns a state with all -1 values if the client never called server after initialized.

    The returned object is a copy taken under the client's lock, so callers
    may keep or modify it without affecting the client.
    """
    with self._lock:
        snapshot = copy.copy(self._last_known_server_state)
    return snapshot
def get_metadata(self, version: Optional[int] = None) ‑> jsonrpc_pb2.Metadata
-
get block metadata
See JSON-RPC API Doc
Expand source code
def get_metadata(
    self,
    version: typing.Optional[int] = None,
) -> rpc.Metadata:
    """get block metadata

    When `version` is None no parameter is sent and the server chooses the
    version. Any other value, including 0, is forwarded as an int.

    See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_metadata.md)
    """
    # Compare against None explicitly: a truthiness test would drop version 0
    # and silently query a different version than the caller asked for.
    params = [int(version)] if version is not None else []
    return self.execute("get_metadata", params, _parse_obj(lambda: rpc.Metadata()))
def get_parent_vasp_account(self, vasp_account_address: Union[AccountAddress, str]) ‑> jsonrpc_pb2.Account
-
get parent_vasp account
accepts child/parent vasp account address, returns parent vasp account
Raises ValueError if the given account address is not a ChildVASP or ParentVASP account address. Raises AccountNotFoundError if no account is found by the given account address, or if the account cannot be found by the parent_vasp_address stored in the ChildVASP account.
Expand source code
def get_parent_vasp_account(
    self, vasp_account_address: typing.Union[diem_types.AccountAddress, str]
) -> rpc.Account:
    """Return the ParentVASP account for a child or parent VASP address.

    A ParentVASP account is returned as is; for a ChildVASP account the lookup
    is repeated on its parent_vasp_address.

    Raises ValueError if given account address is not ChildVASP or ParentVASP account address.
    Raises AccountNotFoundError if no account found by given account address, or could not find
    the account by the parent_vasp_address found in ChildVASP account.
    """
    account = self.must_get_account(vasp_account_address)
    role_type = account.role.type

    if role_type == constants.ACCOUNT_ROLE_PARENT_VASP:
        return account

    if role_type != constants.ACCOUNT_ROLE_CHILD_VASP:
        address_hex = utils.account_address_hex(vasp_account_address)
        raise ValueError(f"given account address({address_hex}) is not a VASP account: {account}")

    return self.get_parent_vasp_account(account.role.parent_vasp_address)
def get_state_proof(self, version: int) ‑> jsonrpc_pb2.StateProof
-
Expand source code
def get_state_proof(self, version: int) -> rpc.StateProof:
    """Call the `get_state_proof` JSON-RPC method for the given version.

    `version` is converted with `int()` before being sent.
    """
    parse_result = _parse_obj(lambda: rpc.StateProof())
    return self.execute("get_state_proof", [int(version)], parse_result)
def get_transactions(self, start_version: int, limit: int, include_events: Optional[bool] = None) ‑> List[jsonrpc_pb2.Transaction]
-
Expand source code
def get_transactions(
    self,
    start_version: int,
    limit: int,
    include_events: typing.Optional[bool] = None,
) -> typing.List[rpc.Transaction]:
    """List transactions beginning at `start_version`, at most `limit` entries.

    Returns empty list if no transactions found.
    `include_events` is coerced with `bool()`, so None is sent as False.

    See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_get_transactions.md)
    """
    rpc_params = [int(start_version), int(limit), bool(include_events)]
    parse_result = _parse_list(lambda: rpc.Transaction())
    return self.execute("get_transactions", rpc_params, parse_result)
def must_get_account(self, account_address: Union[AccountAddress, str]) ‑> jsonrpc_pb2.Account
-
must_get_account raises AccountNotFoundError if account could not be found by given address
Expand source code
def must_get_account(self, account_address: typing.Union[diem_types.AccountAddress, str]) -> rpc.Account:
    """Return the account for the given address, or fail if there is none.

    Raises AccountNotFoundError if account could not be found by given address.
    """
    found = self.get_account(account_address)
    if found is not None:
        return found

    address_hex = utils.account_address_hex(account_address)
    raise AccountNotFoundError(f"account not found by address: {address_hex}")
def submit(self, txn: Union[SignedTransaction, str]) ‑> NoneType
-
submit signed transaction
See JSON-RPC API Doc
This method ignores StaleResponseError and does not retry on any submit errors, because re-submit any transaction may get JsonRpcError
SEQUENCE_NUMBER_TOO_OLD
in multi-threads environment, for example:- thread-1: submit transaction.
- server: receive submit transaction. The latest ledger version is X.
- server: receive a new state sync. The latest ledger version becomes X+1.
- thread-2: get_events (can be any get API).
- server: receive get_events. The latest ledger version is X+1.
- server: respond to get_events with latest ledger version == X+1.
- thread-2: receive get_events response.
- thread-2: update latest known ledger version X+1.
- server: respond to submit transaction with latest ledger version == X.
- thread-1: receive submit transaction response. Found response ledger version X < known version X+1.
- thread-1: triggers StaleResponseError.
- if we retry on StaleResponseError, the submitted transaction may end with JsonRpcError
SEQUENCE_NUMBER_TOO_OLD
if the following events happened. - server: execute transaction, and the transaction sender account sequence number +1.
- thread-1: submit the transaction again.
- server: validates the transaction sender account sequence number, and responds with a SEQUENCE_NUMBER_TOO_OLD error. However, the transaction was executed successfully, so raising the SEQUENCE_NUMBER_TOO_OLD error may cause a re-submit with a new account sequence number if the client handles it improperly (without checking whether the transaction was executed).
Expand source code
def submit(
    self,
    txn: typing.Union[diem_types.SignedTransaction, str],
) -> None:
    """Submit a signed transaction.

    Accepts either a `SignedTransaction` (serialized to BCS hex here) or an
    already hex-encoded signed transaction string.

    See [JSON-RPC API Doc](https://github.com/diem/diem/blob/master/json-rpc/docs/method_submit.md)

    This method ignores StaleResponseError and does not retry on any submit errors,
    because re-submitting a transaction may produce JsonRpcError `SEQUENCE_NUMBER_TOO_OLD`
    in a multi-threaded environment, for example:

    1. thread-1 submits the transaction while the server's ledger version is X.
    2. The server syncs new state; its ledger version becomes X+1.
    3. thread-2 calls any get API, receives a response at version X+1 and
       updates the last known ledger version to X+1.
    4. The server answers the submit with ledger version X; thread-1 sees
       X < X+1, which triggers StaleResponseError.
    5. If that error were retried, the server could meanwhile have executed the
       transaction (sender sequence number +1), so the second submit would be
       rejected with SEQUENCE_NUMBER_TOO_OLD.

    The transaction was executed successfully in that case, so surfacing
    SEQUENCE_NUMBER_TOO_OLD could lead a client to re-submit with a new sequence
    number if it does not first check whether the transaction was executed.
    """
    if isinstance(txn, diem_types.SignedTransaction):
        payload = txn.bcs_serialize().hex()
    else:
        payload = txn
    self.execute_without_retry("submit", [payload], result_parser=None, ignore_stale_response=True)
def update_last_known_state(self, chain_id: int, version: int, timestamp_usecs: int) ‑> NoneType
-
update last known server state
Raises InvalidServerResponse if given chain_id mismatches with previous value
Raises StaleResponseError if version or timestamp_usecs is less than previous values
Expand source code
def update_last_known_state(self, chain_id: int, version: int, timestamp_usecs: int) -> None:
    """update last known server state

    Raises InvalidServerResponse if any of the three values is None (the server
    response did not carry it), or if given chain_id mismatches with previous value.

    Raises StaleResponseError if version or timestamp_usecs is less than previous values.
    """
    # The values come from `json.get(...)` on the server response, so a field the
    # server omitted arrives here as None. Reject it up front instead of failing
    # with a bare TypeError in the comparisons below or storing a None chain id.
    fields = (("chain_id", chain_id), ("version", version), ("timestamp_usecs", timestamp_usecs))
    missing = [name for name, value in fields if value is None]
    if missing:
        raise InvalidServerResponse(f"server response is missing ledger state: {', '.join(missing)}")

    with self._lock:
        curr = self._last_known_server_state
        # -1 is the initial chain id, i.e. no response has been recorded yet.
        if curr.chain_id != -1 and curr.chain_id != chain_id:
            raise InvalidServerResponse(f"last known chain id {curr.chain_id}, " f"but got {chain_id}")
        if curr.version > version:
            raise StaleResponseError(f"last known version {curr.version} > {version}")
        if curr.timestamp_usecs > timestamp_usecs:
            raise StaleResponseError(f"last known timestamp_usecs {curr.timestamp_usecs} > {timestamp_usecs}")

        self._last_known_server_state = State(
            chain_id=chain_id,
            version=version,
            timestamp_usecs=timestamp_usecs,
        )
def wait_for_transaction(self, txn: Union[SignedTransaction, str], timeout_secs: Optional[float] = None) ‑> jsonrpc_pb2.Transaction
-
wait for transaction executed
Raises WaitForTransactionTimeout if waited timeout_secs and no expected transaction found.
Raises TransactionExpired if the latest ledger timestamp reported by the server has reached or passed the signed transaction's expiration_timestamp_secs.
Raises TransactionExecutionFailed if the transaction is found and its vm_status (execution result) is not success.
Raises TransactionHashMismatchError if a transaction is found by account address and sequence number, but its hash does not match the transaction hash given in the parameter. This means the executed transaction is from another process (which submitted a transaction with the same account address and sequence number).
Expand source code
def wait_for_transaction(
    self, txn: typing.Union[diem_types.SignedTransaction, str], timeout_secs: typing.Optional[float] = None
) -> rpc.Transaction:
    """Wait until the given signed transaction has been executed.

    `txn` may be a `SignedTransaction` or its BCS hex string; a string is
    deserialized first. The sender, sequence number, expiration time and
    transaction hash are then handed to `wait_for_transaction2`.

    Raises WaitForTransactionTimeout if waited timeout_secs and no expected transaction found.

    Raises TransactionExpired if the latest ledger timestamp reported by the server has
    reached the signed transaction's expiration_timestamp_secs.

    Raises TransactionExecutionFailed if the transaction is found and its vm_status
    (execution result) is not success.

    Raises TransactionHashMismatchError if a transaction is found by account address and
    sequence number, but its hash does not match the hash of the given transaction.
    This means the executed transaction is from another process (which submitted a
    transaction with the same account address and sequence).
    """
    signed = txn
    if isinstance(signed, str):
        signed = diem_types.SignedTransaction.bcs_deserialize(bytes.fromhex(signed))

    raw = signed.raw_txn
    return self.wait_for_transaction2(
        raw.sender,
        raw.sequence_number,
        raw.expiration_timestamp_secs,
        utils.transaction_hash(signed),
        timeout_secs,
    )
def wait_for_transaction2(self, address: AccountAddress, seq: int, expiration_time_secs: int, txn_hash: str, timeout_secs: Optional[float] = None, wait_duration_secs: Optional[float] = None) ‑> jsonrpc_pb2.Transaction
-
wait for transaction executed
Raises WaitForTransactionTimeout if waited timeout_secs and no expected transaction found.
Raises TransactionExpired if the latest ledger timestamp reported by the server has reached or passed the signed transaction's expiration_timestamp_secs.
Raises TransactionExecutionFailed if the transaction is found and its vm_status (execution result) is not success.
Raises TransactionHashMismatchError if a transaction is found by account address and sequence number, but its hash does not match the transaction hash given in the parameter. This means the executed transaction is from another process (which submitted a transaction with the same account address and sequence number).
Expand source code
def wait_for_transaction2(
    self,
    address: diem_types.AccountAddress,
    seq: int,
    expiration_time_secs: int,
    txn_hash: str,
    timeout_secs: typing.Optional[float] = None,
    wait_duration_secs: typing.Optional[float] = None,
) -> rpc.Transaction:
    """Poll until the transaction identified by (address, seq) has been executed.

    `timeout_secs` and `wait_duration_secs` fall back to
    DEFAULT_WAIT_FOR_TRANSACTION_TIMEOUT_SECS and
    DEFAULT_WAIT_FOR_TRANSACTION_WAIT_DURATION_SECS when None or 0.

    Raises WaitForTransactionTimeout if waited timeout_secs and no expected transaction found.

    Raises TransactionExpired if the last known ledger timestamp has reached
    expiration_time_secs.

    Raises TransactionExecutionFailed if the transaction is found and its vm_status
    (execution result) is not success.

    Raises TransactionHashMismatchError if a transaction is found by account address and
    sequence number, but its hash does not match `txn_hash`. This means the executed
    transaction is from another process (which submitted a transaction with the same
    account address and sequence).
    """
    timeout = timeout_secs or DEFAULT_WAIT_FOR_TRANSACTION_TIMEOUT_SECS
    poll_interval = wait_duration_secs or DEFAULT_WAIT_FOR_TRANSACTION_WAIT_DURATION_SECS

    # Use the monotonic clock for the deadline: wall-clock adjustments (NTP, manual
    # changes) must not shorten or extend the time we wait.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        txn = self.get_account_transaction(address, seq, True)
        if txn is not None:
            if txn.hash != txn_hash:
                raise TransactionHashMismatchError(f"expected hash {txn_hash}, but got {txn.hash}")
            if txn.vm_status.type != constants.VM_STATUS_EXECUTED:
                raise TransactionExecutionFailed(f"VM status: {txn.vm_status}")
            return txn

        # Not found yet: the call above refreshed the last known ledger state, so
        # check whether the ledger has already moved past the expiration time.
        state = self.get_last_known_state()
        if expiration_time_secs * 1_000_000 <= state.timestamp_usecs:
            raise TransactionExpired(
                f"latest server ledger timestamp_usecs {state.timestamp_usecs}, "
                f"transaction expires at {expiration_time_secs}"
            )
        time.sleep(poll_interval)

    raise WaitForTransactionTimeout()
class InvalidServerResponse (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class InvalidServerResponse(Exception):
    """Raised when a server response is unusable.

    Used when the body is not JSON, contains neither `result` nor `error`, the
    result cannot be parsed, or the chain id differs from the last known one.
    """
Ancestors
- builtins.Exception
- builtins.BaseException
class JsonRpcError (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class JsonRpcError(Exception):
    """Raised when the server's JSON-RPC response contains an error object."""
Ancestors
- builtins.Exception
- builtins.BaseException
class NetworkError (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class NetworkError(Exception):
    """Raised when sending the HTTP request fails or the response status is an error."""
Ancestors
- builtins.Exception
- builtins.BaseException
class RequestStrategy
-
RequestStrategy base class
It implements the simplest strategy: direct send http request
Expand source code
class RequestStrategy:
    """Base class for request strategies.

    It implements the simplest strategy: send the HTTP request directly to the
    client's own url.
    """

    def send_request(
        self, client: "Client", request: typing.Dict[str, typing.Any], ignore_stale_response: bool
    ) -> typing.Dict[str, typing.Any]:
        """Send `request` to `client._url` and return the decoded response."""
        target_url = client._url
        return client._send_http_request(target_url, request, ignore_stale_response)
Subclasses
Methods
def send_request(self, client: Client, request: Dict[str, Any], ignore_stale_response: bool) ‑> Dict[str, Any]
-
Expand source code
def send_request(
    self, client: "Client", request: typing.Dict[str, typing.Any], ignore_stale_response: bool
) -> typing.Dict[str, typing.Any]:
    """Send `request` to `client._url` and return the decoded response."""
    target_url = client._url
    return client._send_http_request(target_url, request, ignore_stale_response)
class RequestWithBackups (backups: List[str], executor: concurrent.futures.thread.ThreadPoolExecutor, fallback: bool = False)
-
RequestWithBackups implements strategies for primary-backup model.
First we send same request to primary and one of random picked backup urls in parallel. Then we have 2 different strategies for how we handle responses:
- first success: return first completed success response.
- fallback: wait for primary response completed, if it failed, fallback to backup response.
Default is first success strategy, passing fallback=True in constructor to enable fallback strategy.
Errors cause failures:
- http request error
- http response error
- response body is not json
- StaleResponseError: this is included for making sure we always prefer to pick non-stale response.
Initialize Client:
from concurrent.futures import ThreadPoolExecutor from diem import jsonrpc # This controls how many concurrent requests we can sent. It is shared for all jsonrpc.Client requests. executor = ThreadPoolExecutor(5) jsonrpc.Client( <primary-json-rpc-server-url> rs=jsonrpc.RequestWithBackups(backups=[<backup-json-rpc-server-url>...], executor=executor), )
Expand source code
class RequestWithBackups(RequestStrategy):
    """RequestWithBackups implements strategies for primary-backup model.

    First we send same request to primary and one of random picked backup urls in parallel.
    Then we have 2 different strategies for how we handle responses:

    1. first success: return first completed success response.
    2. fallback: wait for primary response completed, if it failed, fallback to backup response.

    Default is first success strategy, passing fallback=True in constructor to enable fallback strategy.

    If the list of backups is empty, the request is sent to the primary only.

    Errors cause failures:

    1. http request error
    2. http response error
    3. response body is not json
    4. StaleResponseError: this is included for making sure we always prefer to pick non-stale response.

    Initialize Client:

    ```python
    from concurrent.futures import ThreadPoolExecutor
    from diem import jsonrpc

    # This controls how many concurrent requests we can sent. It is shared for all jsonrpc.Client requests.
    executor = ThreadPoolExecutor(5)
    jsonrpc.Client(
        <primary-json-rpc-server-url>
        rs=jsonrpc.RequestWithBackups(backups=[<backup-json-rpc-server-url>...], executor=executor),
    )
    ```
    """

    def __init__(
        self,
        backups: typing.List[str],
        executor: ThreadPoolExecutor,
        fallback: bool = False,
    ) -> None:
        self._backups = backups
        self._executor = executor
        self._fallback = fallback

    def send_request(
        self, client: "Client", request: typing.Dict[str, typing.Any], ignore_stale_response: bool
    ) -> typing.Dict[str, typing.Any]:
        """Send `request` to the primary and one randomly chosen backup in parallel."""
        if not self._backups:
            # random.choice raises IndexError on an empty sequence; with no backup
            # available, behave like the plain primary-only strategy.
            return client._send_http_request(client._url, request, ignore_stale_response)

        primary = self._executor.submit(client._send_http_request, client._url, request, ignore_stale_response)
        backup = self._executor.submit(
            client._send_http_request, random.choice(self._backups), request, ignore_stale_response
        )

        if self._fallback:
            return self._fallback_to_backup(primary, backup)
        return self._first_success(primary, backup)

    def _fallback_to_backup(self, primary: Future, backup: Future) -> typing.Dict[str, typing.Any]:
        """Prefer the primary response; use the backup's only if the primary failed."""
        try:
            return primary.result()
        except Exception:
            # Any primary failure is deliberately replaced by the backup outcome.
            return backup.result()

    def _first_success(self, primary: Future, backup: Future) -> typing.Dict[str, typing.Any]:
        """Return whichever response completes first; if it failed, return the other one."""
        futures = as_completed({primary, backup})
        first = next(futures)
        try:
            return first.result()
        except Exception:
            # The first finisher failed: the remaining future decides the outcome.
            return next(futures).result()
Ancestors
Methods
def send_request(self, client: Client, request: Dict[str, Any], ignore_stale_response: bool) ‑> Dict[str, Any]
-
Expand source code
def send_request(
    self, client: "Client", request: typing.Dict[str, typing.Any], ignore_stale_response: bool
) -> typing.Dict[str, typing.Any]:
    """Send `request` to the primary and one randomly chosen backup in parallel.

    The two futures are resolved by the fallback strategy when `self._fallback`
    is set, otherwise by the first-success strategy. If no backups are
    configured, the request is sent to the primary only.
    """
    if not self._backups:
        # random.choice raises IndexError on an empty sequence; with no backup
        # available, behave like the plain primary-only strategy.
        return client._send_http_request(client._url, request, ignore_stale_response)

    primary = self._executor.submit(client._send_http_request, client._url, request, ignore_stale_response)
    backup = self._executor.submit(
        client._send_http_request, random.choice(self._backups), request, ignore_stale_response
    )

    if self._fallback:
        return self._fallback_to_backup(primary, backup)
    return self._first_success(primary, backup)
class Retry (max_retries: int, delay_secs: float, exception: Type[Exception])
-
Retry(max_retries: int, delay_secs: float, exception: Type[Exception])
Expand source code
@dataclasses.dataclass
class Retry:
    """Call a function repeatedly until it succeeds or the attempts run out.

    Only exceptions matching `exception` trigger another attempt; any other
    exception propagates immediately.
    """

    # Total number of attempts; values below 1 are treated as 1.
    max_retries: int
    # Base delay in seconds; the sleep after failed attempt N is delay_secs * N.
    delay_secs: float
    # Exception type that is considered retryable.
    exception: typing.Type[Exception]

    def execute(self, fn: typing.Callable):  # pyre-ignore
        """Run `fn` and return its result, retrying on `self.exception`.

        Args:
            fn: zero-argument callable to invoke.

        Returns:
            Whatever `fn` returns on its first successful call.

        Raises:
            self.exception: re-raised from the final attempt when every
                attempt failed.
        """
        # Always try at least once: with max_retries <= 0 the call would
        # otherwise be skipped and None returned without running fn.
        attempts = max(1, self.max_retries)
        for tries in range(1, attempts + 1):
            try:
                return fn()
            except self.exception:
                if tries >= attempts:
                    raise
                # simplest backoff strategy: tries * delay
                time.sleep(self.delay_secs * tries)
Class variables
var delay_secs : float
var exception : Type[Exception]
var max_retries : int
Methods
def execute(self, fn: Callable)
-
Expand source code
def execute(self, fn: typing.Callable):  # pyre-ignore
    """Run `fn` and return its result, retrying on `self.exception`.

    Args:
        fn: zero-argument callable to invoke.

    Returns:
        Whatever `fn` returns on its first successful call.

    Raises:
        self.exception: re-raised from the final attempt when every attempt
            failed. Other exception types propagate immediately.
    """
    # Always try at least once: with max_retries <= 0 the call would
    # otherwise be skipped and None returned without running fn.
    attempts = max(1, self.max_retries)
    for tries in range(1, attempts + 1):
        try:
            return fn()
        except self.exception:
            if tries >= attempts:
                raise
            # simplest backoff strategy: tries * delay
            time.sleep(self.delay_secs * tries)
class StaleResponseError (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class StaleResponseError(Exception):
    """Error type for a stale server response.

    Adds no state or behavior on top of ``Exception``.
    NOTE(review): the check that raises it is outside this section; confirm
    the exact staleness condition at the raise site.
    """
Ancestors
- builtins.Exception
- builtins.BaseException
class State (chain_id: int, version: int, timestamp_usecs: int)
-
State(chain_id: int, version: int, timestamp_usecs: int)
Expand source code
@dataclasses.dataclass()
class State:
    """Plain record of three integers: chain id, version and timestamp.

    NOTE(review): presumably a snapshot of server/ledger state; the code that
    fills it in is outside this section.
    """

    # Identifier of the chain.
    chain_id: int
    # Version number (presumably the ledger version; confirm with the producer).
    version: int
    # Timestamp; the name suggests microseconds (confirm with the producer).
    timestamp_usecs: int
Class variables
var chain_id : int
var timestamp_usecs : int
var version : int
class TransactionExecutionFailed (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class TransactionExecutionFailed(Exception):
    """Error type for a transaction whose execution failed.

    Adds no state or behavior on top of ``Exception``.
    NOTE(review): the raise site is outside this section; confirm there what
    counts as a failed execution.
    """
Ancestors
- builtins.Exception
- builtins.BaseException
class TransactionExpired (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class TransactionExpired(Exception):
    """Error type for an expired transaction.

    Adds no state or behavior on top of ``Exception``.
    NOTE(review): the raise site is outside this section; confirm there how
    expiration is determined.
    """
Ancestors
- builtins.Exception
- builtins.BaseException
class TransactionHashMismatchError (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class TransactionHashMismatchError(Exception):
    """Error type for a transaction hash that does not match.

    Adds no state or behavior on top of ``Exception``.
    NOTE(review): the raise site is outside this section; confirm there which
    hashes are compared.
    """
Ancestors
- builtins.Exception
- builtins.BaseException
class WaitForTransactionTimeout (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class WaitForTransactionTimeout(Exception):
    """Error type for timing out while waiting for a transaction.

    Adds no state or behavior on top of ``Exception``.
    NOTE(review): the raise site is outside this section; confirm there which
    timeout applies.
    """
Ancestors
- builtins.Exception
- builtins.BaseException