1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0
use diem_types::trusted_state::TrustedState;
use std::{
convert::Infallible,
sync::{Arc, RwLock},
};
/// A `StateStore` provides persistent, durable storage for
/// [`VerifyingClient`](crate::VerifyingClient)s' latest [`TrustedState`].
///
/// ### Implementor Guarantees
///
/// Critically, the `StateStore` must provide certain properties in order to
/// uphold the client's safety guarantees. We say that a client is "safe" so long
/// as their observed durable states are monotonically increasing by version.
///
/// Stores should be atomic and durable. A state is considered durable after a
/// successful `store`. A client should never observe a stale state after a
/// crash or restart. Stores should be atomic, i.e., a crash or restart during
/// a store should not corrupt the existing stored state.
///
/// A `StateStore` only cares about storing the _latest_ [`TrustedState`]. Calls
/// to `StateStore::store` with a [`TrustedState`] older than the latest durable
/// state version must not affect the latest stored state and are safe to ignore.
///
/// At the minimum, a client should "Read-my-Writes". In other words, a client's
/// `state_store.store(s1)` followed by `s2 = state_store.latest_state()` should
/// satisfy `s1.version <= s2.version`.
///
/// ### Concurrency
///
/// A user may have multiple concurrent threads with `VerifyingClient`s attempting
/// to store new `TrustedState`s in the same `StateStore`, which should be supported
/// without compromising the above guarantees.
///
/// If a client process (with potentially many threads) is the sole reader and
/// writer, a `StateStore` can be wrapped in a `WriteThroughCache`, which removes
/// the need to read from the store and avoids some stale writes to the store.
/// The provided `InMemoryStateStore` and `FileStateStore` already do this.
///
/// ### Example
///
/// For instance, a SQLite-based `StateStore` might be implemented using queries
/// like:
///
/// ```sql
/// -- a table always containing the highest trusted_state
/// CREATE TABLE trusted_state (
/// id INTEGER NOT NULL PRIMARY KEY,
/// version INTEGER NOT NULL,
/// state_blob BLOB NOT NULL
/// );
///
/// -- StateStore::latest_state()
/// -- reading the latest state
/// SELECT id, version, state_blob FROM trusted_state WHERE id = 0;
///
/// -- StateStore::store(new_state: TrustedState)
/// -- maybe insert a new trusted_state, where ?1 is the new version and ?2 is the new
/// -- serialized state blob. ensures that the stored trusted state is always
/// -- the one with the greatest version.
/// INSERT OR REPLACE INTO trusted_state (id, version, state_blob)
/// SELECT id, version, state_blob FROM trusted_state UNION SELECT 0, ?1, ?2 ORDER BY version DESC LIMIT 1
/// ```
pub trait StateStore {
type Error: std::error::Error + Send + Sync + 'static;
/// Get the latest durable, committed state. Returns `None` if there is no
/// state yet in the underlying store.
fn latest_state(&self) -> Result<Option<TrustedState>, Self::Error>;
/// Get the version of the latest durable, committed state. Returns `None` if
/// there is no state yet in the underlying store.
///
/// Note: `StateStore` provides a default impl using `Self::latest_state`.
/// Implementors may want override this method if they can do it more efficiently.
fn latest_state_version(&self) -> Result<Option<u64>, Self::Error> {
Ok(self.latest_state()?.map(|s| s.version()))
}
/// Store a new [`TrustedState`] in this `StateStore`.
///
/// If there is already a newer, durable `TrustedState` in the store, we can
/// ignore `new_state`.
///
/// When this call returns, clients will assume that `new_state` or some newer
/// state is durable.
fn store(&self, new_state: &TrustedState) -> Result<(), Self::Error>;
}
/// An in-memory `StateStore`. Used for testing.
#[derive(Debug, Clone)]
pub struct InMemoryStateStore(Arc<WriteThroughCache<NoopStateStore>>);
/// This `StateStore` ignores all stores and always returns `Ok(None)`.
/// Used for testing.
#[derive(Debug)]
struct NoopStateStore;
/// A write-through cache around an underlying durable [`StateStore`].
///
/// As a write-through cache:
/// 1. Reads are serviced directly from the cache.
/// 2. Writes are committed to the underlying store before updating the cache.
#[derive(Debug)]
pub struct WriteThroughCache<S> {
durable_state_cache: RwLock<Option<TrustedState>>,
state_store: S,
}
///////////////////////
// WriteThroughCache //
///////////////////////
impl<S: StateStore> WriteThroughCache<S> {
pub fn new(state_store: S) -> Result<Self, S::Error> {
// Read the latest state from the underlying store to initialize the cache.
let latest_state = state_store.latest_state()?;
Ok(Self {
durable_state_cache: RwLock::new(latest_state),
state_store,
})
}
fn ratchet_cache(&self, new_state: &TrustedState) {
let mut durable_state_cache = self.durable_state_cache.write().unwrap();
let cache_version = durable_state_cache.as_ref().map(|s| s.version());
if Some(new_state.version()) > cache_version {
*durable_state_cache = Some(new_state.clone());
}
}
pub fn as_inner(&self) -> &S {
&self.state_store
}
}
impl<S: StateStore> StateStore for WriteThroughCache<S> {
type Error = S::Error;
fn latest_state(&self) -> Result<Option<TrustedState>, Self::Error> {
Ok(self.durable_state_cache.read().unwrap().clone())
}
fn latest_state_version(&self) -> Result<Option<u64>, Self::Error> {
// avoids a clone while holding the lock :)
Ok(self
.durable_state_cache
.read()
.unwrap()
.as_ref()
.map(|s| s.version()))
}
fn store(&self, new_state: &TrustedState) -> Result<(), Self::Error> {
// we already have a durable state that's newer than this version; we can
// exit early here since we don't need to do anything.
if Some(new_state.version()) <= self.latest_state_version()? {
return Ok(());
}
// store the new state and make it durable. we assume that the state is
// durable when this returns.
self.state_store.store(new_state)?;
// at this point, the state is finally durable, so we can safely update
// the durable state cache
self.ratchet_cache(new_state);
// the client now safely observes `new_state`
Ok(())
}
}
///////////////////
// InMemoryStore //
///////////////////
impl InMemoryStateStore {
pub fn new() -> Self {
Self(Arc::new(WriteThroughCache::new(NoopStateStore).unwrap()))
}
}
impl StateStore for InMemoryStateStore {
type Error = Infallible;
fn latest_state(&self) -> Result<Option<TrustedState>, Self::Error> {
self.0.latest_state()
}
fn latest_state_version(&self) -> Result<Option<u64>, Self::Error> {
self.0.latest_state_version()
}
fn store(&self, new_state: &TrustedState) -> Result<(), Self::Error> {
self.0.store(new_state)
}
}
impl StateStore for NoopStateStore {
type Error = Infallible;
fn latest_state(&self) -> Result<Option<TrustedState>, Self::Error> {
Ok(None)
}
fn store(&self, _new_state: &TrustedState) -> Result<(), Self::Error> {
Ok(())
}
}