tempo_commonware_node/follow/
resolver.rs1use std::{collections::BTreeMap, sync::Arc};
7
8use bytes::Bytes;
9use commonware_codec::{DecodeExt as _, Encode as _};
10use commonware_consensus::{
11 marshal::resolver::handler,
12 simplex::{scheme::bls12381_threshold::vrf::Scheme, types::Finalization},
13 types::Height,
14};
15use commonware_cryptography::{bls12381::primitives::variant::MinSig, ed25519::PublicKey};
16use commonware_runtime::{ContextCell, Spawner, spawn_cell};
17use commonware_utils::{
18 channel::{fallible::FallibleExt as _, mpsc},
19 futures::{AbortablePool, Aborter},
20 vec::NonEmptyVec,
21};
22use eyre::Report;
23use reth_node_core::primitives::SealedBlock;
24use reth_provider::{BlockReader as _, BlockSource};
25use tempo_node::TempoFullNode;
26use tokio::select;
27use tracing::{debug, error, instrument, warn};
28
29use crate::consensus::{Digest, block::Block};
30
31pub(crate) fn try_init<TContext>(
32 context: TContext,
33 config: Config,
34) -> (
35 Resolver<TContext>,
36 Mailbox,
37 mpsc::Receiver<handler::Message<Digest>>,
38) {
39 let (handler_tx, handler_rx) = mpsc::channel(config.mailbox_size);
40 let (mailbox_tx, mailbox_rx) = mpsc::unbounded_channel();
41 let actor = Resolver {
42 context: ContextCell::new(context),
43 config,
44 mailbox: mailbox_rx,
45 handler_tx,
46 requests: BTreeMap::new(),
47 fetches: AbortablePool::default(),
48 };
49 let mailbox = Mailbox { inner: mailbox_tx };
50 (actor, mailbox, handler_rx)
51}
52
53#[derive(Clone)]
54pub(crate) struct Mailbox {
55 inner: mpsc::UnboundedSender<Message>,
58}
59
60type Predicate<K> = Box<dyn Fn(&K) -> bool + Send>;
61
62enum Message {
64 Fetch { keys: Vec<handler::Request<Digest>> },
66
67 Cancel { key: handler::Request<Digest> },
69
70 Clear,
72
73 Retain {
75 predicate: Predicate<handler::Request<Digest>>,
76 },
77}
78
79pub(crate) struct Config {
80 pub(super) execution_node: Arc<TempoFullNode>,
82 pub(super) upstream: super::upstream::Mailbox,
84 pub(super) mailbox_size: usize,
85}
86
87type FetchPool = AbortablePool<(handler::Request<Digest>, Result<Bytes, bool>)>;
88pub(crate) struct Resolver<TContext> {
89 context: ContextCell<TContext>,
90 config: Config,
91 handler_tx: mpsc::Sender<handler::Message<Digest>>,
93 mailbox: mpsc::UnboundedReceiver<Message>,
94 requests: BTreeMap<handler::Request<Digest>, Aborter>,
95 fetches: FetchPool,
96}
97
98impl<TContext> Resolver<TContext>
99where
100 TContext: Spawner,
101{
102 async fn run(mut self) {
103 loop {
104 select!(
105 biased;
106
107 response = self.fetches.next_completed() => {
108 if let Ok(resolution) = response {
110 self.handle_fetch_resolution(resolution);
111 }
112 }
113
114 Some(msg) = self.mailbox.recv() => {
115 match msg {
116 Message::Fetch { keys, } => {
117 self.handle_fetch_request(keys);
118 }
119 Message::Cancel { key } => {
120 self.requests.remove(&key);
121 }
122 Message::Clear => {
123 self.requests.clear();
124 }
125 Message::Retain { predicate } => {
126 self.requests.retain(move |key, _| predicate(key));
127 }
128 }
129 }
130 )
131 }
132 }
133
134 pub(crate) fn start(mut self) -> commonware_runtime::Handle<()> {
135 spawn_cell!(self.context, self.run())
136 }
137
138 #[instrument(skip_all)]
139 fn handle_fetch_request(&mut self, keys: Vec<handler::Request<Digest>>) {
140 for key in keys {
141 self.schedule_request(key);
142 }
143 }
144
145 #[instrument(skip_all)]
146 fn handle_fetch_resolution(
147 &mut self,
148 (key, resolution): (handler::Request<Digest>, Result<Bytes, bool>),
149 ) {
150 match resolution {
151 Ok(value) => {
152 debug!(%key, "fetched value, delivering to client");
153 self.requests.remove(&key);
154 let (response, _) = commonware_utils::channel::oneshot::channel();
157 let _ = self.handler_tx.try_send(handler::Message::Deliver {
158 key,
159 value,
160 response,
161 });
162 }
163 Err(true) => {
164 debug!(%key, "fetch failed, rescheduling");
165 self.requests.remove(&key);
166 self.schedule_request(key);
167 }
168 Err(false) => {
169 debug!(%key, "fetch failed, dropping");
170 self.requests.remove(&key);
171 }
172 }
173 }
174
175 fn schedule_request(&mut self, key: handler::Request<Digest>) {
176 if !self.requests.contains_key(&key) {
177 let aborter = match &key {
178 handler::Request::Block(digest) => {
179 let execution_node = self.config.execution_node.clone();
180 let digest = *digest;
181 let key = key.clone();
182 self.fetches.push(async move {
183 let response = resolve_block(&execution_node, digest);
184 (key, response)
185 })
186 }
187 handler::Request::Finalized { height } => {
188 let upstream = self.config.upstream.clone();
189 let height = *height;
190 let key = key.clone();
191 self.fetches.push(async move {
192 let response = resolve_finalized_new(upstream, height).await;
193 (key, response)
194 })
195 }
196 handler::Request::Notarized { .. } => {
197 debug!("ignoring requests for notarized blocks");
198 return;
199 }
200 };
201 debug!(%key, "scheduled new request");
202 self.requests.insert(key, aborter);
203 } else {
204 debug!(%key, "request already scheduled");
205 }
206 }
207}
208
209#[instrument(skip(execution_node))]
210fn resolve_block(execution_node: &TempoFullNode, block_digest: Digest) -> Result<Bytes, bool> {
211 let Ok(Some(block)) = execution_node
212 .provider
213 .find_block_by_hash(block_digest.0, BlockSource::Any)
214 .map_err(Report::new)
215 .inspect_err(
216 |error| error!(%error, "unable to communicate with execution layer to lookup block"),
217 )
218 else {
219 return Err(false);
220 };
221 let consensus_block = Block::from_execution_block(SealedBlock::seal_slow(block));
222 Ok(consensus_block.encode())
223}
224
225#[instrument(skip_all, fields(%height))]
227async fn resolve_finalized_new(
228 upstream: super::upstream::Mailbox,
229 height: Height,
230) -> Result<Bytes, bool> {
231 let certified_block = match upstream.get_finalization(height).await {
232 Some(certified_block) => certified_block,
233 None => return Err(false),
234 };
235
236 let Ok(finalization) = alloy_primitives::hex::decode(&certified_block.certificate)
237 .map_err(Report::new)
238 .and_then(|bytes| {
239 <Finalization<Scheme<PublicKey, MinSig>, Digest>>::decode(&*bytes).map_err(Report::new)
240 })
241 .inspect_err(|error| warn!(%error, "failed decoding certificate"))
242 else {
243 return Err(false);
244 };
245
246 let consensus_block =
247 Block::from_execution_block(SealedBlock::seal_slow(certified_block.block));
248 Ok((finalization, consensus_block).encode())
249}
250
251impl commonware_resolver::Resolver for Mailbox {
252 type Key = handler::Request<Digest>;
253 type PublicKey = PublicKey;
254
255 async fn fetch(&mut self, key: Self::Key) {
256 self.fetch_all(vec![key]).await;
257 }
258
259 async fn fetch_all(&mut self, keys: Vec<Self::Key>) {
260 self.inner.send_lossy(Message::Fetch { keys });
261 }
262
263 async fn fetch_targeted(&mut self, key: Self::Key, _targets: NonEmptyVec<Self::PublicKey>) {
264 self.fetch(key).await;
265 }
266
267 async fn fetch_all_targeted(
268 &mut self,
269 requests: Vec<(Self::Key, NonEmptyVec<Self::PublicKey>)>,
270 ) {
271 self.fetch_all(requests.into_iter().map(|(k, _)| k).collect())
272 .await;
273 }
274
275 async fn cancel(&mut self, key: Self::Key) {
276 self.inner.send_lossy(Message::Cancel { key });
277 }
278
279 async fn clear(&mut self) {
280 self.inner.send_lossy(Message::Clear);
281 }
282
283 async fn retain(&mut self, predicate: impl Fn(&Self::Key) -> bool + Send + 'static) {
284 self.inner.send_lossy(Message::Retain {
285 predicate: Box::new(predicate),
286 });
287 }
288}