Module diem.bcs
Expand source code
# Copyright (c) Facebook, Inc. and its affiliates
# SPDX-License-Identifier: MIT OR Apache-2.0
import dataclasses
import collections
import io
import typing
from copy import copy
from typing import get_type_hints
from diem import serde_types as st
from diem import serde_binary as sb
MAX_LENGTH = (1 << 31) - 1
MAX_U32 = (1 << 32) - 1
MAX_CONTAINER_DEPTH = 500
class BcsSerializer(sb.BinarySerializer):
    """Binary serializer that writes into an in-memory ``io.BytesIO`` buffer.

    Lengths and variant indices are emitted as ULEB128-encoded unsigned
    32-bit integers. Map entries are reordered by the lexicographic order of
    their already-serialized bytes.
    """

    def __init__(self):
        super().__init__(output=io.BytesIO(), container_depth_budget=MAX_CONTAINER_DEPTH)

    def serialize_u32_as_uleb128(self, value: int):
        """Append ``value`` to the output as a ULEB128-encoded uint32.

        Args:
            value: integer in the inclusive range ``[0, MAX_U32]``.

        Raises:
            st.SerializationError: if ``value`` is negative or above ``MAX_U32``.
        """
        # Reject out-of-range input up front: a negative value would otherwise
        # surface as a raw OverflowError, and a value above u32 would produce
        # bytes that deserialize_uleb128_as_u32 refuses to read back.
        if not 0 <= value <= MAX_U32:
            raise st.SerializationError("Value is out of range for a uleb128-encoded uint32.")
        encoded = bytearray()
        while value >= 0x80:
            # Low 7 bits as payload, high bit set to signal "more bytes follow".
            encoded.append((value & 0x7F) | 0x80)
            value >>= 7
        encoded.append(value)
        self.output.write(encoded)

    def serialize_len(self, value: int):
        """Write a length prefix, refusing values above ``MAX_LENGTH``.

        Raises:
            st.SerializationError: if ``value`` exceeds ``MAX_LENGTH``.
        """
        if value > MAX_LENGTH:
            raise st.SerializationError("Length exceeds the maximum supported value.")
        self.serialize_u32_as_uleb128(value)

    def serialize_variant_index(self, value: int):
        """Write a variant index, refusing values above ``MAX_U32``.

        Raises:
            st.SerializationError: if ``value`` exceeds ``MAX_U32``.
        """
        if value > MAX_U32:
            raise st.SerializationError("Variant index exceeds the maximum supported value.")
        self.serialize_u32_as_uleb128(value)

    def sort_map_entries(self, offsets: typing.List[int]):
        """Sort the output chunks that start at each of ``offsets``.

        Each chunk runs from one offset to the next; the last one runs to the
        current end of the output buffer. The chunks are sorted as byte
        strings and written back starting at ``offsets[0]``.

        Args:
            offsets: start positions of the chunks in the output buffer. The
                list is not modified.
        """
        if not offsets:
            return
        # The context manager releases the view even if slicing fails; a
        # lingering export would prevent the BytesIO from being resized later.
        with self.output.getbuffer() as buf:
            end = len(buf)
            bounds = [*offsets, end]
            chunks = sorted(bytes(buf[lo:hi]) for lo, hi in zip(bounds, bounds[1:]))
        self.output.seek(offsets[0])
        self.output.write(b"".join(chunks))
        # Sanity check: rewriting the chunks must end exactly where the buffer
        # ended before (checked via the stream position, without a new view).
        assert self.output.tell() == end
class BcsDeserializer(sb.BinaryDeserializer):
    """Binary deserializer reading from an ``io.BytesIO`` built over ``content``.

    Lengths and variant indices are read as ULEB128-encoded unsigned 32-bit
    integers, and serialized map keys must be strictly increasing.
    """

    def __init__(self, content):
        stream = io.BytesIO(content)
        super().__init__(input=stream, container_depth_budget=MAX_CONTAINER_DEPTH)

    def deserialize_uleb128_as_u32(self) -> int:
        """Read one ULEB128-encoded uint32 from the input.

        Raises:
            st.DeserializationError: if the value does not fit in 32 bits, if
                more than five bytes are used, or if a multi-byte encoding
                ends with a zero byte.
        """
        accumulated = 0
        shift = 0
        while shift < 32:
            raw = int.from_bytes(self.read(1), "little", signed=False)
            payload = raw & 0x7F
            accumulated |= payload << shift
            if accumulated > MAX_U32:
                raise st.DeserializationError("Overflow while parsing uleb128-encoded uint32 value")
            # The byte equals its low 7 bits exactly when the continuation bit is clear.
            is_final_byte = payload == raw
            if is_final_byte:
                if payload == 0 and shift > 0:
                    raise st.DeserializationError("Invalid uleb128 number (unexpected zero digit)")
                return accumulated
            shift += 7
        # Five bytes consumed and the continuation bit was still set.
        raise st.DeserializationError("Overflow while parsing uleb128-encoded uint32 value")

    def deserialize_len(self) -> int:
        """Read a length prefix and reject values above ``MAX_LENGTH``."""
        length = self.deserialize_uleb128_as_u32()
        if length <= MAX_LENGTH:
            return length
        raise st.DeserializationError("Length exceeds the maximum supported value.")

    def deserialize_variant_index(self) -> int:
        """Read a variant index as a ULEB128-encoded uint32."""
        index = self.deserialize_uleb128_as_u32()
        return index

    def check_that_key_slices_are_increasing(self, slice1: typing.Tuple[int, int], slice2: typing.Tuple[int, int]):
        """Require the input bytes at ``slice1`` to sort strictly before those at ``slice2``.

        Each slice is a ``(start, stop)`` pair of positions in the input buffer.

        Raises:
            st.DeserializationError: if the first key is not strictly smaller
                than the second in byte-wise order.
        """
        with self.input.getbuffer() as view:
            earlier_key = bytes(view[slice1[0] : slice1[1]])
            later_key = bytes(view[slice2[0] : slice2[1]])
        if not earlier_key < later_key:
            raise st.DeserializationError("Serialized keys in a map must be ordered by increasing lexicographic order")
def serialize(obj: typing.Any, obj_type) -> bytes:
    """Serialize ``obj`` as ``obj_type`` with a fresh ``BcsSerializer`` and return its buffer."""
    bcs_writer = BcsSerializer()
    bcs_writer.serialize_any(obj, obj_type)
    encoded = bcs_writer.get_buffer()
    return encoded
def deserialize(content: bytes, obj_type) -> typing.Tuple[typing.Any, bytes]:
    """Decode one ``obj_type`` value from ``content``.

    Returns:
        A pair ``(value, remaining)`` where ``remaining`` is whatever the
        deserializer reports as its remaining buffer after decoding.
    """
    bcs_reader = BcsDeserializer(content)
    decoded = bcs_reader.deserialize_any(obj_type)
    leftover = bcs_reader.get_remaining_buffer()
    return decoded, leftover
Functions
def deserialize(content: bytes, obj_type) ‑> Tuple[Any, bytes]
-
Expand source code
def deserialize(content: bytes, obj_type) -> typing.Tuple[typing.Any, bytes]:
    """Decode one ``obj_type`` value from ``content``.

    Returns:
        A pair ``(value, remaining)`` where ``remaining`` is whatever the
        deserializer reports as its remaining buffer after decoding.
    """
    bcs_reader = BcsDeserializer(content)
    decoded = bcs_reader.deserialize_any(obj_type)
    leftover = bcs_reader.get_remaining_buffer()
    return decoded, leftover
def serialize(obj: Any, obj_type) ‑> bytes
-
Expand source code
def serialize(obj: typing.Any, obj_type) -> bytes:
    """Serialize ``obj`` as ``obj_type`` with a fresh ``BcsSerializer`` and return its buffer."""
    bcs_writer = BcsSerializer()
    bcs_writer.serialize_any(obj, obj_type)
    encoded = bcs_writer.get_buffer()
    return encoded
Classes
class BcsDeserializer (content)
-
Deserialization primitives for binary formats (abstract class).
"Binary" serialization formats may differ in the way they encode sequence lengths, variant index, and how they verify the ordering of keys in map entries (or not).
Expand source code
class BcsDeserializer(sb.BinaryDeserializer):
    """Binary deserializer reading from an ``io.BytesIO`` built over ``content``.

    Lengths and variant indices are read as ULEB128-encoded unsigned 32-bit
    integers, and serialized map keys must be strictly increasing.
    """

    def __init__(self, content):
        stream = io.BytesIO(content)
        super().__init__(input=stream, container_depth_budget=MAX_CONTAINER_DEPTH)

    def deserialize_uleb128_as_u32(self) -> int:
        """Read one ULEB128-encoded uint32 from the input.

        Raises:
            st.DeserializationError: if the value does not fit in 32 bits, if
                more than five bytes are used, or if a multi-byte encoding
                ends with a zero byte.
        """
        accumulated = 0
        shift = 0
        while shift < 32:
            raw = int.from_bytes(self.read(1), "little", signed=False)
            payload = raw & 0x7F
            accumulated |= payload << shift
            if accumulated > MAX_U32:
                raise st.DeserializationError("Overflow while parsing uleb128-encoded uint32 value")
            # The byte equals its low 7 bits exactly when the continuation bit is clear.
            is_final_byte = payload == raw
            if is_final_byte:
                if payload == 0 and shift > 0:
                    raise st.DeserializationError("Invalid uleb128 number (unexpected zero digit)")
                return accumulated
            shift += 7
        # Five bytes consumed and the continuation bit was still set.
        raise st.DeserializationError("Overflow while parsing uleb128-encoded uint32 value")

    def deserialize_len(self) -> int:
        """Read a length prefix and reject values above ``MAX_LENGTH``."""
        length = self.deserialize_uleb128_as_u32()
        if length <= MAX_LENGTH:
            return length
        raise st.DeserializationError("Length exceeds the maximum supported value.")

    def deserialize_variant_index(self) -> int:
        """Read a variant index as a ULEB128-encoded uint32."""
        index = self.deserialize_uleb128_as_u32()
        return index

    def check_that_key_slices_are_increasing(self, slice1: typing.Tuple[int, int], slice2: typing.Tuple[int, int]):
        """Require the input bytes at ``slice1`` to sort strictly before those at ``slice2``.

        Each slice is a ``(start, stop)`` pair of positions in the input buffer.

        Raises:
            st.DeserializationError: if the first key is not strictly smaller
                than the second in byte-wise order.
        """
        with self.input.getbuffer() as view:
            earlier_key = bytes(view[slice1[0] : slice1[1]])
            later_key = bytes(view[slice2[0] : slice2[1]])
        if not earlier_key < later_key:
            raise st.DeserializationError("Serialized keys in a map must be ordered by increasing lexicographic order")
Ancestors
Class variables
var container_depth_budget : Optional[int]
var input : _io.BytesIO
var primitive_type_deserializer : Mapping
Methods
def check_that_key_slices_are_increasing(self, slice1: Tuple[int, int], slice2: Tuple[int, int])
-
Expand source code
def check_that_key_slices_are_increasing(self, slice1: typing.Tuple[int, int], slice2: typing.Tuple[int, int]):
    """Require the input bytes at ``slice1`` to sort strictly before those at ``slice2``.

    Each slice is a ``(start, stop)`` pair of positions in the input buffer.

    Raises:
        st.DeserializationError: if the first key is not strictly smaller
            than the second in byte-wise order.
    """
    with self.input.getbuffer() as view:
        earlier_key = bytes(view[slice1[0] : slice1[1]])
        later_key = bytes(view[slice2[0] : slice2[1]])
    if not earlier_key < later_key:
        raise st.DeserializationError("Serialized keys in a map must be ordered by increasing lexicographic order")
def deserialize_len(self) ‑> int
-
Expand source code
def deserialize_len(self) -> int:
    """Read a ULEB128-encoded length and reject values above ``MAX_LENGTH``.

    Raises:
        st.DeserializationError: if the decoded length exceeds ``MAX_LENGTH``.
    """
    length = self.deserialize_uleb128_as_u32()
    if length <= MAX_LENGTH:
        return length
    raise st.DeserializationError("Length exceeds the maximum supported value.")
def deserialize_uleb128_as_u32(self) ‑> int
-
Expand source code
def deserialize_uleb128_as_u32(self) -> int:
    """Read one ULEB128-encoded uint32 from the input.

    Raises:
        st.DeserializationError: if the value does not fit in 32 bits, if
            more than five bytes are used, or if a multi-byte encoding ends
            with a zero byte.
    """
    accumulated = 0
    shift = 0
    while shift < 32:
        raw = int.from_bytes(self.read(1), "little", signed=False)
        payload = raw & 0x7F
        accumulated |= payload << shift
        if accumulated > MAX_U32:
            raise st.DeserializationError("Overflow while parsing uleb128-encoded uint32 value")
        # The byte equals its low 7 bits exactly when the continuation bit is clear.
        is_final_byte = payload == raw
        if is_final_byte:
            if payload == 0 and shift > 0:
                raise st.DeserializationError("Invalid uleb128 number (unexpected zero digit)")
            return accumulated
        shift += 7
    # Five bytes consumed and the continuation bit was still set.
    raise st.DeserializationError("Overflow while parsing uleb128-encoded uint32 value")
def deserialize_variant_index(self) ‑> int
-
Expand source code
def deserialize_variant_index(self) -> int:
    """Read a variant index as a ULEB128-encoded uint32."""
    index = self.deserialize_uleb128_as_u32()
    return index
class BcsSerializer
-
Serialization primitives for binary formats (abstract class).
"Binary" serialization formats may differ in the way they encode sequence lengths, variant index, and how they sort map entries (or not).
Expand source code
class BcsSerializer(sb.BinarySerializer):
    """Binary serializer that writes into an in-memory ``io.BytesIO`` buffer.

    Lengths and variant indices are emitted as ULEB128-encoded unsigned
    32-bit integers. Map entries are reordered by the lexicographic order of
    their already-serialized bytes.
    """

    def __init__(self):
        super().__init__(output=io.BytesIO(), container_depth_budget=MAX_CONTAINER_DEPTH)

    def serialize_u32_as_uleb128(self, value: int):
        """Append ``value`` to the output as a ULEB128-encoded uint32.

        Args:
            value: integer in the inclusive range ``[0, MAX_U32]``.

        Raises:
            st.SerializationError: if ``value`` is negative or above ``MAX_U32``.
        """
        # Reject out-of-range input up front: a negative value would otherwise
        # surface as a raw OverflowError, and a value above u32 would produce
        # bytes that deserialize_uleb128_as_u32 refuses to read back.
        if not 0 <= value <= MAX_U32:
            raise st.SerializationError("Value is out of range for a uleb128-encoded uint32.")
        encoded = bytearray()
        while value >= 0x80:
            # Low 7 bits as payload, high bit set to signal "more bytes follow".
            encoded.append((value & 0x7F) | 0x80)
            value >>= 7
        encoded.append(value)
        self.output.write(encoded)

    def serialize_len(self, value: int):
        """Write a length prefix, refusing values above ``MAX_LENGTH``.

        Raises:
            st.SerializationError: if ``value`` exceeds ``MAX_LENGTH``.
        """
        if value > MAX_LENGTH:
            raise st.SerializationError("Length exceeds the maximum supported value.")
        self.serialize_u32_as_uleb128(value)

    def serialize_variant_index(self, value: int):
        """Write a variant index, refusing values above ``MAX_U32``.

        Raises:
            st.SerializationError: if ``value`` exceeds ``MAX_U32``.
        """
        if value > MAX_U32:
            raise st.SerializationError("Variant index exceeds the maximum supported value.")
        self.serialize_u32_as_uleb128(value)

    def sort_map_entries(self, offsets: typing.List[int]):
        """Sort the output chunks that start at each of ``offsets``.

        Each chunk runs from one offset to the next; the last one runs to the
        current end of the output buffer. The chunks are sorted as byte
        strings and written back starting at ``offsets[0]``.

        Args:
            offsets: start positions of the chunks in the output buffer. The
                list is not modified.
        """
        if not offsets:
            return
        # The context manager releases the view even if slicing fails; a
        # lingering export would prevent the BytesIO from being resized later.
        with self.output.getbuffer() as buf:
            end = len(buf)
            bounds = [*offsets, end]
            chunks = sorted(bytes(buf[lo:hi]) for lo, hi in zip(bounds, bounds[1:]))
        self.output.seek(offsets[0])
        self.output.write(b"".join(chunks))
        # Sanity check: rewriting the chunks must end exactly where the buffer
        # ended before (checked via the stream position, without a new view).
        assert self.output.tell() == end
Ancestors
Class variables
var container_depth_budget : Optional[int]
var output : _io.BytesIO
var primitive_type_serializer : Mapping
Methods
def serialize_len(self, value: int)
-
Expand source code
def serialize_len(self, value: int):
    """Write a length prefix as a ULEB128-encoded uint32.

    Raises:
        st.SerializationError: if ``value`` exceeds ``MAX_LENGTH``.
    """
    too_long = value > MAX_LENGTH
    if too_long:
        raise st.SerializationError("Length exceeds the maximum supported value.")
    self.serialize_u32_as_uleb128(value)
def serialize_u32_as_uleb128(self, value: int)
-
Expand source code
def serialize_u32_as_uleb128(self, value: int):
    """Append ``value`` to the output as a ULEB128-encoded uint32.

    Args:
        value: integer in the inclusive range ``[0, MAX_U32]``.

    Raises:
        st.SerializationError: if ``value`` is negative or above ``MAX_U32``.
    """
    # Reject out-of-range input up front: a negative value would otherwise
    # surface as a raw OverflowError, and a value above u32 would produce
    # bytes that a uint32 ULEB128 reader has to reject.
    if not 0 <= value <= MAX_U32:
        raise st.SerializationError("Value is out of range for a uleb128-encoded uint32.")
    encoded = bytearray()
    while value >= 0x80:
        # Low 7 bits as payload, high bit set to signal "more bytes follow".
        encoded.append((value & 0x7F) | 0x80)
        value >>= 7
    encoded.append(value)
    self.output.write(encoded)
def serialize_variant_index(self, value: int)
-
Expand source code
def serialize_variant_index(self, value: int):
    """Write a variant index as a ULEB128-encoded uint32.

    Raises:
        st.SerializationError: if ``value`` exceeds ``MAX_U32``.
    """
    out_of_range = value > MAX_U32
    if out_of_range:
        raise st.SerializationError("Variant index exceeds the maximum supported value.")
    self.serialize_u32_as_uleb128(value)
def sort_map_entries(self, offsets: List[int])
-
Expand source code
def sort_map_entries(self, offsets: typing.List[int]):
    """Sort the output chunks that start at each of ``offsets``.

    Each chunk runs from one offset to the next; the last one runs to the
    current end of the output buffer. The chunks are sorted as byte strings
    and written back starting at ``offsets[0]``.

    Args:
        offsets: start positions of the chunks in the output buffer. The
            list is not modified.
    """
    if not offsets:
        return
    # The context manager releases the view even if slicing fails; a
    # lingering export would prevent the BytesIO from being resized later.
    with self.output.getbuffer() as buf:
        end = len(buf)
        bounds = [*offsets, end]
        chunks = sorted(bytes(buf[lo:hi]) for lo, hi in zip(bounds, bounds[1:]))
    self.output.seek(offsets[0])
    self.output.write(b"".join(chunks))
    # Sanity check: rewriting the chunks must end exactly where the buffer
    # ended before (checked via the stream position, without a new view).
    assert self.output.tell() == end