Module diem.testing.miniwallet.client
Expand source code
# Copyright (c) The Diem Core Contributors
# SPDX-License-Identifier: Apache-2.0
from dataclasses import dataclass, field, asdict
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from typing import List, Optional, Any, Dict
from .app import KycSample, Event
from .app.store import _match
from ... import offchain, jsonrpc
import requests, logging, json, os
@dataclass
class Payment:
    """Details of a payment sent from an account."""

    # identifier taken from the "id" field of the create-payment response
    id: str
    # id of the account the payment was sent from
    account_id: str
    currency: str
    amount: int
    # payee value that was submitted with the payment request
    payee: str
@dataclass
class RestClient:
    """HTTP client for a REST API rooted at `server_url`.

    Requests go through `session`; every request and response is logged to the
    logger named `name`.
    """

    name: str
    server_url: str
    session: requests.Session = field(default_factory=requests.Session)
    # created in `__post_init__` from `name`, hence not an init argument
    logger: logging.Logger = field(init=False)
    # when true, responses for paths ending with "/events" are always logged at DEBUG
    events_api_is_optional: bool = field(default=False)

    def __post_init__(self) -> None:
        self.logger = logging.getLogger(self.name)

    def with_retry(self, retry: Retry = Retry(total=5, connect=5, backoff_factor=0.01)) -> "RestClient":
        """Mount an `HTTPAdapter` using `retry` for `server_url` on the session.

        Returns `self` so the call can be chained.
        """
        self.session.mount(self.server_url, HTTPAdapter(max_retries=retry))
        return self

    def create_account(
        self,
        balances: Optional[Dict[str, int]] = None,
        kyc_data: Optional[offchain.KycDataObject] = None,
        reject_additional_kyc_data_request: Optional[bool] = None,
        disable_background_tasks: Optional[bool] = None,
    ) -> "AccountResource":
        """Create an account by calling `POST /accounts`.

        Only truthy options are included in the request body, so `None`, `False`
        and empty `balances` are all omitted from the request.

        Returns an `AccountResource` bound to this client, carrying the `id` from
        the response and the given `kyc_data`.
        """
        kwargs = {
            "balances": balances,
            "kyc_data": asdict(kyc_data) if kyc_data else None,
            "reject_additional_kyc_data_request": reject_additional_kyc_data_request,
            "disable_background_tasks": disable_background_tasks,
        }
        account = self.create("/accounts", **{k: v for k, v in kwargs.items() if v})
        return AccountResource(client=self, id=account["id"], kyc_data=kyc_data)

    def get_kyc_sample(self) -> KycSample:
        """Call `GET /kyc_sample` and decode the JSON response into a `KycSample`."""
        return offchain.from_dict(self.send("GET", "/kyc_sample").json(), KycSample)

    def create(self, path: str, **kwargs: Any) -> Dict[str, Any]:
        """`POST` to `path` and return the decoded JSON response.

        `kwargs` are JSON-encoded as the request body; no body is sent when
        there are no `kwargs`.
        """
        return self.send("POST", path, json.dumps(kwargs) if kwargs else None).json()

    def get(self, path: str) -> Dict[str, Any]:
        """`GET` the `path` and return the decoded JSON response."""
        return self.send("GET", path).json()

    def send(self, method: str, path: str, data: Optional[str] = None) -> requests.Response:
        """Send an HTTP request to `path` under `server_url` and return the response.

        The request and the response (status code and body) are logged: DEBUG for
        status codes below 300, ERROR otherwise. When `events_api_is_optional` is
        set, responses for paths ending with "/events" are always logged at DEBUG.

        Raises `requests.HTTPError` (from `raise_for_status`) for error responses.
        """
        url = "%s/%s" % (self.server_url.rstrip("/"), path.lstrip("/"))
        self.logger.debug("%s %s: %s", method, path, data)
        headers = {
            "Content-Type": "application/json",
            "User-Agent": jsonrpc.client.USER_AGENT_HTTP_HEADER,
            "X-Test-Case": os.getenv("PYTEST_CURRENT_TEST"),
        }
        resp = self.session.request(
            method=method,
            # URL paths are case-sensitive (e.g. account ids), so the URL is sent
            # exactly as built instead of being lower-cased.
            url=url,
            data=data,
            # drop headers without a value (X-Test-Case is unset outside pytest)
            headers={k: v for k, v in headers.items() if v},
        )
        log_level = logging.DEBUG if resp.status_code < 300 else logging.ERROR
        if self.events_api_is_optional and path.endswith("/events"):
            log_level = logging.DEBUG
        self.logger.log(log_level, "%s %s: %s - %s", method, path, data, resp.status_code)
        self.logger.log(log_level, "response body: \n%s", try_json(resp.text))
        resp.raise_for_status()
        return resp
def try_json(text: str) -> str:
    """Format `text` for logging, pretty-printing it when it is valid JSON.

    A JSON object is rendered as one "key: value" line per top-level entry; any
    other JSON value is re-encoded with `indent=2`. If anything fails (for
    example `text` is not JSON), `text` is returned unchanged.
    """
    try:
        decoded = json.loads(text)
        if not isinstance(decoded, dict):
            return json.dumps(decoded, indent=2)
        # one line per entry keeps error stacktrace fields readable in logs
        entry_lines = ["%s: %s" % entry for entry in decoded.items()]
        return "\n".join(entry_lines)
    except Exception:
        return text
@dataclass
class AccountResource:
    """Handle for a single account, addressed as `/accounts/{id}` through `client`."""

    client: RestClient
    id: str
    kyc_data: Optional[offchain.KycDataObject] = field(default=None)

    def balance(self, currency: str) -> int:
        """Get account balance for the given currency

        Calls `GET /accounts/{account_id}/balances` endpoint and picks the entry for
        the given currency; a currency missing from the response counts as 0.
        """

        all_balances = self.balances()
        return all_balances.get(currency, 0)

    def send_payment(self, currency: str, amount: int, payee: str) -> Payment:
        """Send amount of currency to payee

        Calls `POST /accounts/{account_id}/payments` endpoint. The returned `Payment`
        carries the `id` from the response together with the submitted values.
        """

        path = self._resources("payment")
        created = self.client.create(path, payee=payee, currency=currency, amount=amount)
        return Payment(
            id=created["id"],
            account_id=self.id,
            payee=payee,
            currency=currency,
            amount=amount,
        )

    def generate_account_identifier(self) -> str:
        """Generate an account identifier

        Calls `POST /accounts/{account_id}/account_identifiers` and returns the
        `account_identifier` field of the response.
        """

        path = self._resources("account_identifier")
        return self.client.create(path)["account_identifier"]

    def events(self, start: int = 0) -> List[Event]:
        """Get account events

        Calls `GET /accounts/{account_id}/events` endpoint and returns the events
        from index `start` onwards (the slice is applied to the full response).
        Raises `requests.HTTPError`, if the endpoint is not implemented.
        """

        response = self.client.send("GET", self._resources("event"))
        records = response.json()
        return [Event(**record) for record in records[start:]]

    def find_event(self, event_type: str, start_index: int = 0, **kwargs: Any) -> Optional[Event]:
        """Find the first event matching `event_type`, `start_index` and `data`

        The event `data` is assumed to be a JSON encoded dictionary; an event matches
        when `**kwargs` is a subset of that decoded dictionary. Returns `None` when no
        event matches.
        """

        for event in self.events(start_index):
            if event.type != event_type:
                continue
            if _match(json.loads(event.data), **kwargs):
                return event
        return None

    def log_events(self) -> None:
        """Log account events as INFO

        Does nothing if get events API is not implemented.
        """

        dumped = self.dump_events()
        if not dumped:
            return
        self.client.logger.info("account(%s) events: %s", self.id, dumped)

    def dump_events(self) -> str:
        """Dump account events as JSON encoded string (well formatted, and indent=2)

        Returns empty string if get events API is not implemented.
        """

        try:
            records = [self.event_asdict(event) for event in self.events()]
            return json.dumps(records, indent=2)
        except requests.HTTPError:
            return ""

    def event_asdict(self, event: Event) -> Dict[str, Any]:
        """Returns `Event` as dictionary object.

        The `data` field is a JSON-encoded string; when it decodes successfully the
        decoded value replaces the string so logs show it pretty printed. Otherwise
        the raw string is kept.
        """

        record = asdict(event)
        try:
            decoded = json.loads(event.data)
        except json.decoder.JSONDecodeError:
            return record
        record["data"] = decoded
        return record

    def info(self, *args: Any, **kwargs: Any) -> None:
        """Log info to `client.logger`"""

        logger = self.client.logger
        logger.info(*args, **kwargs)

    def balances(self) -> Dict[str, int]:
        """Returns account balances object

        Prefer `balance(currency) -> int`, which returns zero when the currency
        does not exist in the response.
        """

        path = self._resources("balance")
        return self.client.get(path)

    def _resources(self, resource: str) -> str:
        # builds the plural collection path, e.g. "payment" -> "/accounts/{id}/payments"
        parts = (self.id, resource)
        return "/accounts/%s/%ss" % parts
Functions
def try_json(text: str) ‑> str
-
Expand source code
def try_json(text: str) -> str: try: obj = json.loads(text) if isinstance(obj, dict): # pretty print error json stacktrace info return "\n".join(["%s: %s" % (k, v) for k, v in obj.items()]) return json.dumps(obj, indent=2) except Exception: return text
Classes
class AccountResource (client: RestClient, id: str, kyc_data: Optional[KycDataObject] = None)
-
AccountResource(client: diem.testing.miniwallet.client.RestClient, id: str, kyc_data: Optional[diem.offchain.types.payment_types.KycDataObject] = None)
Expand source code
@dataclass class AccountResource: client: RestClient id: str kyc_data: Optional[offchain.KycDataObject] = field(default=None) def balance(self, currency: str) -> int: """Get account balance for the given currency Calls `GET /accounts/{account_id}/balances` endpoint and only return balance of the given currency. Returns 0 if given currency does not exist in the returned balances. """ return self.balances().get(currency, 0) def send_payment(self, currency: str, amount: int, payee: str) -> Payment: """Send amount of currency to payee Calls `POST /accounts/{account_id}/payments` endpoint and returns payment details. """ p = self.client.create(self._resources("payment"), payee=payee, currency=currency, amount=amount) return Payment(id=p["id"], account_id=self.id, payee=payee, currency=currency, amount=amount) def generate_account_identifier(self) -> str: """Generate an account identifier Calls `POST /accounts/{account_id}/account_identifiers` to generate account identifier. """ ret = self.client.create(self._resources("account_identifier")) return ret["account_identifier"] def events(self, start: int = 0) -> List[Event]: """Get account events Calls to `GET /accounts/{account_id}/events` endpoint and returns events list. Raises `requests.HTTPError`, if the endpoint is not implemented. """ ret = self.client.send("GET", self._resources("event")).json() return [Event(**obj) for obj in ret[start:]] def find_event(self, event_type: str, start_index: int = 0, **kwargs: Any) -> Optional[Event]: """Find a specific event by `type`, `start_index` and `data` When matching the event `data`, it assumes `data` is JSON encoded dictionary, and returns the event if the `**kwargs` is subset of the dictionary decoded from event `data` field. """ events = [e for e in self.events(start_index) if e.type == event_type] for e in events: if _match(json.loads(e.data), **kwargs): return e def log_events(self) -> None: """Log account events as INFO Does nothing if get events API is not implemented. 
""" events = self.dump_events() if events: self.client.logger.info("account(%s) events: %s", self.id, events) def dump_events(self) -> str: """Dump account events as JSON encoded string (well formatted, and indent=2) Returns empty string if get events API is not implemented. """ try: return json.dumps(list(map(self.event_asdict, self.events())), indent=2) except requests.HTTPError: return "" def event_asdict(self, event: Event) -> Dict[str, Any]: """Returns `Event` as dictionary object. As we use JSON-encoded string field, this function tries to decoding all JSON-encoded string as dictionary for pretty print event data in log. """ ret = asdict(event) try: ret["data"] = json.loads(event.data) except json.decoder.JSONDecodeError: pass return ret def info(self, *args: Any, **kwargs: Any) -> None: """Log info to `client.logger`""" self.client.logger.info(*args, **kwargs) def balances(self) -> Dict[str, int]: """returns account balances object should always prefer to use func `balance(currency) -> int`, which returns zero when currency not exist in the response. """ return self.client.get(self._resources("balance")) def _resources(self, resource: str) -> str: return "/accounts/%s/%ss" % (self.id, resource)
Class variables
var client : RestClient
var id : str
var kyc_data : Optional[KycDataObject]
Methods
def balance(self, currency: str) ‑> int
-
Get account balance for the given currency
Calls the `GET /accounts/{account_id}/balances` endpoint and returns only the balance of the given currency. Returns 0 if the given currency does not exist in the returned balances.
Expand source code
def balance(self, currency: str) -> int: """Get account balance for the given currency Calls `GET /accounts/{account_id}/balances` endpoint and only return balance of the given currency. Returns 0 if given currency does not exist in the returned balances. """ return self.balances().get(currency, 0)
def balances(self) ‑> Dict[str, int]
-
Returns the account balances object.
Prefer `balance(currency) -> int`, which returns zero when the currency does not exist in the response.
Expand source code
def balances(self) -> Dict[str, int]: """returns account balances object should always prefer to use func `balance(currency) -> int`, which returns zero when currency not exist in the response. """ return self.client.get(self._resources("balance"))
def dump_events(self) ‑> str
-
Dump account events as JSON encoded string (well formatted, and indent=2)
Returns empty string if get events API is not implemented.
Expand source code
def dump_events(self) -> str: """Dump account events as JSON encoded string (well formatted, and indent=2) Returns empty string if get events API is not implemented. """ try: return json.dumps(list(map(self.event_asdict, self.events())), indent=2) except requests.HTTPError: return ""
def event_asdict(self, event: Event) ‑> Dict[str, Any]
-
Returns `Event` as a dictionary object.
Because event data is stored as a JSON-encoded string field, this function tries to decode that string into a dictionary so that event data is pretty-printed in logs.
Expand source code
def event_asdict(self, event: Event) -> Dict[str, Any]: """Returns `Event` as dictionary object. As we use JSON-encoded string field, this function tries to decoding all JSON-encoded string as dictionary for pretty print event data in log. """ ret = asdict(event) try: ret["data"] = json.loads(event.data) except json.decoder.JSONDecodeError: pass return ret
def events(self, start: int = 0) ‑> List[Event]
-
Get account events
Calls the `GET /accounts/{account_id}/events` endpoint and returns the events list.
Raises `requests.HTTPError` if the endpoint is not implemented.
Expand source code
def events(self, start: int = 0) -> List[Event]: """Get account events Calls to `GET /accounts/{account_id}/events` endpoint and returns events list. Raises `requests.HTTPError`, if the endpoint is not implemented. """ ret = self.client.send("GET", self._resources("event")).json() return [Event(**obj) for obj in ret[start:]]
def find_event(self, event_type: str, start_index: int = 0, **kwargs: Any) ‑> Optional[Event]
-
Find a specific event by `type`, `start_index` and `data`.
When matching the event `data`, it assumes `data` is a JSON-encoded dictionary, and returns the event if `**kwargs` is a subset of the dictionary decoded from the event `data` field.
Expand source code
def find_event(self, event_type: str, start_index: int = 0, **kwargs: Any) -> Optional[Event]: """Find a specific event by `type`, `start_index` and `data` When matching the event `data`, it assumes `data` is JSON encoded dictionary, and returns the event if the `**kwargs` is subset of the dictionary decoded from event `data` field. """ events = [e for e in self.events(start_index) if e.type == event_type] for e in events: if _match(json.loads(e.data), **kwargs): return e
def generate_account_identifier(self) ‑> str
-
Generate an account identifier
Calls `POST /accounts/{account_id}/account_identifiers` to generate an account identifier.
Expand source code
def generate_account_identifier(self) -> str: """Generate an account identifier Calls `POST /accounts/{account_id}/account_identifiers` to generate account identifier. """ ret = self.client.create(self._resources("account_identifier")) return ret["account_identifier"]
def info(self, *args: Any, **kwargs: Any) ‑> NoneType
-
Log info to
client.logger
Expand source code
def info(self, *args: Any, **kwargs: Any) -> None: """Log info to `client.logger`""" self.client.logger.info(*args, **kwargs)
def log_events(self) ‑> NoneType
-
Log account events as INFO
Does nothing if get events API is not implemented.
Expand source code
def log_events(self) -> None: """Log account events as INFO Does nothing if get events API is not implemented. """ events = self.dump_events() if events: self.client.logger.info("account(%s) events: %s", self.id, events)
def send_payment(self, currency: str, amount: int, payee: str) ‑> Payment
-
Send amount of currency to payee
Calls the `POST /accounts/{account_id}/payments` endpoint and returns payment details.
Expand source code
def send_payment(self, currency: str, amount: int, payee: str) -> Payment: """Send amount of currency to payee Calls `POST /accounts/{account_id}/payments` endpoint and returns payment details. """ p = self.client.create(self._resources("payment"), payee=payee, currency=currency, amount=amount) return Payment(id=p["id"], account_id=self.id, payee=payee, currency=currency, amount=amount)
class Payment (id: str, account_id: str, currency: str, amount: int, payee: str)
-
Payment(id: str, account_id: str, currency: str, amount: int, payee: str)
Expand source code
@dataclass class Payment: id: str account_id: str currency: str amount: int payee: str
Class variables
var account_id : str
var amount : int
var currency : str
var id : str
var payee : str
class RestClient (name: str, server_url: str, session: requests.sessions.Session = <factory>, events_api_is_optional: bool = False)
-
RestClient(name: str, server_url: str, session: requests.sessions.Session = <factory>, events_api_is_optional: bool = False)
Expand source code
@dataclass class RestClient: name: str server_url: str session: requests.Session = field(default_factory=requests.Session) logger: logging.Logger = field(init=False) events_api_is_optional: bool = field(default=False) def __post_init__(self) -> None: self.logger = logging.getLogger(self.name) def with_retry(self, retry: Retry = Retry(total=5, connect=5, backoff_factor=0.01)) -> "RestClient": self.session.mount(self.server_url, HTTPAdapter(max_retries=retry)) return self def create_account( self, balances: Optional[Dict[str, int]] = None, kyc_data: Optional[offchain.KycDataObject] = None, reject_additional_kyc_data_request: Optional[bool] = None, disable_background_tasks: Optional[bool] = None, ) -> "AccountResource": kwargs = { "balances": balances, "kyc_data": asdict(kyc_data) if kyc_data else None, "reject_additional_kyc_data_request": reject_additional_kyc_data_request, "disable_background_tasks": disable_background_tasks, } account = self.create("/accounts", **{k: v for k, v in kwargs.items() if v}) return AccountResource(client=self, id=account["id"], kyc_data=kyc_data) def get_kyc_sample(self) -> KycSample: return offchain.from_dict(self.send("GET", "/kyc_sample").json(), KycSample) def create(self, path: str, **kwargs: Any) -> Dict[str, Any]: return self.send("POST", path, json.dumps(kwargs) if kwargs else None).json() def get(self, path: str) -> Dict[str, Any]: return self.send("GET", path).json() def send(self, method: str, path: str, data: Optional[str] = None) -> requests.Response: url = "%s/%s" % (self.server_url.rstrip("/"), path.lstrip("/")) self.logger.debug("%s %s: %s", method, path, data) headers = { "Content-Type": "application/json", "User-Agent": jsonrpc.client.USER_AGENT_HTTP_HEADER, "X-Test-Case": os.getenv("PYTEST_CURRENT_TEST"), } resp = self.session.request( method=method, url=url.lower(), data=data, headers={k: v for k, v in headers.items() if v}, ) log_level = logging.DEBUG if resp.status_code < 300 else logging.ERROR if 
self.events_api_is_optional and path.endswith("/events"): log_level = logging.DEBUG self.logger.log(log_level, "%s %s: %s - %s", method, path, data, resp.status_code) self.logger.log(log_level, "response body: \n%s", try_json(resp.text)) resp.raise_for_status() return resp
Class variables
var events_api_is_optional : bool
var logger : logging.Logger
var name : str
var server_url : str
var session : requests.sessions.Session
Methods
def create(self, path: str, **kwargs: Any) ‑> Dict[str, Any]
-
Expand source code
def create(self, path: str, **kwargs: Any) -> Dict[str, Any]: return self.send("POST", path, json.dumps(kwargs) if kwargs else None).json()
def create_account(self, balances: Optional[Dict[str, int]] = None, kyc_data: Optional[KycDataObject] = None, reject_additional_kyc_data_request: Optional[bool] = None, disable_background_tasks: Optional[bool] = None) ‑> AccountResource
-
Expand source code
def create_account( self, balances: Optional[Dict[str, int]] = None, kyc_data: Optional[offchain.KycDataObject] = None, reject_additional_kyc_data_request: Optional[bool] = None, disable_background_tasks: Optional[bool] = None, ) -> "AccountResource": kwargs = { "balances": balances, "kyc_data": asdict(kyc_data) if kyc_data else None, "reject_additional_kyc_data_request": reject_additional_kyc_data_request, "disable_background_tasks": disable_background_tasks, } account = self.create("/accounts", **{k: v for k, v in kwargs.items() if v}) return AccountResource(client=self, id=account["id"], kyc_data=kyc_data)
def get(self, path: str) ‑> Dict[str, Any]
-
Expand source code
def get(self, path: str) -> Dict[str, Any]: return self.send("GET", path).json()
def get_kyc_sample(self) ‑> KycSample
-
Expand source code
def get_kyc_sample(self) -> KycSample: return offchain.from_dict(self.send("GET", "/kyc_sample").json(), KycSample)
def send(self, method: str, path: str, data: Optional[str] = None) ‑> requests.models.Response
-
Expand source code
def send(self, method: str, path: str, data: Optional[str] = None) -> requests.Response:
    """Send an HTTP request to `path` under `server_url` and return the response.

    The request and the response (status code and body) are logged: DEBUG for
    status codes below 300, ERROR otherwise. When `events_api_is_optional` is
    set, responses for paths ending with "/events" are always logged at DEBUG.

    Raises `requests.HTTPError` (from `raise_for_status`) for error responses.
    """
    url = "%s/%s" % (self.server_url.rstrip("/"), path.lstrip("/"))
    self.logger.debug("%s %s: %s", method, path, data)
    headers = {
        "Content-Type": "application/json",
        "User-Agent": jsonrpc.client.USER_AGENT_HTTP_HEADER,
        "X-Test-Case": os.getenv("PYTEST_CURRENT_TEST"),
    }
    resp = self.session.request(
        method=method,
        # URL paths are case-sensitive (e.g. account ids), so the URL is sent
        # exactly as built instead of being lower-cased.
        url=url,
        data=data,
        # drop headers without a value (X-Test-Case is unset outside pytest)
        headers={k: v for k, v in headers.items() if v},
    )
    log_level = logging.DEBUG if resp.status_code < 300 else logging.ERROR
    if self.events_api_is_optional and path.endswith("/events"):
        log_level = logging.DEBUG
    self.logger.log(log_level, "%s %s: %s - %s", method, path, data, resp.status_code)
    self.logger.log(log_level, "response body: \n%s", try_json(resp.text))
    resp.raise_for_status()
    return resp
def with_retry(self, retry: urllib3.util.retry.Retry = Retry(total=5, connect=5, read=None, redirect=None, status=None)) ‑> RestClient
-
Expand source code
def with_retry(self, retry: Retry = Retry(total=5, connect=5, backoff_factor=0.01)) -> "RestClient": self.session.mount(self.server_url, HTTPAdapter(max_retries=retry)) return self