Skip to main content

tempo_commonware_node/follow/
resolver.rs

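//! Resolver for follow mode.
//!
//! Implements [`Resolver`] for marshal's gap-repair machinery. Requests for
//! blocks are answered from the local execution node, requests for
//! finalizations are answered from the upstream abstraction, and requests
//! for notarized blocks are ignored.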
5
6use std::{collections::BTreeMap, sync::Arc};
7
8use bytes::Bytes;
9use commonware_codec::{DecodeExt as _, Encode as _};
10use commonware_consensus::{
11    marshal::resolver::handler,
12    simplex::{scheme::bls12381_threshold::vrf::Scheme, types::Finalization},
13    types::Height,
14};
15use commonware_cryptography::{bls12381::primitives::variant::MinSig, ed25519::PublicKey};
16use commonware_runtime::{ContextCell, Spawner, spawn_cell};
17use commonware_utils::{
18    channel::{fallible::FallibleExt as _, mpsc},
19    futures::{AbortablePool, Aborter},
20    vec::NonEmptyVec,
21};
22use eyre::Report;
23use reth_node_core::primitives::SealedBlock;
24use reth_provider::{BlockReader as _, BlockSource};
25use tempo_node::TempoFullNode;
26use tokio::select;
27use tracing::{debug, error, instrument, warn};
28
29use crate::consensus::{Digest, block::Block};
30
31pub(crate) fn try_init<TContext>(
32    context: TContext,
33    config: Config,
34) -> (
35    Resolver<TContext>,
36    Mailbox,
37    mpsc::Receiver<handler::Message<Digest>>,
38) {
39    let (handler_tx, handler_rx) = mpsc::channel(config.mailbox_size);
40    let (mailbox_tx, mailbox_rx) = mpsc::unbounded_channel();
41    let actor = Resolver {
42        context: ContextCell::new(context),
43        config,
44        mailbox: mailbox_rx,
45        handler_tx,
46        requests: BTreeMap::new(),
47        fetches: AbortablePool::default(),
48    };
49    let mailbox = Mailbox { inner: mailbox_tx };
50    (actor, mailbox, handler_rx)
51}
52
53#[derive(Clone)]
54pub(crate) struct Mailbox {
55    // FIXME: This should probably not be an unbounded channel - but how do
56    // we exert backpressure?
57    inner: mpsc::UnboundedSender<Message>,
58}
59
/// Boxed, sendable predicate over keys of type `K`.
///
/// Carried by [`Message::Retain`] to decide which requests are kept.
type Predicate<K> = Box<dyn Fn(&K) -> bool + Send>;
61
62/// Messages sent to the resolver.
63enum Message {
64    /// Initiate fetch requests.
65    Fetch { keys: Vec<handler::Request<Digest>> },
66
67    /// Cancel a fetch request by key.
68    Cancel { key: handler::Request<Digest> },
69
70    /// Cancel all fetch requests.
71    Clear,
72
73    /// Cancel all fetch requests that do not satisfy the predicate.
74    Retain {
75        predicate: Predicate<handler::Request<Digest>>,
76    },
77}
78
79pub(crate) struct Config {
80    /// For reading blocks locally from the execution layer.
81    pub(super) execution_node: Arc<TempoFullNode>,
82    /// For reading blocks and certificates from the connected node.
83    pub(super) upstream: super::upstream::Mailbox,
84    pub(super) mailbox_size: usize,
85}
86
87type FetchPool = AbortablePool<(handler::Request<Digest>, Result<Bytes, bool>)>;
88pub(crate) struct Resolver<TContext> {
89    context: ContextCell<TContext>,
90    config: Config,
91    /// To send messages to the application/actor relying on the resolver.
92    handler_tx: mpsc::Sender<handler::Message<Digest>>,
93    mailbox: mpsc::UnboundedReceiver<Message>,
94    requests: BTreeMap<handler::Request<Digest>, Aborter>,
95    fetches: FetchPool,
96}
97
98impl<TContext> Resolver<TContext>
99where
100    TContext: Spawner,
101{
102    async fn run(mut self) {
103        loop {
104            select!(
105                biased;
106
107                response = self.fetches.next_completed() => {
108                    // Error case is aborting the future, no need to track.
109                    if let Ok(resolution) = response {
110                        self.handle_fetch_resolution(resolution);
111                    }
112                }
113
114                Some(msg) = self.mailbox.recv() => {
115                    match msg {
116                        Message::Fetch { keys, } => {
117                            self.handle_fetch_request(keys);
118                        }
119                        Message::Cancel { key } => {
120                            self.requests.remove(&key);
121                        }
122                        Message::Clear => {
123                            self.requests.clear();
124                        }
125                        Message::Retain { predicate } => {
126                            self.requests.retain(move |key, _| predicate(key));
127                        }
128                    }
129                }
130            )
131        }
132    }
133
134    pub(crate) fn start(mut self) -> commonware_runtime::Handle<()> {
135        spawn_cell!(self.context, self.run())
136    }
137
138    #[instrument(skip_all)]
139    fn handle_fetch_request(&mut self, keys: Vec<handler::Request<Digest>>) {
140        for key in keys {
141            self.schedule_request(key);
142        }
143    }
144
145    #[instrument(skip_all)]
146    fn handle_fetch_resolution(
147        &mut self,
148        (key, resolution): (handler::Request<Digest>, Result<Bytes, bool>),
149    ) {
150        match resolution {
151            Ok(value) => {
152                debug!(%key, "fetched value, delivering to client");
153                self.requests.remove(&key);
154                // Fire and forget; there is no mechanism to retry
155                // sending the response.
156                let (response, _) = commonware_utils::channel::oneshot::channel();
157                let _ = self.handler_tx.try_send(handler::Message::Deliver {
158                    key,
159                    value,
160                    response,
161                });
162            }
163            Err(true) => {
164                debug!(%key, "fetch failed, rescheduling");
165                self.requests.remove(&key);
166                self.schedule_request(key);
167            }
168            Err(false) => {
169                debug!(%key, "fetch failed, dropping");
170                self.requests.remove(&key);
171            }
172        }
173    }
174
175    fn schedule_request(&mut self, key: handler::Request<Digest>) {
176        if !self.requests.contains_key(&key) {
177            let aborter = match &key {
178                handler::Request::Block(digest) => {
179                    let execution_node = self.config.execution_node.clone();
180                    let digest = *digest;
181                    let key = key.clone();
182                    self.fetches.push(async move {
183                        let response = resolve_block(&execution_node, digest);
184                        (key, response)
185                    })
186                }
187                handler::Request::Finalized { height } => {
188                    let upstream = self.config.upstream.clone();
189                    let height = *height;
190                    let key = key.clone();
191                    self.fetches.push(async move {
192                        let response = resolve_finalized_new(upstream, height).await;
193                        (key, response)
194                    })
195                }
196                handler::Request::Notarized { .. } => {
197                    debug!("ignoring requests for notarized blocks");
198                    return;
199                }
200            };
201            debug!(%key, "scheduled new request");
202            self.requests.insert(key, aborter);
203        } else {
204            debug!(%key, "request already scheduled");
205        }
206    }
207}
208
209#[instrument(skip(execution_node))]
210fn resolve_block(execution_node: &TempoFullNode, block_digest: Digest) -> Result<Bytes, bool> {
211    let Ok(Some(block)) = execution_node
212        .provider
213        .find_block_by_hash(block_digest.0, BlockSource::Any)
214        .map_err(Report::new)
215        .inspect_err(
216            |error| error!(%error, "unable to communicate with execution layer to lookup block"),
217        )
218    else {
219        return Err(false);
220    };
221    let consensus_block = Block::from_execution_block(SealedBlock::seal_slow(block));
222    Ok(consensus_block.encode())
223}
224
225/// Resolves a request for a finalized.
226#[instrument(skip_all, fields(%height))]
227async fn resolve_finalized_new(
228    upstream: super::upstream::Mailbox,
229    height: Height,
230) -> Result<Bytes, bool> {
231    let certified_block = match upstream.get_finalization(height).await {
232        Some(certified_block) => certified_block,
233        None => return Err(false),
234    };
235
236    let Ok(finalization) = alloy_primitives::hex::decode(&certified_block.certificate)
237        .map_err(Report::new)
238        .and_then(|bytes| {
239            <Finalization<Scheme<PublicKey, MinSig>, Digest>>::decode(&*bytes).map_err(Report::new)
240        })
241        .inspect_err(|error| warn!(%error, "failed decoding certificate"))
242    else {
243        return Err(false);
244    };
245
246    let consensus_block =
247        Block::from_execution_block(SealedBlock::seal_slow(certified_block.block));
248    Ok((finalization, consensus_block).encode())
249}
250
251impl commonware_resolver::Resolver for Mailbox {
252    type Key = handler::Request<Digest>;
253    type PublicKey = PublicKey;
254
255    async fn fetch(&mut self, key: Self::Key) {
256        self.fetch_all(vec![key]).await;
257    }
258
259    async fn fetch_all(&mut self, keys: Vec<Self::Key>) {
260        self.inner.send_lossy(Message::Fetch { keys });
261    }
262
263    async fn fetch_targeted(&mut self, key: Self::Key, _targets: NonEmptyVec<Self::PublicKey>) {
264        self.fetch(key).await;
265    }
266
267    async fn fetch_all_targeted(
268        &mut self,
269        requests: Vec<(Self::Key, NonEmptyVec<Self::PublicKey>)>,
270    ) {
271        self.fetch_all(requests.into_iter().map(|(k, _)| k).collect())
272            .await;
273    }
274
275    async fn cancel(&mut self, key: Self::Key) {
276        self.inner.send_lossy(Message::Cancel { key });
277    }
278
279    async fn clear(&mut self) {
280        self.inner.send_lossy(Message::Clear);
281    }
282
283    async fn retain(&mut self, predicate: impl Fn(&Self::Key) -> bool + Send + 'static) {
284        self.inner.send_lossy(Message::Retain {
285            predicate: Box::new(predicate),
286        });
287    }
288}