Module diem.identifier.bech32
Expand source code
# Copyright (c) The Diem Core Contributors
# SPDX-License-Identifier: Apache-2.0
######################################################################################
# Bech32 implementation for Diem human readable addresses based on
# Bitcoin's segwit python lib https://github.com/fiatjaf/bech32 modified to support the
# requirements of Diem (sub)address and versioning specs.
import typing
from .subaddress import DIEM_SUBADDRESS_SIZE, DIEM_ZERO_SUBADDRESS
# Bech32 constants
# 32-symbol alphabet; the index of a character in this string is its 5-bit value
_BECH32_CHARSET = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"
# character placed between the human readable part and the data part
_BECH32_SEPARATOR = "1"
# number of trailing characters of an encoded string that hold the checksum
_BECH32_CHECKSUM_CHAR_SIZE = 6
# DIEM constants
_DIEM_ADDRESS_SIZE = 16 # in bytes
# version value emitted as the first data character and required when decoding
_DIEM_BECH32_VERSION = 1
# accepted total lengths of an encoded address string; anything else is rejected
# before further parsing
# NOTE(review): presumably 50 is for a 3-character hrp and 49 for a 2-character
# one (hrp + separator + version + 39 data chars + 6 checksum chars, assuming an
# 8-byte subaddress) - confirm against the address spec
_DIEM_BECH32_SIZE = [50, 49] # in characters
class Bech32Error(Exception):
    """Error raised when a Diem Bech32 address cannot be encoded or decoded."""
def bech32_address_encode(hrp: str, address_bytes: bytes, subaddress_bytes: typing.Optional[bytes]) -> str:
    """Build the Bech32 string for a Diem account address and optional subaddress.

    Args:
        hrp: Bech32 human readable part, placed before the separator
        address_bytes: on-chain account address (16 bytes)
        subaddress_bytes: subaddress (8 bytes), or None to use the zero subaddress

    Returns:
        Bech32 encoded address

    Raises:
        Bech32Error: if the address or subaddress has the wrong length, or the
            conversion to 5-bit groups fails
    """
    address_len = len(address_bytes)
    if address_len != _DIEM_ADDRESS_SIZE:
        raise Bech32Error(f"Address size should be {_DIEM_ADDRESS_SIZE}, but got: {address_len}")

    if subaddress_bytes is None:
        # a missing subaddress is encoded as the zero subaddress
        subaddress = DIEM_ZERO_SUBADDRESS
    else:
        subaddress_len = len(subaddress_bytes)
        if subaddress_len != DIEM_SUBADDRESS_SIZE:
            raise Bech32Error(f"Subaddress size should be {DIEM_SUBADDRESS_SIZE}, but got: {subaddress_len}")
        subaddress = subaddress_bytes

    # regroup the concatenated bytes into 5-bit values, padding the last group
    payload = _convertbits(address_bytes + subaddress, 8, 5, True)
    if payload is None:
        raise Bech32Error("Error converting bytes to base32")

    # the version is the first data value, ahead of the address payload
    return _bech32_encode(hrp, [_DIEM_BECH32_VERSION] + payload)
def bech32_address_decode(expected_hrp: str, bech32: str) -> typing.Tuple[int, bytes, bytes]:
    """Validate a Diem Bech32 address string and split it into version, address and sub-address.

    Args:
        expected_hrp: expected Bech32 human readable part (lbr or tlb)
        bech32: Bech32 encoded address

    Returns:
        A tuple consisting of the Bech32 version (int), address (16 bytes), subaddress (8 bytes)

    Raises:
        Bech32Error: if the string fails any validation step (total length, mixed
            case, human readable part, separator, alphabet, version, checksum,
            or decoded payload size)
    """
    len_bech32 = len(bech32)
    len_hrp = len(expected_hrp)

    # check expected length
    if len_bech32 not in _DIEM_BECH32_SIZE:
        raise Bech32Error(f"Bech32 size should be {_DIEM_BECH32_SIZE}, but it is: {len_bech32}")

    # do not allow mixed case per BIP 173
    if bech32 != bech32.lower() and bech32 != bech32.upper():
        raise Bech32Error(f"Mixed case Bech32 addresses are not allowed, got: {bech32}")
    bech32 = bech32.lower()

    hrp = bech32[:len_hrp]
    if hrp != expected_hrp:
        raise Bech32Error(
            f"Wrong Diem address Bech32 human readable part (prefix): expect {expected_hrp} but got {hrp}"
        )

    # The positional lookups below need a separator and a version character
    # after the prefix. Without this guard an expected_hrp that reaches the end
    # of the string would surface as IndexError instead of Bech32Error.
    if len(bech32) < len_hrp + 2:
        raise Bech32Error(f"Bech32 address is too short for human readable part: {expected_hrp}")

    # check separator
    if bech32[len_hrp] != _BECH32_SEPARATOR:
        raise Bech32Error(f"Non-expected Bech32 separator: {bech32[len_hrp]}")

    # check characters after separator in Bech32 alphabet
    if not all(x in _BECH32_CHARSET for x in bech32[len_hrp + 1 :]):
        raise Bech32Error(f"Invalid Bech32 characters detected: {bech32}")

    # version is defined by the index of the Bech32 character after separator
    address_version = _BECH32_CHARSET.find(bech32[len_hrp + 1])
    # check valid version
    if address_version != _DIEM_BECH32_VERSION:
        raise Bech32Error(f"Version mismatch. Expected {_DIEM_BECH32_VERSION}, but received {address_version}")

    # we've already checked that all characters are in the correct alphabet,
    # thus, this will always succeed
    data = [_BECH32_CHARSET.find(x) for x in bech32[len_hrp + 2 :]]

    # check Bech32 checksum (computed over the version value plus the remaining data)
    if not _bech32_verify_checksum(hrp, [address_version] + data):
        raise Bech32Error(f"Bech32 checksum validation failed: {bech32}")

    # drop the trailing checksum characters and regroup 5-bit values into bytes
    decoded_data = _convertbits(data[:-_BECH32_CHECKSUM_CHAR_SIZE], 5, 8, False)
    # check base conversion
    if decoded_data is None:
        raise Bech32Error("Error converting bytes from base32")

    length_data = len(decoded_data)
    # extra check about the expected output (sub)address size in bytes
    if length_data != _DIEM_ADDRESS_SIZE + DIEM_SUBADDRESS_SIZE:
        raise Bech32Error(
            f"Expected {_DIEM_ADDRESS_SIZE + DIEM_SUBADDRESS_SIZE} bytes after decoding, but got: {length_data}"
        )

    return (
        address_version,
        bytes(decoded_data[:_DIEM_ADDRESS_SIZE]),
        bytes(decoded_data[-DIEM_SUBADDRESS_SIZE:]),
    )
def _bech32_polymod(values: typing.Iterable[int]) -> int:
    """Fold a sequence of 5-bit values into the 30-bit Bech32 checksum state.

    The state starts at 1. For each value, the top five bits of the state are
    shifted out and each set bit among them XORs the matching generator
    constant back into the state.
    """
    generators = (0x3B6A57B2, 0x26508E6D, 0x1EA119FA, 0x3D4233DD, 0x2A1462B3)
    state = 1
    for symbol in values:
        carry = state >> 25
        # keep the low 25 bits, make room for 5 more, and mix in the new value
        state = ((state & 0x1FFFFFF) << 5) ^ symbol
        for bit, generator in enumerate(generators):
            if (carry >> bit) & 1:
                state ^= generator
    return state
def _bech32_hrp_expand(hrp: str) -> typing.List[int]:
    """Turn the HRP into the value sequence fed to the checksum.

    The result is the high bits (code point >> 5) of every character, a single
    0, then the low five bits of every character.
    """
    code_points = [ord(char) for char in hrp]
    high_bits = [code >> 5 for code in code_points]
    low_bits = [code & 31 for code in code_points]
    return [*high_bits, 0, *low_bits]
def _bech32_verify_checksum(hrp: str, data: typing.Iterable[int]) -> bool:
    """Return True when the checksum carried at the end of ``data`` is valid for ``hrp``."""
    checksum_input = _bech32_hrp_expand(hrp)
    checksum_input.extend(data)
    # a valid string leaves the checksum state at exactly 1
    return _bech32_polymod(checksum_input) == 1
def _bech32_create_checksum(hrp: str, data: typing.Iterable[int]) -> typing.List[int]:
    """Return the six 5-bit checksum values for ``hrp`` and ``data``, most significant first."""
    checksum_input = _bech32_hrp_expand(hrp)
    checksum_input.extend(data)
    # six zero placeholders stand in for the checksum that is being computed
    checksum_input.extend([0] * 6)
    residue = _bech32_polymod(checksum_input) ^ 1
    # slice the 30-bit residue into 5-bit groups, from the top bits down
    return [(residue >> shift) & 31 for shift in range(25, -1, -5)]
def _bech32_encode(hrp: str, data: typing.Iterable[int]) -> str:
    """Compute a Bech32 string given HRP and data values.

    Args:
        hrp: human readable part, emitted unchanged before the separator
        data: 5-bit values used as indices into the Bech32 alphabet; any
            iterable is accepted, including single-pass iterators

    Returns:
        The HRP, the separator, then one alphabet character per data value
        followed by the checksum characters.
    """
    # Materialise once: ``data`` is needed both for the checksum and for the
    # output characters, and a single-pass iterator would be exhausted after
    # the first use, yielding a checksum computed over no data.
    values = list(data)
    combined = values + _bech32_create_checksum(hrp, values)
    return hrp + _BECH32_SEPARATOR + "".join(_BECH32_CHARSET[d] for d in combined)
def _convertbits(
    data: typing.Iterable[int], from_bits: int, to_bits: int, pad: bool
) -> typing.Optional[typing.List[int]]:
    """Regroup a stream of ``from_bits``-wide values into ``to_bits``-wide values.

    Args:
        data: input values, each expected to fit in ``from_bits`` bits
        from_bits: width of every input value
        to_bits: width of every output value
        pad: when True, leftover bits are emitted as a final zero-padded value;
            when False, leftover bits must be fewer than ``from_bits`` and all zero

    Returns:
        The regrouped values, or None if an input value is out of range or
        (with ``pad`` False) the leftover bits are invalid.
    """
    accumulator = 0
    pending_bits = 0
    output = []
    out_mask = (1 << to_bits) - 1
    # the accumulator never needs more than from_bits + to_bits - 1 bits
    acc_mask = (1 << (from_bits + to_bits - 1)) - 1

    for value in data:
        if value < 0 or value >> from_bits:
            return None
        accumulator = ((accumulator << from_bits) | value) & acc_mask
        pending_bits += from_bits
        while pending_bits >= to_bits:
            pending_bits -= to_bits
            output.append((accumulator >> pending_bits) & out_mask)

    # bits still pending, left-aligned into one output-width value
    leftover = (accumulator << (to_bits - pending_bits)) & out_mask
    if pad:
        if pending_bits:
            output.append(leftover)
        return output
    if pending_bits >= from_bits or leftover:
        return None
    return output
Functions
def bech32_address_decode(expected_hrp: str, bech32: str) ‑> Tuple[int, bytes, bytes]
-
Validate a Diem Bech32 address string and split it into version, address and sub-address.
Args
expected_hrp
- expected Bech32 human readable part (lbr or tlb)
bech32
- Bech32 encoded address
Returns
A tuple consisting of the Bech32 version (int), address (16 bytes), subaddress (8 bytes)
Expand source code
def bech32_address_decode(expected_hrp: str, bech32: str) -> typing.Tuple[int, bytes, bytes]:
    """Validate a Diem Bech32 address string and split it into version, address and sub-address.

    Args:
        expected_hrp: expected Bech32 human readable part (lbr or tlb)
        bech32: Bech32 encoded address

    Returns:
        A tuple consisting of the Bech32 version (int), address (16 bytes), subaddress (8 bytes)

    Raises:
        Bech32Error: if the string fails any validation step (total length, mixed
            case, human readable part, separator, alphabet, version, checksum,
            or decoded payload size)
    """
    len_bech32 = len(bech32)
    len_hrp = len(expected_hrp)

    # check expected length
    if len_bech32 not in _DIEM_BECH32_SIZE:
        raise Bech32Error(f"Bech32 size should be {_DIEM_BECH32_SIZE}, but it is: {len_bech32}")

    # do not allow mixed case per BIP 173
    if bech32 != bech32.lower() and bech32 != bech32.upper():
        raise Bech32Error(f"Mixed case Bech32 addresses are not allowed, got: {bech32}")
    bech32 = bech32.lower()

    hrp = bech32[:len_hrp]
    if hrp != expected_hrp:
        raise Bech32Error(
            f"Wrong Diem address Bech32 human readable part (prefix): expect {expected_hrp} but got {hrp}"
        )

    # The positional lookups below need a separator and a version character
    # after the prefix. Without this guard an expected_hrp that reaches the end
    # of the string would surface as IndexError instead of Bech32Error.
    if len(bech32) < len_hrp + 2:
        raise Bech32Error(f"Bech32 address is too short for human readable part: {expected_hrp}")

    # check separator
    if bech32[len_hrp] != _BECH32_SEPARATOR:
        raise Bech32Error(f"Non-expected Bech32 separator: {bech32[len_hrp]}")

    # check characters after separator in Bech32 alphabet
    if not all(x in _BECH32_CHARSET for x in bech32[len_hrp + 1 :]):
        raise Bech32Error(f"Invalid Bech32 characters detected: {bech32}")

    # version is defined by the index of the Bech32 character after separator
    address_version = _BECH32_CHARSET.find(bech32[len_hrp + 1])
    # check valid version
    if address_version != _DIEM_BECH32_VERSION:
        raise Bech32Error(f"Version mismatch. Expected {_DIEM_BECH32_VERSION}, but received {address_version}")

    # we've already checked that all characters are in the correct alphabet,
    # thus, this will always succeed
    data = [_BECH32_CHARSET.find(x) for x in bech32[len_hrp + 2 :]]

    # check Bech32 checksum (computed over the version value plus the remaining data)
    if not _bech32_verify_checksum(hrp, [address_version] + data):
        raise Bech32Error(f"Bech32 checksum validation failed: {bech32}")

    # drop the trailing checksum characters and regroup 5-bit values into bytes
    decoded_data = _convertbits(data[:-_BECH32_CHECKSUM_CHAR_SIZE], 5, 8, False)
    # check base conversion
    if decoded_data is None:
        raise Bech32Error("Error converting bytes from base32")

    length_data = len(decoded_data)
    # extra check about the expected output (sub)address size in bytes
    if length_data != _DIEM_ADDRESS_SIZE + DIEM_SUBADDRESS_SIZE:
        raise Bech32Error(
            f"Expected {_DIEM_ADDRESS_SIZE + DIEM_SUBADDRESS_SIZE} bytes after decoding, but got: {length_data}"
        )

    return (
        address_version,
        bytes(decoded_data[:_DIEM_ADDRESS_SIZE]),
        bytes(decoded_data[-DIEM_SUBADDRESS_SIZE:]),
    )
def bech32_address_encode(hrp: str, address_bytes: bytes, subaddress_bytes: Optional[bytes]) ‑> str
-
Encode a Diem address (and sub-address if provided).
Args
hrp
- Bech32 human readable part
address_bytes
- on-chain account address (16 bytes)
subaddress_bytes
- subaddress (8 bytes). If not provided, it is set to 8 zero bytes
Returns
Bech32 encoded address
Expand source code
def bech32_address_encode(hrp: str, address_bytes: bytes, subaddress_bytes: typing.Optional[bytes]) -> str:
    """Build the Bech32 string for a Diem account address and optional subaddress.

    Args:
        hrp: Bech32 human readable part, placed before the separator
        address_bytes: on-chain account address (16 bytes)
        subaddress_bytes: subaddress (8 bytes), or None to use the zero subaddress

    Returns:
        Bech32 encoded address

    Raises:
        Bech32Error: if the address or subaddress has the wrong length, or the
            conversion to 5-bit groups fails
    """
    address_len = len(address_bytes)
    if address_len != _DIEM_ADDRESS_SIZE:
        raise Bech32Error(f"Address size should be {_DIEM_ADDRESS_SIZE}, but got: {address_len}")

    if subaddress_bytes is None:
        # a missing subaddress is encoded as the zero subaddress
        subaddress = DIEM_ZERO_SUBADDRESS
    else:
        subaddress_len = len(subaddress_bytes)
        if subaddress_len != DIEM_SUBADDRESS_SIZE:
            raise Bech32Error(f"Subaddress size should be {DIEM_SUBADDRESS_SIZE}, but got: {subaddress_len}")
        subaddress = subaddress_bytes

    # regroup the concatenated bytes into 5-bit values, padding the last group
    payload = _convertbits(address_bytes + subaddress, 8, 5, True)
    if payload is None:
        raise Bech32Error("Error converting bytes to base32")

    # the version is the first data value, ahead of the address payload
    return _bech32_encode(hrp, [_DIEM_BECH32_VERSION] + payload)
Classes
class Bech32Error (*args, **kwargs)
-
Represents an error when creating a Diem address.
Expand source code
class Bech32Error(Exception):
    """Error raised when a Diem Bech32 address cannot be encoded or decoded."""
Ancestors
- builtins.Exception
- builtins.BaseException