Module diem.testing.miniwallet.app.store
Expand source code
# Copyright (c) The Diem Core Contributors
# SPDX-License-Identifier: Apache-2.0
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Type, Any, Callable, TypeVar, Generator
from .models import Base, Account, Event
from .... import utils
import json, time, threading
T = TypeVar("T", bound=Base)
class NotFoundError(ValueError):
pass
@dataclass
class InMemoryStore:
"""InMemoryStore is a simple in-memory store for resources"""
resources: Dict[Type[Base], List[Dict[str, Any]]] = field(default_factory=dict)
resources_lock: threading.RLock = field(default_factory=threading.RLock)
gen_id_lock: threading.Lock = field(default_factory=threading.Lock)
gen_id: int = field(default=0)
def next_id(self) -> int:
with self.gen_id_lock:
self.gen_id += 1
return self.gen_id
def create_event(self, account_id: str, type: str, data: str) -> Event:
return self.create(Event, account_id=account_id, type=type, data=data, timestamp=_ts())
def find(self, typ: Type[T], **conds: Any) -> T:
with self.resources_lock:
list = self._select(typ, **conds)
ret = next(list, None)
if not ret:
raise NotFoundError("%s not found by %s" % (typ.__name__, conds))
if next(list, None):
raise ValueError("found multiple resources data matches %s" % conds)
return ret
def find_all(self, typ: Type[T], **conds: Any) -> List[T]:
with self.resources_lock:
return list(self._select(typ, **conds))
def create(self, typ: Type[T], before_create: Callable[[Dict[str, Any]], None] = lambda _: _, **data: Any) -> T:
with self.resources_lock:
before_create(data)
obj = typ(**self._insert(typ, **data))
self._record_event(obj, "created", data)
return obj
def update(self, obj: T, before_update: Callable[[T], None] = lambda _: _, **data: Any) -> None:
for k, v in data.items():
setattr(obj, k, v)
with self.resources_lock:
before_update(obj)
self._update(obj)
self._record_event(obj, "updated", data)
def _record_event(self, obj: T, action: str, data: Dict[str, Any]) -> None:
if not isinstance(obj, Event):
type = "%s_%s" % (action, utils.to_snake(obj))
account_id = obj.id if isinstance(obj, Account) else obj.account_id # pyre-ignore
data["id"] = obj.id
self._insert(Event, account_id=account_id, type=type, data=json.dumps(data), timestamp=_ts())
def _update(self, obj: T) -> None:
records = self.resources.get(type(obj), [])
index = next(iter([i for i, res in enumerate(records) if res["id"] == obj.id]), None)
if index is None:
raise NotFoundError("could not find resource by id: %s" % obj.id)
records[index] = asdict(obj)
def _insert(self, typ: Type[T], **res: Any) -> Dict[str, Any]:
if "id" not in res:
res["id"] = str(self.next_id())
self.resources.setdefault(typ, []).append(asdict(typ(**res)))
return res
def _select(self, typ: Type[T], reverse: bool = False, **conds: Any) -> Generator[T, None, None]:
items = reversed(self.resources.get(typ, [])) if reverse else self.resources.get(typ, [])
for res in items:
if _match(res, **conds):
yield typ(**res)
def _match(res: Dict[str, Any], **conds: Any) -> bool:
for k, v in conds.items():
if res.get(k) != v:
return False
return True
def _ts() -> int:
return int(time.time() * 1000)
Classes
class InMemoryStore (resources: Dict[Type[Base], List[Dict[str, Any]]] = <factory>, resources_lock:
= <factory>, gen_id_lock: = <factory>, gen_id: int = 0) -
InMemoryStore is a simple in-memory store for resources
Expand source code
@dataclass class InMemoryStore: """InMemoryStore is a simple in-memory store for resources""" resources: Dict[Type[Base], List[Dict[str, Any]]] = field(default_factory=dict) resources_lock: threading.RLock = field(default_factory=threading.RLock) gen_id_lock: threading.Lock = field(default_factory=threading.Lock) gen_id: int = field(default=0) def next_id(self) -> int: with self.gen_id_lock: self.gen_id += 1 return self.gen_id def create_event(self, account_id: str, type: str, data: str) -> Event: return self.create(Event, account_id=account_id, type=type, data=data, timestamp=_ts()) def find(self, typ: Type[T], **conds: Any) -> T: with self.resources_lock: list = self._select(typ, **conds) ret = next(list, None) if not ret: raise NotFoundError("%s not found by %s" % (typ.__name__, conds)) if next(list, None): raise ValueError("found multiple resources data matches %s" % conds) return ret def find_all(self, typ: Type[T], **conds: Any) -> List[T]: with self.resources_lock: return list(self._select(typ, **conds)) def create(self, typ: Type[T], before_create: Callable[[Dict[str, Any]], None] = lambda _: _, **data: Any) -> T: with self.resources_lock: before_create(data) obj = typ(**self._insert(typ, **data)) self._record_event(obj, "created", data) return obj def update(self, obj: T, before_update: Callable[[T], None] = lambda _: _, **data: Any) -> None: for k, v in data.items(): setattr(obj, k, v) with self.resources_lock: before_update(obj) self._update(obj) self._record_event(obj, "updated", data) def _record_event(self, obj: T, action: str, data: Dict[str, Any]) -> None: if not isinstance(obj, Event): type = "%s_%s" % (action, utils.to_snake(obj)) account_id = obj.id if isinstance(obj, Account) else obj.account_id # pyre-ignore data["id"] = obj.id self._insert(Event, account_id=account_id, type=type, data=json.dumps(data), timestamp=_ts()) def _update(self, obj: T) -> None: records = self.resources.get(type(obj), []) index = next(iter([i for i, res in enumerate(records) if res["id"] == obj.id]), None) if index is None: raise NotFoundError("could not find resource by id: %s" % obj.id) records[index] = asdict(obj) def _insert(self, typ: Type[T], **res: Any) -> Dict[str, Any]: if "id" not in res: res["id"] = str(self.next_id()) self.resources.setdefault(typ, []).append(asdict(typ(**res))) return res def _select(self, typ: Type[T], reverse: bool = False, **conds: Any) -> Generator[T, None, None]: items = reversed(self.resources.get(typ, [])) if reverse else self.resources.get(typ, []) for res in items: if _match(res, **conds): yield typ(**res)
Class variables
var gen_id : int
var gen_id_lock :
var resources : Dict[Type[Base], List[Dict[str, Any]]]
var resources_lock :
Methods
def create(self, typ: Type[~T], before_create: Callable[[Dict[str, Any]], NoneType] = <function InMemoryStore.<lambda>>, **data: Any) ‑> ~T
-
Expand source code
def create(self, typ: Type[T], before_create: Callable[[Dict[str, Any]], None] = lambda _: _, **data: Any) -> T: with self.resources_lock: before_create(data) obj = typ(**self._insert(typ, **data)) self._record_event(obj, "created", data) return obj
def create_event(self, account_id: str, type: str, data: str) ‑> Event
-
Expand source code
def create_event(self, account_id: str, type: str, data: str) -> Event: return self.create(Event, account_id=account_id, type=type, data=data, timestamp=_ts())
def find(self, typ: Type[~T], **conds: Any) ‑> ~T
-
Expand source code
def find(self, typ: Type[T], **conds: Any) -> T: with self.resources_lock: list = self._select(typ, **conds) ret = next(list, None) if not ret: raise NotFoundError("%s not found by %s" % (typ.__name__, conds)) if next(list, None): raise ValueError("found multiple resources data matches %s" % conds) return ret
def find_all(self, typ: Type[~T], **conds: Any) ‑> List[~T]
-
Expand source code
def find_all(self, typ: Type[T], **conds: Any) -> List[T]: with self.resources_lock: return list(self._select(typ, **conds))
def next_id(self) ‑> int
-
Expand source code
def next_id(self) -> int: with self.gen_id_lock: self.gen_id += 1 return self.gen_id
def update(self, obj: ~T, before_update: Callable[[~T], NoneType] = <function InMemoryStore.<lambda>>, **data: Any) ‑> NoneType
-
Expand source code
def update(self, obj: T, before_update: Callable[[T], None] = lambda _: _, **data: Any) -> None: for k, v in data.items(): setattr(obj, k, v) with self.resources_lock: before_update(obj) self._update(obj) self._record_event(obj, "updated", data)
class NotFoundError (*args, **kwargs)
-
Inappropriate argument value (of correct type).
Expand source code
class NotFoundError(ValueError): pass
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException