Module examples.p2p_transfer

Expand source code
# Copyright (c) The Diem Core Contributors
# SPDX-License-Identifier: Apache-2.0

import time

from diem import (
    identifier,
    diem_types,
    stdlib,
    testnet,
    txnmetadata,
    utils,
)

from .stubs import CustodialApp


def test_non_custodial_to_non_custodial():
    client = testnet.create_client()
    faucet = testnet.Faucet(client)

    sender = faucet.gen_account()
    receiver = faucet.gen_account()

    amount = 1_000_000

    script = stdlib.encode_peer_to_peer_with_metadata_script(
        currency=utils.currency_code(testnet.TEST_CURRENCY_CODE),
        payee=receiver.account_address,
        amount=amount,
        metadata=b"",  # no requirement for metadata and metadata signature
        metadata_signature=b"",
    )
    seq_num = client.get_account_sequence(sender.account_address)
    txn = create_transaction(sender, seq_num, script, testnet.TEST_CURRENCY_CODE)

    signed_txn = sender.sign(txn)
    client.submit(signed_txn)
    executed_txn = client.wait_for_transaction(signed_txn)
    assert executed_txn is not None


def test_non_custodial_to_custodial():
    client = testnet.create_client()
    faucet = testnet.Faucet(client)

    sender = faucet.gen_account()
    receiver_custodial = CustodialApp.create(faucet.gen_account(), client)

    intent_id = receiver_custodial.payment(user_id=0, amount=1_000_000)

    intent = identifier.decode_intent(intent_id, identifier.TDM)

    script = stdlib.encode_peer_to_peer_with_metadata_script(
        currency=utils.currency_code(intent.currency_code),
        payee=utils.account_address(intent.account_address),
        amount=intent.amount,
        metadata=txnmetadata.general_metadata(None, intent.sub_address),
        metadata_signature=b"",  # only travel rule metadata requires signature
    )
    seq_num = client.get_account_sequence(sender.account_address)
    txn = create_transaction(sender, seq_num, script, intent.currency_code)

    signed_txn = sender.sign(txn)
    client.submit(signed_txn)
    executed_txn = client.wait_for_transaction(signed_txn)
    assert executed_txn is not None


def test_custodial_to_non_custodial():
    client = testnet.create_client()
    faucet = testnet.Faucet(client)

    sender_custodial = CustodialApp.create(faucet.gen_account(), client)
    receiver = faucet.gen_account()

    amount = 1_000_000
    currency_code = testnet.TEST_CURRENCY_CODE

    script = stdlib.encode_peer_to_peer_with_metadata_script(
        currency=utils.currency_code(currency_code),
        payee=utils.account_address(receiver.account_address),
        amount=amount,
        metadata=txnmetadata.general_metadata(sender_custodial.find_user_sub_address_by_id(0), None),
        metadata_signature=b"",  # only travel rule metadata requires signature
    )

    sender = sender_custodial.available_child_vasp()
    seq_num = client.get_account_sequence(sender.account_address)
    txn = create_transaction(sender, seq_num, script, currency_code)

    signed_txn = sender.sign(txn)
    client.submit(signed_txn)
    executed_txn = client.wait_for_transaction(signed_txn)
    assert executed_txn is not None


def test_custodial_to_custodial_under_threshold():
    client = testnet.create_client()
    faucet = testnet.Faucet(client)

    sender_custodial = CustodialApp.create(faucet.gen_account(), client)
    receiver_custodial = CustodialApp.create(faucet.gen_account(), client)

    intent_id = receiver_custodial.payment(user_id=0, amount=1_000_000)

    intent = identifier.decode_intent(intent_id, identifier.TDM)

    script = stdlib.encode_peer_to_peer_with_metadata_script(
        currency=utils.currency_code(intent.currency_code),
        payee=utils.account_address(intent.account_address),
        amount=intent.amount,
        metadata=txnmetadata.general_metadata(sender_custodial.find_user_sub_address_by_id(0), intent.sub_address),
        metadata_signature=b"",  # only travel rule metadata requires signature
    )

    sender = sender_custodial.available_child_vasp()
    seq_num = client.get_account_sequence(sender.account_address)
    txn = create_transaction(sender, seq_num, script, intent.currency_code)

    signed_txn = sender.sign(txn)
    client.submit(signed_txn)
    executed_txn = client.wait_for_transaction(signed_txn)
    assert executed_txn is not None


def test_custodial_to_custodial_above_threshold():
    client = testnet.create_client()
    faucet = testnet.Faucet(client)

    sender_custodial = CustodialApp.create(faucet.gen_account(), client)
    receiver_custodial = CustodialApp.create(faucet.gen_account(), client)
    receiver_custodial.init_compliance_keys()

    intent_id = receiver_custodial.payment(user_id=0, amount=2_000_000_000)

    intent = identifier.decode_intent(intent_id, identifier.TDM)

    sender = sender_custodial.available_child_vasp()
    # sender & receiver communicate by off chain APIs
    off_chain_reference_id = "32323abc"
    metadata, metadata_signing_msg = txnmetadata.travel_rule(
        off_chain_reference_id, sender.account_address, intent.amount
    )
    metadata_signature = receiver_custodial.compliance_key.sign(metadata_signing_msg)

    # sender constructs transaction after off chain communication
    script = stdlib.encode_peer_to_peer_with_metadata_script(
        currency=utils.currency_code(intent.currency_code),
        payee=utils.account_address(intent.account_address),
        amount=intent.amount,
        metadata=metadata,
        metadata_signature=metadata_signature,
    )

    seq_num = client.get_account_sequence(sender.account_address)
    txn = create_transaction(sender, seq_num, script, intent.currency_code)

    signed_txn = sender.sign(txn)
    client.submit(signed_txn)
    executed_txn = client.wait_for_transaction(signed_txn)
    assert executed_txn is not None


def create_transaction(sender, sender_account_sequence, script, currency):
    return diem_types.RawTransaction(
        sender=sender.account_address,
        sequence_number=sender_account_sequence,
        payload=diem_types.TransactionPayload__Script(script),
        max_gas_amount=1_000_000,
        gas_unit_price=0,
        gas_currency_code=currency,
        expiration_timestamp_secs=int(time.time()) + 30,
        chain_id=testnet.CHAIN_ID,
    )

Functions

def create_transaction(sender, sender_account_sequence, script, currency)
Expand source code
def create_transaction(sender, sender_account_sequence, script, currency):
    return diem_types.RawTransaction(
        sender=sender.account_address,
        sequence_number=sender_account_sequence,
        payload=diem_types.TransactionPayload__Script(script),
        max_gas_amount=1_000_000,
        gas_unit_price=0,
        gas_currency_code=currency,
        expiration_timestamp_secs=int(time.time()) + 30,
        chain_id=testnet.CHAIN_ID,
    )
def test_custodial_to_custodial_above_threshold()
Expand source code
def test_custodial_to_custodial_above_threshold():
    client = testnet.create_client()
    faucet = testnet.Faucet(client)

    sender_custodial = CustodialApp.create(faucet.gen_account(), client)
    receiver_custodial = CustodialApp.create(faucet.gen_account(), client)
    receiver_custodial.init_compliance_keys()

    intent_id = receiver_custodial.payment(user_id=0, amount=2_000_000_000)

    intent = identifier.decode_intent(intent_id, identifier.TDM)

    sender = sender_custodial.available_child_vasp()
    # sender & receiver communicate by off chain APIs
    off_chain_reference_id = "32323abc"
    metadata, metadata_signing_msg = txnmetadata.travel_rule(
        off_chain_reference_id, sender.account_address, intent.amount
    )
    metadata_signature = receiver_custodial.compliance_key.sign(metadata_signing_msg)

    # sender constructs transaction after off chain communication
    script = stdlib.encode_peer_to_peer_with_metadata_script(
        currency=utils.currency_code(intent.currency_code),
        payee=utils.account_address(intent.account_address),
        amount=intent.amount,
        metadata=metadata,
        metadata_signature=metadata_signature,
    )

    seq_num = client.get_account_sequence(sender.account_address)
    txn = create_transaction(sender, seq_num, script, intent.currency_code)

    signed_txn = sender.sign(txn)
    client.submit(signed_txn)
    executed_txn = client.wait_for_transaction(signed_txn)
    assert executed_txn is not None
def test_custodial_to_custodial_under_threshold()
Expand source code
def test_custodial_to_custodial_under_threshold():
    client = testnet.create_client()
    faucet = testnet.Faucet(client)

    sender_custodial = CustodialApp.create(faucet.gen_account(), client)
    receiver_custodial = CustodialApp.create(faucet.gen_account(), client)

    intent_id = receiver_custodial.payment(user_id=0, amount=1_000_000)

    intent = identifier.decode_intent(intent_id, identifier.TDM)

    script = stdlib.encode_peer_to_peer_with_metadata_script(
        currency=utils.currency_code(intent.currency_code),
        payee=utils.account_address(intent.account_address),
        amount=intent.amount,
        metadata=txnmetadata.general_metadata(sender_custodial.find_user_sub_address_by_id(0), intent.sub_address),
        metadata_signature=b"",  # only travel rule metadata requires signature
    )

    sender = sender_custodial.available_child_vasp()
    seq_num = client.get_account_sequence(sender.account_address)
    txn = create_transaction(sender, seq_num, script, intent.currency_code)

    signed_txn = sender.sign(txn)
    client.submit(signed_txn)
    executed_txn = client.wait_for_transaction(signed_txn)
    assert executed_txn is not None
def test_custodial_to_non_custodial()
Expand source code
def test_custodial_to_non_custodial():
    client = testnet.create_client()
    faucet = testnet.Faucet(client)

    sender_custodial = CustodialApp.create(faucet.gen_account(), client)
    receiver = faucet.gen_account()

    amount = 1_000_000
    currency_code = testnet.TEST_CURRENCY_CODE

    script = stdlib.encode_peer_to_peer_with_metadata_script(
        currency=utils.currency_code(currency_code),
        payee=utils.account_address(receiver.account_address),
        amount=amount,
        metadata=txnmetadata.general_metadata(sender_custodial.find_user_sub_address_by_id(0), None),
        metadata_signature=b"",  # only travel rule metadata requires signature
    )

    sender = sender_custodial.available_child_vasp()
    seq_num = client.get_account_sequence(sender.account_address)
    txn = create_transaction(sender, seq_num, script, currency_code)

    signed_txn = sender.sign(txn)
    client.submit(signed_txn)
    executed_txn = client.wait_for_transaction(signed_txn)
    assert executed_txn is not None
def test_non_custodial_to_custodial()
Expand source code
def test_non_custodial_to_custodial():
    client = testnet.create_client()
    faucet = testnet.Faucet(client)

    sender = faucet.gen_account()
    receiver_custodial = CustodialApp.create(faucet.gen_account(), client)

    intent_id = receiver_custodial.payment(user_id=0, amount=1_000_000)

    intent = identifier.decode_intent(intent_id, identifier.TDM)

    script = stdlib.encode_peer_to_peer_with_metadata_script(
        currency=utils.currency_code(intent.currency_code),
        payee=utils.account_address(intent.account_address),
        amount=intent.amount,
        metadata=txnmetadata.general_metadata(None, intent.sub_address),
        metadata_signature=b"",  # only travel rule metadata requires signature
    )
    seq_num = client.get_account_sequence(sender.account_address)
    txn = create_transaction(sender, seq_num, script, intent.currency_code)

    signed_txn = sender.sign(txn)
    client.submit(signed_txn)
    executed_txn = client.wait_for_transaction(signed_txn)
    assert executed_txn is not None
def test_non_custodial_to_non_custodial()
Expand source code
def test_non_custodial_to_non_custodial():
    client = testnet.create_client()
    faucet = testnet.Faucet(client)

    sender = faucet.gen_account()
    receiver = faucet.gen_account()

    amount = 1_000_000

    script = stdlib.encode_peer_to_peer_with_metadata_script(
        currency=utils.currency_code(testnet.TEST_CURRENCY_CODE),
        payee=receiver.account_address,
        amount=amount,
        metadata=b"",  # no requirement for metadata and metadata signature
        metadata_signature=b"",
    )
    seq_num = client.get_account_sequence(sender.account_address)
    txn = create_transaction(sender, seq_num, script, testnet.TEST_CURRENCY_CODE)

    signed_txn = sender.sign(txn)
    client.submit(signed_txn)
    executed_txn = client.wait_for_transaction(signed_txn)
    assert executed_txn is not None