Skip to main content

tempo_consensus/follow/
resolver.rs

1//! Resolver for follow mode.
2//!
3//! Implements [`Resolver`] for marshal's gap-repair machinery. Checks the
4//! local execution node first and falls back to the upstream abstraction.
5
6use std::{collections::BTreeMap, sync::Arc};
7
8use bytes::Bytes;
9use commonware_codec::{DecodeExt as _, Encode as _};
10use commonware_consensus::{
11    marshal::resolver::handler,
12    simplex::{scheme::bls12381_threshold::vrf::Scheme, types::Finalization},
13    types::Height,
14};
15use commonware_cryptography::{bls12381::primitives::variant::MinSig, ed25519::PublicKey};
16use commonware_runtime::{ContextCell, Spawner, spawn_cell};
17use commonware_utils::{
18    channel::{fallible::FallibleExt as _, mpsc},
19    futures::{AbortablePool, Aborter},
20    vec::NonEmptyVec,
21};
22use eyre::Report;
23use reth_provider::{BlockReader as _, BlockSource};
24use tempo_node::TempoFullNode;
25use tokio::select;
26use tracing::{debug, error, instrument, warn};
27
28use crate::consensus::{Digest, block::Block};
29
30pub(crate) fn try_init<TContext>(
31    context: TContext,
32    config: Config,
33) -> (
34    Resolver<TContext>,
35    Mailbox,
36    mpsc::Receiver<handler::Message<Digest>>,
37) {
38    let (handler_tx, handler_rx) = mpsc::channel(config.mailbox_size);
39    let (mailbox_tx, mailbox_rx) = mpsc::unbounded_channel();
40    let actor = Resolver {
41        context: ContextCell::new(context),
42        config,
43        mailbox: mailbox_rx,
44        handler_tx,
45        requests: BTreeMap::new(),
46        fetches: AbortablePool::default(),
47    };
48    let mailbox = Mailbox { inner: mailbox_tx };
49    (actor, mailbox, handler_rx)
50}
51
52#[derive(Clone)]
53pub(crate) struct Mailbox {
54    // FIXME: This should probably not be an unbounded channel - but how do
55    // we exert backpressure?
56    inner: mpsc::UnboundedSender<Message>,
57}
58
59type Predicate<K> = Box<dyn Fn(&K) -> bool + Send>;
60
61/// Messages sent to the resolver.
62enum Message {
63    /// Initiate fetch requests.
64    Fetch { keys: Vec<handler::Request<Digest>> },
65
66    /// Cancel a fetch request by key.
67    Cancel { key: handler::Request<Digest> },
68
69    /// Cancel all fetch requests.
70    Clear,
71
72    /// Cancel all fetch requests that do not satisfy the predicate.
73    Retain {
74        predicate: Predicate<handler::Request<Digest>>,
75    },
76}
77
78pub(crate) struct Config {
79    /// For reading blocks locally from the execution layer.
80    pub(super) execution_node: Arc<TempoFullNode>,
81    /// For reading blocks and certificates from the connected node.
82    pub(super) upstream: super::upstream::Mailbox,
83    pub(super) mailbox_size: usize,
84}
85
86type FetchPool = AbortablePool<(handler::Request<Digest>, Result<Bytes, bool>)>;
87pub(crate) struct Resolver<TContext> {
88    context: ContextCell<TContext>,
89    config: Config,
90    /// To send messages to the application/actor relying on the resolver.
91    handler_tx: mpsc::Sender<handler::Message<Digest>>,
92    mailbox: mpsc::UnboundedReceiver<Message>,
93    requests: BTreeMap<handler::Request<Digest>, Aborter>,
94    fetches: FetchPool,
95}
96
97impl<TContext> Resolver<TContext>
98where
99    TContext: Spawner,
100{
101    async fn run(mut self) {
102        loop {
103            select!(
104                biased;
105
106                response = self.fetches.next_completed() => {
107                    // Error case is aborting the future, no need to track.
108                    if let Ok(resolution) = response {
109                        self.handle_fetch_resolution(resolution);
110                    }
111                }
112
113                Some(msg) = self.mailbox.recv() => {
114                    match msg {
115                        Message::Fetch { keys, } => {
116                            self.handle_fetch_request(keys);
117                        }
118                        Message::Cancel { key } => {
119                            self.requests.remove(&key);
120                        }
121                        Message::Clear => {
122                            self.requests.clear();
123                        }
124                        Message::Retain { predicate } => {
125                            self.requests.retain(move |key, _| predicate(key));
126                        }
127                    }
128                }
129            )
130        }
131    }
132
133    pub(crate) fn start(mut self) -> commonware_runtime::Handle<()> {
134        spawn_cell!(self.context, self.run())
135    }
136
137    #[instrument(skip_all)]
138    fn handle_fetch_request(&mut self, keys: Vec<handler::Request<Digest>>) {
139        for key in keys {
140            self.schedule_request(key);
141        }
142    }
143
144    #[instrument(skip_all)]
145    fn handle_fetch_resolution(
146        &mut self,
147        (key, resolution): (handler::Request<Digest>, Result<Bytes, bool>),
148    ) {
149        match resolution {
150            Ok(value) => {
151                debug!(%key, "fetched value, delivering to client");
152                self.requests.remove(&key);
153                // Fire and forget; there is no mechanism to retry
154                // sending the response.
155                let (response, _) = commonware_utils::channel::oneshot::channel();
156                let _ = self.handler_tx.try_send(handler::Message::Deliver {
157                    key,
158                    value,
159                    response,
160                });
161            }
162            Err(true) => {
163                debug!(%key, "fetch failed, rescheduling");
164                self.requests.remove(&key);
165                self.schedule_request(key);
166            }
167            Err(false) => {
168                debug!(%key, "fetch failed, dropping");
169                self.requests.remove(&key);
170            }
171        }
172    }
173
174    fn schedule_request(&mut self, key: handler::Request<Digest>) {
175        if !self.requests.contains_key(&key) {
176            let aborter = match &key {
177                handler::Request::Block(digest) => {
178                    let execution_node = self.config.execution_node.clone();
179                    let digest = *digest;
180                    let key = key.clone();
181                    self.fetches.push(async move {
182                        let response = resolve_block(&execution_node, digest);
183                        (key, response)
184                    })
185                }
186                handler::Request::Finalized { height } => {
187                    let upstream = self.config.upstream.clone();
188                    let height = *height;
189                    let key = key.clone();
190                    self.fetches.push(async move {
191                        let response = resolve_finalized_new(upstream, height).await;
192                        (key, response)
193                    })
194                }
195                handler::Request::Notarized { .. } => {
196                    debug!("ignoring requests for notarized blocks");
197                    return;
198                }
199            };
200            debug!(%key, "scheduled new request");
201            self.requests.insert(key, aborter);
202        } else {
203            debug!(%key, "request already scheduled");
204        }
205    }
206}
207
208/// Resolves an encoded block from the local execution layer.
209#[instrument(skip(execution_node))]
210fn resolve_block(execution_node: &TempoFullNode, block_digest: Digest) -> Result<Bytes, bool> {
211    let Ok(Some(block)) = execution_node
212        .provider
213        .find_sealed_or_recovered_block(block_digest.0, BlockSource::Any)
214        .map_err(Report::new)
215        .inspect_err(
216            |error| error!(%error, "unable to communicate with execution layer to lookup block"),
217        )
218    else {
219        return Err(false);
220    };
221    // Follow-mode recovery reads from the EL database, which persists only the block.
222    // BAL is p2p side data, so it is unavailable here.
223    let consensus_block = Block::from_execution_block_unchecked(block, None);
224    Ok(consensus_block.encode())
225}
226
227/// Resolves a request for a finalized.
228#[instrument(skip_all, fields(%height))]
229async fn resolve_finalized_new(
230    upstream: super::upstream::Mailbox,
231    height: Height,
232) -> Result<Bytes, bool> {
233    let certified_block = match upstream.get_finalization(height).await {
234        Some(certified_block) => certified_block,
235        None => return Err(false),
236    };
237
238    let Ok(finalization) = alloy_primitives::hex::decode(&certified_block.certificate)
239        .map_err(Report::new)
240        .and_then(|bytes| {
241            <Finalization<Scheme<PublicKey, MinSig>, Digest>>::decode(&*bytes).map_err(Report::new)
242        })
243        .inspect_err(|error| warn!(%error, "failed decoding certificate"))
244    else {
245        return Err(false);
246    };
247
248    // Upstream finalization responses carry persisted EL blocks only; no p2p BAL
249    // is available when reconstructing this consensus block.
250    let consensus_block = Block::from_execution_block_unchecked(certified_block.block, None);
251    Ok((finalization, consensus_block).encode())
252}
253
254impl commonware_resolver::Resolver for Mailbox {
255    type Key = handler::Request<Digest>;
256    type PublicKey = PublicKey;
257
258    async fn fetch(&mut self, key: Self::Key) {
259        self.fetch_all(vec![key]).await;
260    }
261
262    async fn fetch_all(&mut self, keys: Vec<Self::Key>) {
263        self.inner.send_lossy(Message::Fetch { keys });
264    }
265
266    async fn fetch_targeted(&mut self, key: Self::Key, _targets: NonEmptyVec<Self::PublicKey>) {
267        self.fetch(key).await;
268    }
269
270    async fn fetch_all_targeted(
271        &mut self,
272        requests: Vec<(Self::Key, NonEmptyVec<Self::PublicKey>)>,
273    ) {
274        self.fetch_all(requests.into_iter().map(|(k, _)| k).collect())
275            .await;
276    }
277
278    async fn cancel(&mut self, key: Self::Key) {
279        self.inner.send_lossy(Message::Cancel { key });
280    }
281
282    async fn clear(&mut self) {
283        self.inner.send_lossy(Message::Clear);
284    }
285
286    async fn retain(&mut self, predicate: impl Fn(&Self::Key) -> bool + Send + 'static) {
287        self.inner.send_lossy(Message::Retain {
288            predicate: Box::new(predicate),
289        });
290    }
291}