1use crate::{alias::marshal, network_identity::NetworkIdentity};
4use alloy_consensus::BlockHeader as _;
5use alloy_primitives::hex;
6use commonware_codec::{Encode, ReadExt as _};
7use commonware_consensus::{
8 marshal::Identifier,
9 types::{Epoch, Epocher as _, FixedEpocher, Height, Round, View},
10};
11use parking_lot::RwLock;
12use reth_node_core::rpc::compat::FromConsensusHeader;
13use reth_provider::HeaderProvider as _;
14use std::sync::{Arc, OnceLock};
15use tempo_alloy::rpc::TempoHeaderResponse;
16use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
17use tempo_node::{
18 TempoFullNode,
19 rpc::consensus::{
20 CertifiedBlock, ConsensusFeed, ConsensusState, Event, IdentityProofError,
21 IdentityTransition, IdentityTransitionResponse, Query, TransitionProofData,
22 types::Response,
23 },
24};
25use tokio::sync::broadcast;
26use tracing::{Level, instrument};
27
28const BROADCAST_CHANNEL_SIZE: usize = 1024;
29
/// Latest certified blocks tracked by the consensus feed.
///
/// Both fields start out as `None` (see `FeedStateHandle::new`).
pub(super) struct FeedState {
    /// Most recently stored notarized block, or `None` if none has been stored yet.
    pub(super) latest_notarized: Option<CertifiedBlock>,
    /// Most recently stored finalized block, or `None` if none has been stored yet.
    pub(super) latest_finalized: Option<CertifiedBlock>,
}
37
/// Cached result of a backward walk over network identity transitions.
///
/// Filled by `FeedStateHandle::try_fill_transitions`, which walks from
/// `from_epoch` towards genesis and records every epoch at which the
/// network public key changed.
#[derive(Clone)]
struct IdentityTransitionCache {
    /// Epoch the walk started from (upper bound of the cached range).
    from_epoch: u64,
    /// Identity read from the DKG outcome in the last block of `from_epoch - 1`.
    from_identity: NetworkIdentity,
    /// Epoch at which the walk stopped; `0` means the walk reached genesis.
    to_epoch: u64,
    /// Identity the walk was tracking when it stopped at `to_epoch`.
    to_identity: NetworkIdentity,
    /// Transitions found by the walk, in the order they were discovered
    /// (walking backwards, so newest first). Shared behind an `Arc` so that
    /// cloning the cache does not copy the list.
    transitions: Arc<Vec<IdentityTransition>>,
}
55
/// Cloneable handle to the consensus feed state.
///
/// Every field is reference counted (or, for the broadcast sender, itself a
/// cloneable handle), so clones of a `FeedStateHandle` share the same state.
#[derive(Clone)]
pub struct FeedStateHandle {
    /// Latest notarized/finalized blocks.
    state: Arc<RwLock<FeedState>>,
    /// Marshal mailbox; unset until `set_marshal` is called.
    marshal: Arc<OnceLock<marshal::Mailbox>>,
    /// Epocher used to map between heights and epochs; unset until `set_epocher` is called.
    epocher: Arc<OnceLock<FixedEpocher>>,
    /// Execution node used to look up headers; unset until `set_execution_node` is called.
    execution_node: Arc<OnceLock<Arc<TempoFullNode>>>,
    /// Sender side of the broadcast channel handed out by `subscribe`.
    events_tx: broadcast::Sender<Event>,
    /// Cache of identity transitions, populated lazily by `try_fill_transitions`.
    identity_cache: Arc<RwLock<Option<IdentityTransitionCache>>>,
}
71
72impl FeedStateHandle {
73 pub fn new() -> Self {
78 let (events_tx, _) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
79 Self {
80 state: Arc::new(RwLock::new(FeedState {
81 latest_notarized: None,
82 latest_finalized: None,
83 })),
84 marshal: Arc::new(OnceLock::new()),
85 epocher: Arc::new(OnceLock::new()),
86 execution_node: Arc::new(OnceLock::new()),
87 events_tx,
88 identity_cache: Arc::new(RwLock::new(None)),
89 }
90 }
91
92 pub(crate) fn set_marshal(&self, marshal: marshal::Mailbox) {
94 let _ = self.marshal.set(marshal);
95 }
96
97 pub(crate) fn set_epocher(&self, epocher: FixedEpocher) {
99 let _ = self.epocher.set(epocher);
100 }
101
102 pub(crate) fn set_execution_node(&self, execution_node: Arc<TempoFullNode>) {
104 let _ = self.execution_node.set(execution_node);
105 }
106
107 pub(super) fn events_tx(&self) -> &broadcast::Sender<Event> {
109 &self.events_tx
110 }
111
112 pub(super) fn read(&self) -> parking_lot::RwLockReadGuard<'_, FeedState> {
114 self.state.read()
115 }
116
117 pub(super) fn write(&self) -> parking_lot::RwLockWriteGuard<'_, FeedState> {
119 self.state.write()
120 }
121
122 fn marshal(&self) -> Option<marshal::Mailbox> {
124 let marshal = self.marshal.get().cloned();
125 if marshal.is_none() {
126 tracing::debug!("marshal not yet set");
127 }
128 marshal
129 }
130
131 fn epocher(&self) -> Option<FixedEpocher> {
133 let epocher = self.epocher.get().cloned();
134 if epocher.is_none() {
135 tracing::debug!("epocher not yet set");
136 }
137 epocher
138 }
139
140 #[instrument(skip_all, fields(start_epoch), err)]
144 async fn try_fill_transitions(
145 &self,
146 marshal: &mut marshal::Mailbox,
147 execution: &TempoFullNode,
148 epocher: &FixedEpocher,
149 start_epoch: u64,
150 ) -> Result<(), IdentityProofError> {
151 let cached = self.identity_cache.read().clone();
155 if let Some(cache) = &cached
156 && cache.to_epoch == 0
157 && (cache.to_epoch..=cache.from_epoch).contains(&start_epoch)
158 {
159 return Ok(());
160 }
161
162 let epoch_outcome = get_outcome(execution, epocher, start_epoch.saturating_sub(1))?;
164 let epoch_pubkey = *epoch_outcome.sharing().public();
165
166 if let Some(cache) = &cached
169 && start_epoch > cache.from_epoch
170 && cache.to_epoch == 0
171 && cache.from_identity.public_key() == epoch_pubkey
172 {
173 let mut updated = cache.clone();
174 updated.from_epoch = start_epoch;
175 *self.identity_cache.write() = Some(updated);
176 return Ok(());
177 }
178
179 let mut transitions = Vec::new();
181 let mut pubkey = epoch_pubkey;
182 let mut search_epoch = start_epoch.saturating_sub(1);
183 while search_epoch > 0 {
184 if let Some(cache) = &cached
188 && search_epoch < cache.from_epoch
189 && search_epoch > cache.to_epoch
190 {
191 transitions.extend(cache.transitions.iter().cloned());
192 search_epoch = cache.to_epoch;
193 if cache.to_epoch == 0 {
194 break;
195 }
196
197 pubkey = cache.to_identity.public_key();
198 }
199
200 let prev_outcome = match get_outcome(execution, epocher, search_epoch - 1) {
201 Ok(outcome) => outcome,
202 Err(IdentityProofError::PrunedData(height)) => {
203 tracing::info!(
204 %height,
205 search_epoch = search_epoch - 1,
206 "stopping identity transition walk early (header not available)"
207 );
208 break;
209 }
210 Err(e) => return Err(e),
211 };
212
213 let prev_pubkey = *prev_outcome.sharing().public();
215 if pubkey != prev_pubkey {
216 let height = epocher
217 .last(Epoch::new(search_epoch))
218 .expect("fixed epocher is valid for all epochs");
219
220 let Some(header) = execution
221 .provider
222 .sealed_header(height.get())
223 .ok()
224 .flatten()
225 else {
226 tracing::info!(
227 height = height.get(),
228 search_epoch,
229 "stopping identity transition walk early (header not available)"
230 );
231 break;
232 };
233
234 let Some(finalization) = marshal.get_finalization(height).await else {
235 tracing::info!(
236 height = height.get(),
237 search_epoch,
238 "stopping identity transition walk early (finalization pruned)"
239 );
240 break;
241 };
242
243 if finalization.proposal.payload.0 != header.hash() {
244 return Err(IdentityProofError::MalformedData(height.get()));
245 }
246
247 transitions.push(IdentityTransition {
248 transition_epoch: search_epoch,
249 old_identity: hex::encode(prev_pubkey.encode()),
250 new_identity: hex::encode(pubkey.encode()),
251 proof: Some(TransitionProofData {
252 header: TempoHeaderResponse::from_consensus_header(header, 0),
253 finalization_certificate: hex::encode(finalization.encode()),
254 }),
255 });
256 }
257
258 pubkey = prev_pubkey;
259 search_epoch -= 1;
260 }
261
262 if search_epoch == 0 {
264 let has_genesis = transitions
265 .last()
266 .is_some_and(|t| t.transition_epoch == 0 && t.proof.is_none());
267
268 if !has_genesis {
269 match get_outcome(execution, epocher, 0) {
270 Ok(genesis_outcome) => {
271 let genesis_pubkey = *genesis_outcome.sharing().public();
272 let genesis_identity = hex::encode(genesis_pubkey.encode());
273 transitions.push(IdentityTransition {
274 transition_epoch: 0,
275 old_identity: genesis_identity.clone(),
276 new_identity: genesis_identity,
277 proof: None,
278 });
279 }
280 Err(err) => {
281 tracing::debug!(
282 ?err,
283 "failed to fetch genesis outcome; omitting genesis marker"
284 );
285 }
286 }
287 }
288 }
289
290 let new_cache = if let Some(c) = &cached {
293 let (from, from_pubkey) = if start_epoch >= c.from_epoch {
294 (start_epoch, epoch_pubkey)
295 } else {
296 (c.from_epoch, c.from_identity.public_key())
297 };
298
299 IdentityTransitionCache {
300 from_epoch: from,
301 from_identity: NetworkIdentity(from_pubkey),
302 to_epoch: search_epoch,
303 to_identity: NetworkIdentity(pubkey),
304 transitions: Arc::new(transitions),
305 }
306 } else {
307 IdentityTransitionCache {
308 from_epoch: start_epoch,
309 from_identity: NetworkIdentity(epoch_pubkey),
310 to_epoch: search_epoch,
311 to_identity: NetworkIdentity(pubkey),
312 transitions: Arc::new(transitions),
313 }
314 };
315
316 *self.identity_cache.write() = Some(new_cache);
317 Ok(())
318 }
319}
320
321impl Default for FeedStateHandle {
322 fn default() -> Self {
323 Self::new()
324 }
325}
326
327impl std::fmt::Debug for FeedStateHandle {
328 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329 let state = self.state.read();
330 f.debug_struct("FeedStateHandle")
331 .field("latest_notarized", &state.latest_notarized)
332 .field("latest_finalized", &state.latest_finalized)
333 .field("marshal_set", &self.marshal.get().is_some())
334 .field("execution_node_set", &self.execution_node.get().is_some())
335 .field("subscriber_count", &self.events_tx.receiver_count())
336 .finish()
337 }
338}
339
340impl ConsensusFeed for FeedStateHandle {
341 #[instrument(skip_all, fields(%query), ret(level = Level::DEBUG, Display))]
342 async fn get_finalization(&self, query: Query) -> Response<CertifiedBlock> {
343 match query {
344 Query::Latest => self
345 .state
346 .read()
347 .latest_finalized
348 .clone()
349 .map_or(Response::Missing("certifications"), Response::Success),
350 Query::Height(height) => 'process: {
351 let height = Height::new(height);
352 let Some(marshal) = self.marshal() else {
353 break 'process Response::NotReady;
354 };
355
356 let Some(finalization) = marshal.get_finalization(height).await else {
357 break 'process Response::Missing("certificate");
358 };
359 let Some(block) = marshal.get_block(height).await else {
360 break 'process Response::Missing("block");
361 };
362
363 Response::Success(CertifiedBlock {
364 epoch: finalization.proposal.round.epoch().get(),
365 view: finalization.proposal.round.view().get(),
366 block: block.into_inner().into_block(),
367 digest: finalization.proposal.payload.0,
368 certificate: hex::encode(finalization.encode()),
369 })
370 }
371 }
372 }
373
374 async fn get_latest(&self) -> ConsensusState {
375 let (finalized, mut notarized) = {
376 let state = self.state.read();
377 (
378 state.latest_finalized.clone(),
379 state.latest_notarized.clone(),
380 )
381 };
382
383 let finalized_round = finalized
384 .as_ref()
385 .map(|f| Round::new(Epoch::new(f.epoch), View::new(f.view)));
386
387 let notarized_round = notarized
388 .as_ref()
389 .map(|n| Round::new(Epoch::new(n.epoch), View::new(n.view)));
390
391 if finalized_round.is_some_and(|f| notarized_round.is_none_or(|n| n <= f)) {
393 notarized = None;
394 }
395
396 ConsensusState {
397 finalized,
398 notarized,
399 }
400 }
401
402 async fn subscribe(&self) -> Option<broadcast::Receiver<Event>> {
403 Some(self.events_tx.subscribe())
404 }
405
406 async fn get_identity_transition_proof(
407 &self,
408 from_epoch: Option<u64>,
409 full: bool,
410 ) -> Result<IdentityTransitionResponse, IdentityProofError> {
411 let Some((mut marshal, epocher)) = self.marshal().zip(self.epocher()) else {
412 return Err(IdentityProofError::NotReady);
413 };
414 let Some(execution_node) = self.execution_node.get() else {
415 return Err(IdentityProofError::NotReady);
416 };
417
418 let start_epoch = if let Some(epoch) = from_epoch {
420 epoch
421 } else {
422 marshal
423 .get_info(Identifier::Latest)
424 .await
425 .and_then(|(h, _)| epocher.containing(h))
426 .ok_or(IdentityProofError::NotReady)?
427 .epoch()
428 .get()
429 };
430
431 self.try_fill_transitions(&mut marshal, execution_node, &epocher, start_epoch)
433 .await?;
434
435 let cache = self
436 .identity_cache
437 .read()
438 .clone()
439 .ok_or(IdentityProofError::NotReady)?;
440
441 let transitions: Vec<_> = cache
443 .transitions
444 .iter()
445 .filter(|t| t.transition_epoch <= start_epoch)
446 .cloned()
447 .collect();
448
449 let identity = cache
453 .transitions
454 .iter()
455 .filter(|t| t.transition_epoch > start_epoch)
456 .last()
457 .map(|t| t.old_identity.clone())
458 .unwrap_or_else(|| hex::encode(cache.from_identity.public_key().encode()));
459
460 let transitions = if full {
462 transitions
463 } else {
464 transitions
465 .into_iter()
466 .filter(|t| t.transition_epoch > 0)
467 .take(1)
468 .collect()
469 };
470
471 Ok(IdentityTransitionResponse {
472 identity,
473 transitions,
474 })
475 }
476}
477
478fn get_outcome(
480 execution: &TempoFullNode,
481 epocher: &FixedEpocher,
482 epoch: u64,
483) -> Result<OnchainDkgOutcome, IdentityProofError> {
484 let height = epocher
485 .last(Epoch::new(epoch))
486 .expect("fixed epocher is valid for all epochs");
487
488 let header = execution
489 .provider
490 .header_by_number(height.get())
491 .ok()
492 .flatten()
493 .ok_or(IdentityProofError::PrunedData(height.get()))?;
494
495 OnchainDkgOutcome::read(&mut header.extra_data().as_ref())
496 .map_err(|_| IdentityProofError::MalformedData(height.get()))
497}