1use std::{collections::BTreeMap, sync::Arc};
7
8use bytes::Bytes;
9use commonware_codec::{DecodeExt as _, Encode as _};
10use commonware_consensus::{
11 marshal::resolver::handler,
12 simplex::{scheme::bls12381_threshold::vrf::Scheme, types::Finalization},
13 types::Height,
14};
15use commonware_cryptography::{bls12381::primitives::variant::MinSig, ed25519::PublicKey};
16use commonware_runtime::{ContextCell, Spawner, spawn_cell};
17use commonware_utils::{
18 channel::{fallible::FallibleExt as _, mpsc},
19 futures::{AbortablePool, Aborter},
20 vec::NonEmptyVec,
21};
22use eyre::Report;
23use reth_provider::{BlockReader as _, BlockSource};
24use tempo_node::TempoFullNode;
25use tokio::select;
26use tracing::{debug, error, instrument, warn};
27
28use crate::consensus::{Digest, block::Block};
29
30pub(crate) fn try_init<TContext>(
31 context: TContext,
32 config: Config,
33) -> (
34 Resolver<TContext>,
35 Mailbox,
36 mpsc::Receiver<handler::Message<Digest>>,
37) {
38 let (handler_tx, handler_rx) = mpsc::channel(config.mailbox_size);
39 let (mailbox_tx, mailbox_rx) = mpsc::unbounded_channel();
40 let actor = Resolver {
41 context: ContextCell::new(context),
42 config,
43 mailbox: mailbox_rx,
44 handler_tx,
45 requests: BTreeMap::new(),
46 fetches: AbortablePool::default(),
47 };
48 let mailbox = Mailbox { inner: mailbox_tx };
49 (actor, mailbox, handler_rx)
50}
51
52#[derive(Clone)]
53pub(crate) struct Mailbox {
54 inner: mpsc::UnboundedSender<Message>,
57}
58
59type Predicate<K> = Box<dyn Fn(&K) -> bool + Send>;
60
61enum Message {
63 Fetch { keys: Vec<handler::Request<Digest>> },
65
66 Cancel { key: handler::Request<Digest> },
68
69 Clear,
71
72 Retain {
74 predicate: Predicate<handler::Request<Digest>>,
75 },
76}
77
78pub(crate) struct Config {
79 pub(super) execution_node: Arc<TempoFullNode>,
81 pub(super) upstream: super::upstream::Mailbox,
83 pub(super) mailbox_size: usize,
84}
85
86type FetchPool = AbortablePool<(handler::Request<Digest>, Result<Bytes, bool>)>;
87pub(crate) struct Resolver<TContext> {
88 context: ContextCell<TContext>,
89 config: Config,
90 handler_tx: mpsc::Sender<handler::Message<Digest>>,
92 mailbox: mpsc::UnboundedReceiver<Message>,
93 requests: BTreeMap<handler::Request<Digest>, Aborter>,
94 fetches: FetchPool,
95}
96
97impl<TContext> Resolver<TContext>
98where
99 TContext: Spawner,
100{
101 async fn run(mut self) {
102 loop {
103 select!(
104 biased;
105
106 response = self.fetches.next_completed() => {
107 if let Ok(resolution) = response {
109 self.handle_fetch_resolution(resolution);
110 }
111 }
112
113 Some(msg) = self.mailbox.recv() => {
114 match msg {
115 Message::Fetch { keys, } => {
116 self.handle_fetch_request(keys);
117 }
118 Message::Cancel { key } => {
119 self.requests.remove(&key);
120 }
121 Message::Clear => {
122 self.requests.clear();
123 }
124 Message::Retain { predicate } => {
125 self.requests.retain(move |key, _| predicate(key));
126 }
127 }
128 }
129 )
130 }
131 }
132
133 pub(crate) fn start(mut self) -> commonware_runtime::Handle<()> {
134 spawn_cell!(self.context, self.run())
135 }
136
137 #[instrument(skip_all)]
138 fn handle_fetch_request(&mut self, keys: Vec<handler::Request<Digest>>) {
139 for key in keys {
140 self.schedule_request(key);
141 }
142 }
143
144 #[instrument(skip_all)]
145 fn handle_fetch_resolution(
146 &mut self,
147 (key, resolution): (handler::Request<Digest>, Result<Bytes, bool>),
148 ) {
149 match resolution {
150 Ok(value) => {
151 debug!(%key, "fetched value, delivering to client");
152 self.requests.remove(&key);
153 let (response, _) = commonware_utils::channel::oneshot::channel();
156 let _ = self.handler_tx.try_send(handler::Message::Deliver {
157 key,
158 value,
159 response,
160 });
161 }
162 Err(true) => {
163 debug!(%key, "fetch failed, rescheduling");
164 self.requests.remove(&key);
165 self.schedule_request(key);
166 }
167 Err(false) => {
168 debug!(%key, "fetch failed, dropping");
169 self.requests.remove(&key);
170 }
171 }
172 }
173
174 fn schedule_request(&mut self, key: handler::Request<Digest>) {
175 if !self.requests.contains_key(&key) {
176 let aborter = match &key {
177 handler::Request::Block(digest) => {
178 let execution_node = self.config.execution_node.clone();
179 let digest = *digest;
180 let key = key.clone();
181 self.fetches.push(async move {
182 let response = resolve_block(&execution_node, digest);
183 (key, response)
184 })
185 }
186 handler::Request::Finalized { height } => {
187 let upstream = self.config.upstream.clone();
188 let height = *height;
189 let key = key.clone();
190 self.fetches.push(async move {
191 let response = resolve_finalized_new(upstream, height).await;
192 (key, response)
193 })
194 }
195 handler::Request::Notarized { .. } => {
196 debug!("ignoring requests for notarized blocks");
197 return;
198 }
199 };
200 debug!(%key, "scheduled new request");
201 self.requests.insert(key, aborter);
202 } else {
203 debug!(%key, "request already scheduled");
204 }
205 }
206}
207
208#[instrument(skip(execution_node))]
210fn resolve_block(execution_node: &TempoFullNode, block_digest: Digest) -> Result<Bytes, bool> {
211 let Ok(Some(block)) = execution_node
212 .provider
213 .find_sealed_or_recovered_block(block_digest.0, BlockSource::Any)
214 .map_err(Report::new)
215 .inspect_err(
216 |error| error!(%error, "unable to communicate with execution layer to lookup block"),
217 )
218 else {
219 return Err(false);
220 };
221 let consensus_block = Block::from_execution_block_unchecked(block, None);
224 Ok(consensus_block.encode())
225}
226
227#[instrument(skip_all, fields(%height))]
229async fn resolve_finalized_new(
230 upstream: super::upstream::Mailbox,
231 height: Height,
232) -> Result<Bytes, bool> {
233 let certified_block = match upstream.get_finalization(height).await {
234 Some(certified_block) => certified_block,
235 None => return Err(false),
236 };
237
238 let Ok(finalization) = alloy_primitives::hex::decode(&certified_block.certificate)
239 .map_err(Report::new)
240 .and_then(|bytes| {
241 <Finalization<Scheme<PublicKey, MinSig>, Digest>>::decode(&*bytes).map_err(Report::new)
242 })
243 .inspect_err(|error| warn!(%error, "failed decoding certificate"))
244 else {
245 return Err(false);
246 };
247
248 let consensus_block = Block::from_execution_block_unchecked(certified_block.block, None);
251 Ok((finalization, consensus_block).encode())
252}
253
254impl commonware_resolver::Resolver for Mailbox {
255 type Key = handler::Request<Digest>;
256 type PublicKey = PublicKey;
257
258 async fn fetch(&mut self, key: Self::Key) {
259 self.fetch_all(vec![key]).await;
260 }
261
262 async fn fetch_all(&mut self, keys: Vec<Self::Key>) {
263 self.inner.send_lossy(Message::Fetch { keys });
264 }
265
266 async fn fetch_targeted(&mut self, key: Self::Key, _targets: NonEmptyVec<Self::PublicKey>) {
267 self.fetch(key).await;
268 }
269
270 async fn fetch_all_targeted(
271 &mut self,
272 requests: Vec<(Self::Key, NonEmptyVec<Self::PublicKey>)>,
273 ) {
274 self.fetch_all(requests.into_iter().map(|(k, _)| k).collect())
275 .await;
276 }
277
278 async fn cancel(&mut self, key: Self::Key) {
279 self.inner.send_lossy(Message::Cancel { key });
280 }
281
282 async fn clear(&mut self) {
283 self.inner.send_lossy(Message::Clear);
284 }
285
286 async fn retain(&mut self, predicate: impl Fn(&Self::Key) -> bool + Send + 'static) {
287 self.inner.send_lossy(Message::Retain {
288 predicate: Box::new(predicate),
289 });
290 }
291}