1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crossbeam_queue::SegQueue;
use mvhashmap::Version;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc, RwLock,
};

#[repr(usize)]
enum ExecutionStatus {
    Executed = 1,
    NotExecuted = 0,
}

pub struct Scheduler {
    // Shared index (version) of the next txn to be executed from the original transaction sequence.
    execution_marker: AtomicUsize,
    // Shared number of txns to execute: updated before executing a block or when an error or
    // reconfiguration leads to early stopping (at that transaction version).
    stop_at_version: AtomicUsize,

    txn_buffer: SegQueue<usize>, // shared queue of list of dependency-resolved transactions.
    // TODO: Do we need padding here?
    txn_dependency: Vec<Arc<RwLock<Vec<usize>>>>, // version -> txns that depend on it.
    txn_status: Vec<AtomicUsize>,                 // version -> execution status.
}

impl Scheduler {
    pub fn new(num_txns: usize) -> Self {
        Self {
            execution_marker: AtomicUsize::new(0),
            stop_at_version: AtomicUsize::new(num_txns),
            txn_buffer: SegQueue::new(),
            txn_dependency: (0..num_txns)
                .map(|_| Arc::new(RwLock::new(Vec::new())))
                .collect(),
            txn_status: (0..num_txns)
                .map(|_| AtomicUsize::new(ExecutionStatus::NotExecuted as usize))
                .collect(),
        }
    }

    // Return the next txn id for the thread to execute: first fetch from the shared queue that
    // stores dependency-resolved txns, then fetch from the original ordered txn sequence.
    // Return Some(id) if found the next transaction, else return None.
    pub fn next_txn_to_execute(&self) -> Option<Version> {
        // Fetch txn from txn_buffer
        match self.txn_buffer.pop() {
            Some(version) => Some(version),
            None => {
                // Fetch the first non-executed txn from the original transaction list
                let next_to_execute = self.execution_marker.fetch_add(1, Ordering::Relaxed);
                if next_to_execute < self.num_txn_to_execute() {
                    Some(next_to_execute)
                } else {
                    // Everything executed at least once - validation will take care of rest.
                    None
                }
            }
        }
    }

    // Invoked when txn depends on another txn, adds version to the dependency list the other txn.
    // Return true if successful, otherwise dependency resolved in the meantime, return false.
    pub fn add_dependency(&self, version: Version, dep_version: Version) -> bool {
        // Could pre-check that the txn isn't in executed state, but shouldn't matter much since
        // the caller usually has just observed the read dependency (so not executed state).

        // txn_dependency is initialized for all versions, so unwrap() is safe.
        let mut stored_deps = self.txn_dependency[dep_version].write().unwrap();
        if self.txn_status[dep_version].load(Ordering::Acquire)
            != ExecutionStatus::Executed as usize
        {
            stored_deps.push(version);
            return true;
        }
        false
    }

    // After txn is executed, add its dependencies to the shared buffer for execution.
    pub fn finish_execution(&self, version: Version) {
        self.txn_status[version].store(ExecutionStatus::Executed as usize, Ordering::Release);
        let mut version_deps: Vec<usize> = {
            // we want to make things fast inside the lock, so use take instead of clone
            let mut stored_deps = self.txn_dependency[version].write().unwrap();
            std::mem::take(&mut stored_deps)
        };

        version_deps.sort_unstable();
        for dep in version_deps {
            self.txn_buffer.push(dep);
        }
    }

    // Reset the txn version/id to end execution earlier. The executor will stop at the smallest
    // `stop_version` when there are multiple concurrent invocation.
    pub fn set_stop_version(&self, stop_version: Version) {
        self.stop_at_version
            .fetch_min(stop_version, Ordering::Relaxed);
    }

    // Adding version to the ready queue.
    pub fn add_transaction(&self, version: Version) {
        self.txn_buffer.push(version)
    }

    // Get the last txn version/id
    pub fn num_txn_to_execute(&self) -> Version {
        self.stop_at_version.load(Ordering::Relaxed)
    }
}