Module diem.testing.miniwallet.app.event_puller
Expand source code
# Copyright (c) The Diem Core Contributors
# SPDX-License-Identifier: Apache-2.0
from dataclasses import dataclass, field
from typing import Dict, Callable, Any
from .store import InMemoryStore, NotFoundError
from .pending_account import PENDING_INBOUND_ACCOUNT_ID
from .models import Transaction, Subaddress, PaymentCommand, RefundReason, Account
from .... import jsonrpc, diem_types, txnmetadata, identifier, utils
import copy, logging
@dataclass
class EventPuller:
    """Pull received events from the chain and record them as transactions.

    ``state`` maps each tracked received-events key to the sequence number
    that is passed to ``client.get_events`` on the next ``fetch``.
    """

    client: jsonrpc.Client
    store: InMemoryStore
    hrp: str
    logger: logging.Logger
    state: Dict[str, int] = field(default_factory=dict)

    def add(self, address: diem_types.AccountAddress) -> None:
        """Start tracking the received-events key of the account at ``address``.

        The cursor for that key is set to 0, so calling this again for an
        already tracked account rewinds its cursor to the first event.
        """
        account = self.client.must_get_account(address)
        self.state[account.received_events_key] = 0

    def fetch(self, process: Callable[[jsonrpc.Event], None], batch_size: int = 100) -> None:
        """Fetch one batch of events per tracked key and hand each to ``process``.

        Args:
            process: callback invoked once per fetched event, in the order
                returned by the client.
            batch_size: forwarded as the third argument of
                ``client.get_events`` (presumably the page size limit).

        The cursor of a key is advanced right after each event is processed,
        so an exception raised by ``process`` leaves the cursor pointing at
        the event that failed.
        """
        # Iterate over a snapshot: ``process`` may register new keys through
        # ``add``, and growing a dict while iterating its live view raises
        # ``RuntimeError: dictionary changed size during iteration``.
        for key, seq in list(self.state.items()):
            events = self.client.get_events(key, seq, batch_size)
            if not events:
                continue
            for event in events:
                process(event)
                self.state[key] = event.sequence_number + 1

    def head(self) -> None:
        """Advance every cursor past all existing events without processing them.

        Repeats ``fetch`` with a no-op callback until a pass leaves ``state``
        unchanged.
        """
        state = None
        while state != self.state:
            state = copy.copy(self.state)
            self.fetch(lambda _: None)

    def save_payment_txn(self, event: jsonrpc.Event) -> None:
        """Record ``event`` as a transaction, falling back to the pending inbound account.

        When handling raises ``NotFoundError`` or ``ValueError`` the failure is
        logged and the event is recorded on ``PENDING_INBOUND_ACCOUNT_ID``.
        """
        self.logger.info("processing Event:\n%s", event)
        try:
            self._save_payment_txn(event)
        except (NotFoundError, ValueError):
            self.logger.exception("process event failed")
            self._create_txn(PENDING_INBOUND_ACCOUNT_ID, event)

    def _save_payment_txn(self, event: jsonrpc.Event) -> None:
        """Dispatch on the decoded event metadata and record the matching transaction.

        - general metadata: look up the ``to_subaddress``; unknown subaddresses
          are refunded with ``RefundReason.invalid_subaddress``.
        - travel rule metadata: look up the payment command by off-chain
          reference id; unknown ids are refunded with ``RefundReason.other``.
        - refund metadata: credit the account resolved by
          ``_find_refund_account_id``.

        Raises:
            ValueError: the metadata is none of the three handled types.
        """
        metadata = txnmetadata.decode_structure(event.data.metadata)
        if isinstance(metadata, diem_types.GeneralMetadataV0):
            subaddress = utils.hex(metadata.to_subaddress)
            try:
                res = self.store.find(Subaddress, subaddress_hex=subaddress)
            except NotFoundError:
                self.logger.exception("invalid general metadata to_subaddress")
                return self._refund(RefundReason.invalid_subaddress, event)
            return self._create_txn(res.account_id, event, subaddress_hex=res.subaddress_hex)
        elif isinstance(metadata, diem_types.TravelRuleMetadataV0):
            try:
                cmd = self.store.find(PaymentCommand, reference_id=metadata.off_chain_reference_id)
            except NotFoundError:
                self.logger.exception("invalid travel rule metadata off-chain reference id")
                return self._refund(RefundReason.other, event)
            return self._create_txn(cmd.account_id, event, reference_id=cmd.reference_id)
        elif isinstance(metadata, diem_types.RefundMetadataV0):
            version = int(metadata.transaction_version)
            reason = RefundReason.from_diem_type(metadata.reason)
            account_id = self._find_refund_account_id(version)
            return self._create_txn(account_id, event, refund_diem_txn_version=version, refund_reason=reason)
        raise ValueError("unrecognized metadata: %r" % event.data.metadata)

    def _find_refund_account_id(self, version: int) -> str:
        """Return the id of the account that sent the transaction being refunded.

        The original transaction is fetched by ``version`` and its general
        metadata ``from_subaddress`` is looked up in the store. Every failure
        (transaction not found, other metadata type, unknown subaddress) is
        logged and resolves to ``PENDING_INBOUND_ACCOUNT_ID``.
        """
        diem_txns = self.client.get_transactions(version, 1)
        if diem_txns:
            diem_txn = diem_txns[0]
            # Diagnostic dump on the normal path; kept out of the error log.
            self.logger.debug("diem_txn script: %r", diem_txn.transaction.script)
            original_metadata = txnmetadata.decode_structure(diem_txn.transaction.script.metadata)
            if isinstance(original_metadata, diem_types.GeneralMetadataV0):
                original_sender = utils.hex(original_metadata.from_subaddress)
                try:
                    return self.store.find(Subaddress, subaddress_hex=original_sender).account_id
                except NotFoundError:
                    self.logger.exception("invalid original transaction metadata from_subaddress")
            else:
                self.logger.error("invalid original txn metadata %r", diem_txn.transaction.script.metadata)
        else:
            self.logger.error("could not find diem txn by version: %s", version)
        return PENDING_INBOUND_ACCOUNT_ID

    def _refund(self, reason: RefundReason, event: jsonrpc.Event) -> None:
        """Record ``event`` on the pending inbound account and queue a refund for it.

        The refund is a pending ``sent_payment`` transaction for the same
        currency and amount, addressed to the event sender and tagged with the
        event's transaction version and ``reason``.
        """
        self._create_txn(PENDING_INBOUND_ACCOUNT_ID, event)
        # None is passed where encode_account takes its second argument
        # (presumably the subaddress) -- confirm against the identifier module.
        payee = identifier.encode_account(event.data.sender, None, self.hrp)
        self.store.create(
            Transaction,
            account_id=PENDING_INBOUND_ACCOUNT_ID,
            status=Transaction.Status.pending,
            type=Transaction.Type.sent_payment,
            currency=event.data.amount.currency,
            amount=event.data.amount.amount,
            payee=payee,
            refund_diem_txn_version=event.transaction_version,
            refund_reason=reason,
        )

    def _create_txn(self, account_id: str, event: jsonrpc.Event, **kwargs: Any) -> None:
        """Store a completed ``received_payment`` transaction for ``account_id``.

        Nothing is stored when the account has ``disable_background_tasks``
        set. Extra keyword arguments are forwarded to ``store.create``.
        """
        if self.store.find(Account, id=account_id).disable_background_tasks:
            self.logger.debug("account(%s) bg tasks disabled, ignore %s", account_id, event)
            return
        self.logger.info("account(id=%s) receives payment amount %s", account_id, event.data.amount.amount)
        self.store.create(
            Transaction,
            account_id=account_id,
            currency=event.data.amount.currency,
            amount=event.data.amount.amount,
            diem_transaction_version=event.transaction_version,
            status=Transaction.Status.completed,
            type=Transaction.Type.received_payment,
            **kwargs,
        )
Classes
class EventPuller (client: Client, store: InMemoryStore, hrp: str, logger: logging.Logger, state: Dict[str, int] = <factory>)
-
EventPuller(client: diem.jsonrpc.client.Client, store: diem.testing.miniwallet.app.store.InMemoryStore, hrp: str, logger: logging.Logger, state: Dict[str, int] = <factory>)
Expand source code
@dataclass
class EventPuller:
    """Pull received events from the chain and record them as transactions.

    ``state`` maps each tracked received-events key to the sequence number
    that is passed to ``client.get_events`` on the next ``fetch``.
    """

    client: jsonrpc.Client
    store: InMemoryStore
    hrp: str
    logger: logging.Logger
    state: Dict[str, int] = field(default_factory=dict)

    def add(self, address: diem_types.AccountAddress) -> None:
        """Start tracking the received-events key of the account at ``address``.

        The cursor for that key is set to 0, so calling this again for an
        already tracked account rewinds its cursor to the first event.
        """
        account = self.client.must_get_account(address)
        self.state[account.received_events_key] = 0

    def fetch(self, process: Callable[[jsonrpc.Event], None], batch_size: int = 100) -> None:
        """Fetch one batch of events per tracked key and hand each to ``process``.

        Args:
            process: callback invoked once per fetched event, in the order
                returned by the client.
            batch_size: forwarded as the third argument of
                ``client.get_events`` (presumably the page size limit).

        The cursor of a key is advanced right after each event is processed,
        so an exception raised by ``process`` leaves the cursor pointing at
        the event that failed.
        """
        # Iterate over a snapshot: ``process`` may register new keys through
        # ``add``, and growing a dict while iterating its live view raises
        # ``RuntimeError: dictionary changed size during iteration``.
        for key, seq in list(self.state.items()):
            events = self.client.get_events(key, seq, batch_size)
            if not events:
                continue
            for event in events:
                process(event)
                self.state[key] = event.sequence_number + 1

    def head(self) -> None:
        """Advance every cursor past all existing events without processing them.

        Repeats ``fetch`` with a no-op callback until a pass leaves ``state``
        unchanged.
        """
        state = None
        while state != self.state:
            state = copy.copy(self.state)
            self.fetch(lambda _: None)

    def save_payment_txn(self, event: jsonrpc.Event) -> None:
        """Record ``event`` as a transaction, falling back to the pending inbound account.

        When handling raises ``NotFoundError`` or ``ValueError`` the failure is
        logged and the event is recorded on ``PENDING_INBOUND_ACCOUNT_ID``.
        """
        self.logger.info("processing Event:\n%s", event)
        try:
            self._save_payment_txn(event)
        except (NotFoundError, ValueError):
            self.logger.exception("process event failed")
            self._create_txn(PENDING_INBOUND_ACCOUNT_ID, event)

    def _save_payment_txn(self, event: jsonrpc.Event) -> None:
        """Dispatch on the decoded event metadata and record the matching transaction.

        - general metadata: look up the ``to_subaddress``; unknown subaddresses
          are refunded with ``RefundReason.invalid_subaddress``.
        - travel rule metadata: look up the payment command by off-chain
          reference id; unknown ids are refunded with ``RefundReason.other``.
        - refund metadata: credit the account resolved by
          ``_find_refund_account_id``.

        Raises:
            ValueError: the metadata is none of the three handled types.
        """
        metadata = txnmetadata.decode_structure(event.data.metadata)
        if isinstance(metadata, diem_types.GeneralMetadataV0):
            subaddress = utils.hex(metadata.to_subaddress)
            try:
                res = self.store.find(Subaddress, subaddress_hex=subaddress)
            except NotFoundError:
                self.logger.exception("invalid general metadata to_subaddress")
                return self._refund(RefundReason.invalid_subaddress, event)
            return self._create_txn(res.account_id, event, subaddress_hex=res.subaddress_hex)
        elif isinstance(metadata, diem_types.TravelRuleMetadataV0):
            try:
                cmd = self.store.find(PaymentCommand, reference_id=metadata.off_chain_reference_id)
            except NotFoundError:
                self.logger.exception("invalid travel rule metadata off-chain reference id")
                return self._refund(RefundReason.other, event)
            return self._create_txn(cmd.account_id, event, reference_id=cmd.reference_id)
        elif isinstance(metadata, diem_types.RefundMetadataV0):
            version = int(metadata.transaction_version)
            reason = RefundReason.from_diem_type(metadata.reason)
            account_id = self._find_refund_account_id(version)
            return self._create_txn(account_id, event, refund_diem_txn_version=version, refund_reason=reason)
        raise ValueError("unrecognized metadata: %r" % event.data.metadata)

    def _find_refund_account_id(self, version: int) -> str:
        """Return the id of the account that sent the transaction being refunded.

        The original transaction is fetched by ``version`` and its general
        metadata ``from_subaddress`` is looked up in the store. Every failure
        (transaction not found, other metadata type, unknown subaddress) is
        logged and resolves to ``PENDING_INBOUND_ACCOUNT_ID``.
        """
        diem_txns = self.client.get_transactions(version, 1)
        if diem_txns:
            diem_txn = diem_txns[0]
            # Diagnostic dump on the normal path; kept out of the error log.
            self.logger.debug("diem_txn script: %r", diem_txn.transaction.script)
            original_metadata = txnmetadata.decode_structure(diem_txn.transaction.script.metadata)
            if isinstance(original_metadata, diem_types.GeneralMetadataV0):
                original_sender = utils.hex(original_metadata.from_subaddress)
                try:
                    return self.store.find(Subaddress, subaddress_hex=original_sender).account_id
                except NotFoundError:
                    self.logger.exception("invalid original transaction metadata from_subaddress")
            else:
                self.logger.error("invalid original txn metadata %r", diem_txn.transaction.script.metadata)
        else:
            self.logger.error("could not find diem txn by version: %s", version)
        return PENDING_INBOUND_ACCOUNT_ID

    def _refund(self, reason: RefundReason, event: jsonrpc.Event) -> None:
        """Record ``event`` on the pending inbound account and queue a refund for it.

        The refund is a pending ``sent_payment`` transaction for the same
        currency and amount, addressed to the event sender and tagged with the
        event's transaction version and ``reason``.
        """
        self._create_txn(PENDING_INBOUND_ACCOUNT_ID, event)
        # None is passed where encode_account takes its second argument
        # (presumably the subaddress) -- confirm against the identifier module.
        payee = identifier.encode_account(event.data.sender, None, self.hrp)
        self.store.create(
            Transaction,
            account_id=PENDING_INBOUND_ACCOUNT_ID,
            status=Transaction.Status.pending,
            type=Transaction.Type.sent_payment,
            currency=event.data.amount.currency,
            amount=event.data.amount.amount,
            payee=payee,
            refund_diem_txn_version=event.transaction_version,
            refund_reason=reason,
        )

    def _create_txn(self, account_id: str, event: jsonrpc.Event, **kwargs: Any) -> None:
        """Store a completed ``received_payment`` transaction for ``account_id``.

        Nothing is stored when the account has ``disable_background_tasks``
        set. Extra keyword arguments are forwarded to ``store.create``.
        """
        if self.store.find(Account, id=account_id).disable_background_tasks:
            self.logger.debug("account(%s) bg tasks disabled, ignore %s", account_id, event)
            return
        self.logger.info("account(id=%s) receives payment amount %s", account_id, event.data.amount.amount)
        self.store.create(
            Transaction,
            account_id=account_id,
            currency=event.data.amount.currency,
            amount=event.data.amount.amount,
            diem_transaction_version=event.transaction_version,
            status=Transaction.Status.completed,
            type=Transaction.Type.received_payment,
            **kwargs,
        )
Class variables
var client : Client
var hrp : str
var logger : logging.Logger
var state : Dict[str, int]
var store : InMemoryStore
Methods
def add(self, address: AccountAddress) ‑> NoneType
-
Expand source code
def add(self, address: diem_types.AccountAddress) -> None:
    """Start tracking the received-events key of the account at ``address``.

    The cursor stored in ``self.state`` for that key is set to 0.
    """
    received_key = self.client.must_get_account(address).received_events_key
    self.state[received_key] = 0
def fetch(self, process: Callable[[jsonrpc_pb2.Event], NoneType], batch_size: int = 100) ‑> NoneType
-
Expand source code
def fetch(self, process: Callable[[jsonrpc.Event], None], batch_size: int = 100) -> None:
    """Fetch one batch of events per tracked key and hand each to ``process``.

    Args:
        process: callback invoked once per fetched event, in the order
            returned by the client.
        batch_size: forwarded as the third argument of
            ``client.get_events`` (presumably the page size limit).

    The cursor of a key is advanced right after each event is processed, so
    an exception raised by ``process`` leaves the cursor pointing at the
    event that failed.
    """
    # Iterate over a snapshot: ``process`` may add new keys to ``self.state``,
    # and growing a dict while iterating its live view raises
    # ``RuntimeError: dictionary changed size during iteration``.
    for key, seq in list(self.state.items()):
        events = self.client.get_events(key, seq, batch_size)
        if not events:
            continue
        for event in events:
            process(event)
            self.state[key] = event.sequence_number + 1
def head(self) ‑> NoneType
-
Expand source code
def head(self) -> None:
    """Advance every cursor past all existing events without processing them.

    Calls ``fetch`` with a no-op callback until a pass leaves ``self.state``
    unchanged.
    """
    while True:
        before = copy.copy(self.state)
        self.fetch(lambda _: None)
        if before == self.state:
            return
def save_payment_txn(self, event: jsonrpc_pb2.Event) ‑> NoneType
-
Expand source code
def save_payment_txn(self, event: jsonrpc.Event) -> None:
    """Record ``event`` as a transaction, falling back to the pending inbound account.

    When ``_save_payment_txn`` raises ``NotFoundError`` or ``ValueError`` the
    failure is logged and the event is recorded on
    ``PENDING_INBOUND_ACCOUNT_ID`` instead.
    """
    log = self.logger
    log.info("processing Event:\n%s", event)
    try:
        self._save_payment_txn(event)
    except (NotFoundError, ValueError):
        log.exception("process event failed")
        self._create_txn(PENDING_INBOUND_ACCOUNT_ID, event)