tempo_commonware_node/feed/
state.rs1use crate::alias::marshal;
4use alloy_consensus::BlockHeader as _;
5use alloy_primitives::hex;
6use commonware_codec::{Encode, ReadExt as _};
7use commonware_consensus::{
8 marshal::Identifier,
9 types::{Epoch, Epocher as _, FixedEpocher, Height, Round, View},
10};
11use commonware_cryptography::bls12381::primitives::variant::{MinSig, Variant};
12use parking_lot::RwLock;
13use reth_node_core::rpc::compat::FromConsensusHeader;
14use reth_provider::HeaderProvider as _;
15use std::sync::{Arc, OnceLock};
16use tempo_alloy::rpc::TempoHeaderResponse;
17use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
18use tempo_node::{
19 TempoFullNode,
20 rpc::consensus::{
21 CertifiedBlock, ConsensusFeed, ConsensusState, Event, IdentityProofError,
22 IdentityTransition, IdentityTransitionResponse, Query, TransitionProofData,
23 types::Response,
24 },
25};
26use tokio::sync::broadcast;
27use tracing::{Level, instrument};
28
/// Capacity handed to `broadcast::channel` when `FeedStateHandle::new` creates
/// the event sender.
const BROADCAST_CHANNEL_SIZE: usize = 1024;
30
31pub(super) struct FeedState {
33 pub(super) latest_notarized: Option<CertifiedBlock>,
35 pub(super) latest_finalized: Option<CertifiedBlock>,
37}
38
39#[derive(Clone)]
44struct IdentityTransitionCache {
45 from_epoch: u64,
47 from_pubkey: <MinSig as Variant>::Public,
49 to_epoch: u64,
51 to_pubkey: <MinSig as Variant>::Public,
53 transitions: Arc<Vec<IdentityTransition>>,
55}
56
57#[derive(Clone)]
63pub struct FeedStateHandle {
64 state: Arc<RwLock<FeedState>>,
65 marshal: Arc<OnceLock<marshal::Mailbox>>,
66 epocher: Arc<OnceLock<FixedEpocher>>,
67 execution_node: Arc<OnceLock<Arc<TempoFullNode>>>,
68 events_tx: broadcast::Sender<Event>,
69 identity_cache: Arc<RwLock<Option<IdentityTransitionCache>>>,
71}
72
73impl FeedStateHandle {
74 pub fn new() -> Self {
79 let (events_tx, _) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
80 Self {
81 state: Arc::new(RwLock::new(FeedState {
82 latest_notarized: None,
83 latest_finalized: None,
84 })),
85 marshal: Arc::new(OnceLock::new()),
86 epocher: Arc::new(OnceLock::new()),
87 execution_node: Arc::new(OnceLock::new()),
88 events_tx,
89 identity_cache: Arc::new(RwLock::new(None)),
90 }
91 }
92
93 pub(crate) fn set_marshal(&self, marshal: marshal::Mailbox) {
95 let _ = self.marshal.set(marshal);
96 }
97
98 pub(crate) fn set_epocher(&self, epocher: FixedEpocher) {
100 let _ = self.epocher.set(epocher);
101 }
102
103 pub(crate) fn set_execution_node(&self, execution_node: Arc<TempoFullNode>) {
105 let _ = self.execution_node.set(execution_node);
106 }
107
108 pub(super) fn events_tx(&self) -> &broadcast::Sender<Event> {
110 &self.events_tx
111 }
112
113 pub(super) fn read(&self) -> parking_lot::RwLockReadGuard<'_, FeedState> {
115 self.state.read()
116 }
117
118 pub(super) fn write(&self) -> parking_lot::RwLockWriteGuard<'_, FeedState> {
120 self.state.write()
121 }
122
123 fn marshal(&self) -> Option<marshal::Mailbox> {
125 let marshal = self.marshal.get().cloned();
126 if marshal.is_none() {
127 tracing::debug!("marshal not yet set");
128 }
129 marshal
130 }
131
132 fn epocher(&self) -> Option<FixedEpocher> {
134 let epocher = self.epocher.get().cloned();
135 if epocher.is_none() {
136 tracing::debug!("epocher not yet set");
137 }
138 epocher
139 }
140
141 #[instrument(skip_all, fields(start_epoch), err)]
145 async fn try_fill_transitions(
146 &self,
147 marshal: &mut marshal::Mailbox,
148 execution: &TempoFullNode,
149 epocher: &FixedEpocher,
150 start_epoch: u64,
151 ) -> Result<(), IdentityProofError> {
152 let cached = self.identity_cache.read().clone();
156 if let Some(cache) = &cached
157 && cache.to_epoch == 0
158 && (cache.to_epoch..=cache.from_epoch).contains(&start_epoch)
159 {
160 return Ok(());
161 }
162
163 let epoch_outcome = get_outcome(execution, epocher, start_epoch.saturating_sub(1))?;
165 let epoch_pubkey = *epoch_outcome.sharing().public();
166
167 if let Some(cache) = &cached
170 && start_epoch > cache.from_epoch
171 && cache.to_epoch == 0
172 && cache.from_pubkey == epoch_pubkey
173 {
174 let mut updated = cache.clone();
175 updated.from_epoch = start_epoch;
176 *self.identity_cache.write() = Some(updated);
177 return Ok(());
178 }
179
180 let mut transitions = Vec::new();
182 let mut pubkey = epoch_pubkey;
183 let mut search_epoch = start_epoch.saturating_sub(1);
184 while search_epoch > 0 {
185 if let Some(cache) = &cached
189 && search_epoch < cache.from_epoch
190 && search_epoch > cache.to_epoch
191 {
192 transitions.extend(cache.transitions.iter().cloned());
193 search_epoch = cache.to_epoch;
194 if cache.to_epoch == 0 {
195 break;
196 }
197
198 pubkey = cache.to_pubkey;
199 }
200
201 let prev_outcome = match get_outcome(execution, epocher, search_epoch - 1) {
202 Ok(outcome) => outcome,
203 Err(IdentityProofError::PrunedData(height)) => {
204 tracing::info!(
205 %height,
206 search_epoch = search_epoch - 1,
207 "stopping identity transition walk early (header not available)"
208 );
209 break;
210 }
211 Err(e) => return Err(e),
212 };
213
214 let prev_pubkey = *prev_outcome.sharing().public();
216 if pubkey != prev_pubkey {
217 let height = epocher
218 .last(Epoch::new(search_epoch))
219 .expect("fixed epocher is valid for all epochs");
220
221 let Some(header) = execution
222 .provider
223 .sealed_header(height.get())
224 .ok()
225 .flatten()
226 else {
227 tracing::info!(
228 height = height.get(),
229 search_epoch,
230 "stopping identity transition walk early (header not available)"
231 );
232 break;
233 };
234
235 let Some(finalization) = marshal.get_finalization(height).await else {
236 tracing::info!(
237 height = height.get(),
238 search_epoch,
239 "stopping identity transition walk early (finalization pruned)"
240 );
241 break;
242 };
243
244 if finalization.proposal.payload.0 != header.hash() {
245 return Err(IdentityProofError::MalformedData(height.get()));
246 }
247
248 transitions.push(IdentityTransition {
249 transition_epoch: search_epoch,
250 old_identity: hex::encode(prev_pubkey.encode()),
251 new_identity: hex::encode(pubkey.encode()),
252 proof: Some(TransitionProofData {
253 header: TempoHeaderResponse::from_consensus_header(header, 0),
254 finalization_certificate: hex::encode(finalization.encode()),
255 }),
256 });
257 }
258
259 pubkey = prev_pubkey;
260 search_epoch -= 1;
261 }
262
263 if search_epoch == 0 {
265 let has_genesis = transitions
266 .last()
267 .is_some_and(|t| t.transition_epoch == 0 && t.proof.is_none());
268
269 if !has_genesis {
270 match get_outcome(execution, epocher, 0) {
271 Ok(genesis_outcome) => {
272 let genesis_pubkey = *genesis_outcome.sharing().public();
273 let genesis_identity = hex::encode(genesis_pubkey.encode());
274 transitions.push(IdentityTransition {
275 transition_epoch: 0,
276 old_identity: genesis_identity.clone(),
277 new_identity: genesis_identity,
278 proof: None,
279 });
280 }
281 Err(err) => {
282 tracing::debug!(
283 ?err,
284 "failed to fetch genesis outcome; omitting genesis marker"
285 );
286 }
287 }
288 }
289 }
290
291 let new_cache = if let Some(c) = &cached {
294 let (from, from_pk) = if start_epoch >= c.from_epoch {
295 (start_epoch, epoch_pubkey)
296 } else {
297 (c.from_epoch, c.from_pubkey)
298 };
299
300 IdentityTransitionCache {
301 from_epoch: from,
302 from_pubkey: from_pk,
303 to_epoch: search_epoch,
304 to_pubkey: pubkey,
305 transitions: Arc::new(transitions),
306 }
307 } else {
308 IdentityTransitionCache {
309 from_epoch: start_epoch,
310 from_pubkey: epoch_pubkey,
311 to_epoch: search_epoch,
312 to_pubkey: pubkey,
313 transitions: Arc::new(transitions),
314 }
315 };
316
317 *self.identity_cache.write() = Some(new_cache);
318 Ok(())
319 }
320}
321
322impl Default for FeedStateHandle {
323 fn default() -> Self {
324 Self::new()
325 }
326}
327
328impl std::fmt::Debug for FeedStateHandle {
329 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330 let state = self.state.read();
331 f.debug_struct("FeedStateHandle")
332 .field("latest_notarized", &state.latest_notarized)
333 .field("latest_finalized", &state.latest_finalized)
334 .field("marshal_set", &self.marshal.get().is_some())
335 .field("execution_node_set", &self.execution_node.get().is_some())
336 .field("subscriber_count", &self.events_tx.receiver_count())
337 .finish()
338 }
339}
340
341impl ConsensusFeed for FeedStateHandle {
342 #[instrument(skip_all, fields(%query), ret(level = Level::DEBUG, Display))]
343 async fn get_finalization(&self, query: Query) -> Response<CertifiedBlock> {
344 match query {
345 Query::Latest => self
346 .state
347 .read()
348 .latest_finalized
349 .clone()
350 .map_or(Response::Missing("certifications"), Response::Success),
351 Query::Height(height) => 'process: {
352 let height = Height::new(height);
353 let Some(marshal) = self.marshal() else {
354 break 'process Response::NotReady;
355 };
356
357 let Some(finalization) = marshal.get_finalization(height).await else {
358 break 'process Response::Missing("certificate");
359 };
360 let Some(block) = marshal.get_block(height).await else {
361 break 'process Response::Missing("block");
362 };
363
364 Response::Success(CertifiedBlock {
365 epoch: finalization.proposal.round.epoch().get(),
366 view: finalization.proposal.round.view().get(),
367 block: block.into_inner().into_block(),
368 digest: finalization.proposal.payload.0,
369 certificate: hex::encode(finalization.encode()),
370 })
371 }
372 }
373 }
374
375 async fn get_latest(&self) -> ConsensusState {
376 let (finalized, mut notarized) = {
377 let state = self.state.read();
378 (
379 state.latest_finalized.clone(),
380 state.latest_notarized.clone(),
381 )
382 };
383
384 let finalized_round = finalized
385 .as_ref()
386 .map(|f| Round::new(Epoch::new(f.epoch), View::new(f.view)));
387
388 let notarized_round = notarized
389 .as_ref()
390 .map(|n| Round::new(Epoch::new(n.epoch), View::new(n.view)));
391
392 if finalized_round.is_some_and(|f| notarized_round.is_none_or(|n| n <= f)) {
394 notarized = None;
395 }
396
397 ConsensusState {
398 finalized,
399 notarized,
400 }
401 }
402
403 async fn subscribe(&self) -> Option<broadcast::Receiver<Event>> {
404 Some(self.events_tx.subscribe())
405 }
406
407 async fn get_identity_transition_proof(
408 &self,
409 from_epoch: Option<u64>,
410 full: bool,
411 ) -> Result<IdentityTransitionResponse, IdentityProofError> {
412 let Some((mut marshal, epocher)) = self.marshal().zip(self.epocher()) else {
413 return Err(IdentityProofError::NotReady);
414 };
415 let Some(execution_node) = self.execution_node.get() else {
416 return Err(IdentityProofError::NotReady);
417 };
418
419 let start_epoch = if let Some(epoch) = from_epoch {
421 epoch
422 } else {
423 marshal
424 .get_info(Identifier::Latest)
425 .await
426 .and_then(|(h, _)| epocher.containing(h))
427 .ok_or(IdentityProofError::NotReady)?
428 .epoch()
429 .get()
430 };
431
432 self.try_fill_transitions(&mut marshal, execution_node, &epocher, start_epoch)
434 .await?;
435
436 let cache = self
437 .identity_cache
438 .read()
439 .clone()
440 .ok_or(IdentityProofError::NotReady)?;
441
442 let transitions: Vec<_> = cache
444 .transitions
445 .iter()
446 .filter(|t| t.transition_epoch <= start_epoch)
447 .cloned()
448 .collect();
449
450 let identity = cache
454 .transitions
455 .iter()
456 .filter(|t| t.transition_epoch > start_epoch)
457 .last()
458 .map(|t| t.old_identity.clone())
459 .unwrap_or_else(|| hex::encode(cache.from_pubkey.encode()));
460
461 let transitions = if full {
463 transitions
464 } else {
465 transitions
466 .into_iter()
467 .filter(|t| t.transition_epoch > 0)
468 .take(1)
469 .collect()
470 };
471
472 Ok(IdentityTransitionResponse {
473 identity,
474 transitions,
475 })
476 }
477}
478
479fn get_outcome(
481 execution: &TempoFullNode,
482 epocher: &FixedEpocher,
483 epoch: u64,
484) -> Result<OnchainDkgOutcome, IdentityProofError> {
485 let height = epocher
486 .last(Epoch::new(epoch))
487 .expect("fixed epocher is valid for all epochs");
488
489 let header = execution
490 .provider
491 .header_by_number(height.get())
492 .ok()
493 .flatten()
494 .ok_or(IdentityProofError::PrunedData(height.get()))?;
495
496 OnchainDkgOutcome::read(&mut header.extra_data().as_ref())
497 .map_err(|_| IdentityProofError::MalformedData(height.get()))
498}