Skip to main content

tempo_commonware_node/feed/
actor.rs

1//! Feed actor implementation.
2//!
3//! This actor:
4//! - Receives consensus activity (notarizations, finalizations)
5//! - Updates shared state (accessible by RPC handlers)
6//! - Broadcasts events to subscribers
7//!
8//! Block resolution uses [`marshal::Mailbox::subscribe_by_digest`] to wait for the block
9//! to become available, avoiding a race where the block hasn't been stored yet
10//! when the activity arrives.
11//!
12//! The actor always polls the oldest (lowest-round) pending subscription so
13//! that events are emitted in order. Notarizations are dropped when a finalization
14//!  at a higher-or-equal round is pending, since the finalization supersedes them.
15
16use alloy_primitives::hex;
17use commonware_codec::Encode;
18use commonware_consensus::{
19    simplex::{scheme::bls12381_threshold::vrf::Scheme, types::Activity},
20    types::{Epoch, FixedEpocher, Round, View},
21};
22use commonware_cryptography::{bls12381::primitives::variant::MinSig, ed25519::PublicKey};
23use commonware_macros::select;
24use commonware_runtime::{ContextCell, Handle, Spawner, spawn_cell};
25use commonware_utils::channel::oneshot;
26use eyre::eyre;
27use futures::{FutureExt, StreamExt};
28use std::{
29    collections::BTreeMap,
30    future::Future,
31    pin::Pin,
32    sync::Arc,
33    task::{Context, Poll},
34    time::{SystemTime, UNIX_EPOCH},
35};
36use tempo_node::{
37    TempoFullNode,
38    rpc::consensus::{CertifiedBlock, Event},
39};
40use tracing::{debug, error, info_span, instrument, warn, warn_span};
41
42use super::state::FeedStateHandle;
43use crate::{
44    alias::marshal,
45    consensus::{Digest, block::Block},
46    utils::OptionFuture,
47};
48
49/// Type alias for the activity type used by the feed actor.
50pub(super) type FeedActivity = Activity<Scheme<PublicKey, MinSig>, Digest>;
51
52/// Receiver for activity messages.
53pub(super) type Receiver = futures::channel::mpsc::UnboundedReceiver<FeedActivity>;
54
55/// A pending block subscription paired with its originating activity.
56///
57/// Resolves to `(Round, FeedActivity, Block)` when the block becomes available.
58struct PendingSubscription {
59    round: Round,
60    activity: Option<FeedActivity>,
61    block_rx: oneshot::Receiver<Block>,
62}
63
64impl PendingSubscription {
65    fn new(round: Round, activity: FeedActivity, block_rx: oneshot::Receiver<Block>) -> Self {
66        Self {
67            round,
68            activity: Some(activity),
69            block_rx,
70        }
71    }
72}
73
74impl Future for PendingSubscription {
75    type Output = eyre::Result<(Round, FeedActivity, Block)>;
76
77    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78        match self.block_rx.poll_unpin(cx) {
79            Poll::Ready(Ok(block)) => {
80                let activity = self.activity.take().expect("polled after completion");
81                Poll::Ready(Ok((self.round, activity, block)))
82            }
83            Poll::Ready(Err(_)) => Poll::Ready(Err(eyre::eyre!("block subscription cancelled"))),
84            Poll::Pending => Poll::Pending,
85        }
86    }
87}
88
89pub(crate) struct Actor<TContext> {
90    /// Runtime context.
91    context: ContextCell<TContext>,
92    /// Receiver for activity messages.
93    receiver: Receiver,
94    /// Shared state handle.
95    state: FeedStateHandle,
96    /// Marshal mailbox for block lookups.
97    marshal: marshal::Mailbox,
98    /// Pending block subscriptions keyed by round. Since finalizations
99    /// must be delivered, pending subscriptions are bound by the marshal.
100    pending: BTreeMap<Round, PendingSubscription>,
101}
102
103impl<TContext: Spawner> Actor<TContext> {
104    /// Create a new feed actor.
105    ///
106    /// The actor receives Activity messages via `receiver` and updates the shared `state`.
107    pub(crate) fn new(
108        context: TContext,
109        marshal: marshal::Mailbox,
110        epocher: FixedEpocher,
111        execution_node: Arc<TempoFullNode>,
112        receiver: Receiver,
113        state: FeedStateHandle,
114    ) -> Self {
115        state.set_marshal(marshal.clone());
116        state.set_epocher(epocher);
117        state.set_execution_node(execution_node);
118
119        Self {
120            context: ContextCell::new(context),
121            receiver,
122            state,
123            marshal,
124            pending: BTreeMap::new(),
125        }
126    }
127
128    /// Start the actor, returning a handle to the spawned task.
129    pub(crate) fn start(mut self) -> Handle<()> {
130        spawn_cell!(self.context, self.run())
131    }
132
133    /// Run the actor's main loop.
134    ///
135    /// The loop races the oldest pending block subscription
136    /// against incoming activity so events are emitted in order.
137    async fn run(mut self) {
138        let reason = loop {
139            // We need a mutable reference to poll pending subscription. Thus if a new activity arrives,
140            // we also need to re-insert this popped subscription.
141            let mut oldest = OptionFuture::from(self.pending.pop_first().map(|(_, p)| p));
142
143            select!(
144                result = &mut oldest => {
145                    match result {
146                        Ok((_, activity, block)) => self.handle_activity(activity, block),
147                        Err(error) => warn_span!("feed_actor").in_scope(||
148                            warn!(%error, "did not get pending block")
149                        ),
150                    }
151                },
152
153                activity = self.receiver.next() => {
154                    let Some(activity) = activity else {
155                        break eyre!("mailbox closed");
156                    };
157
158                    if let Some(p) = oldest.take() {
159                        self.pending.insert(p.round, p);
160                    }
161                    self.subscribe(activity).await;
162                },
163            );
164        };
165
166        info_span!("feed_actor").in_scope(|| error!(%reason, "shutting down"));
167    }
168
169    async fn subscribe(&mut self, activity: FeedActivity) {
170        let (round, payload) = match &activity {
171            Activity::Notarization(n) => (n.proposal.round, n.proposal.payload),
172            Activity::Finalization(f) => (f.proposal.round, f.proposal.payload),
173            _ => return,
174        };
175
176        // Prune & filter incoming activity.
177        // - Incoming Finalization. Prune older subscriptions as we only care about latest information
178        // - Incoming Notarization. Only accept if ahead of the latest Finalization.
179        match &activity {
180            Activity::Finalization(_) => self.pending.retain(|&r, p| {
181                matches!(&p.activity, Some(Activity::Finalization(_))) || r > round
182            }),
183            Activity::Notarization(_)
184                if self
185                    .state
186                    .read()
187                    .latest_finalized
188                    .as_ref()
189                    .map(|f| Round::new(Epoch::new(f.epoch), View::new(f.view)))
190                    .is_none_or(|f| f < round) => {}
191
192            _ => return,
193        }
194
195        let block_rx = self.marshal.subscribe_by_digest(Some(round), payload).await;
196        let pending = PendingSubscription::new(round, activity, block_rx);
197        self.pending.insert(round, pending);
198    }
199
200    #[instrument(skip_all, fields(activity = ?activity))]
201    fn handle_activity(&self, activity: FeedActivity, consensus_block: Block) {
202        let block = consensus_block.into_inner().into_block();
203        let (round, digest, certificate) = match activity.clone() {
204            Activity::Notarization(notarization) => (
205                notarization.proposal.round,
206                notarization.proposal.payload.0,
207                notarization.encode(),
208            ),
209            Activity::Finalization(finalization) => (
210                finalization.proposal.round,
211                finalization.proposal.payload.0,
212                finalization.encode(),
213            ),
214            _ => return,
215        };
216
217        let certified = CertifiedBlock {
218            epoch: round.epoch().get(),
219            view: round.view().get(),
220            block,
221            digest,
222            certificate: hex::encode(certificate),
223        };
224
225        let mut state = self.state.write();
226        let latest_finalized_round = state
227            .latest_finalized
228            .as_ref()
229            .map(|b| Round::new(Epoch::new(b.epoch), View::new(b.view)));
230        let latest_notarized_round = state
231            .latest_notarized
232            .as_ref()
233            .map(|b| Round::new(Epoch::new(b.epoch), View::new(b.view)));
234
235        // Update state and broadcast events
236        match activity {
237            Activity::Notarization(_) => {
238                let _ = self.state.events_tx().send(Event::Notarized {
239                    block: certified.clone(),
240                    seen: now_millis(),
241                });
242
243                if latest_finalized_round.is_none_or(|r| r < round)
244                    && latest_notarized_round.is_none_or(|r| r < round)
245                {
246                    state.latest_notarized = Some(certified);
247                }
248            }
249
250            Activity::Finalization(_) => {
251                debug!(
252                    subscribers = self.state.events_tx().receiver_count(),
253                    "sending finalized event",
254                );
255                let _ = self.state.events_tx().send(Event::Finalized {
256                    block: certified.clone(),
257                    seen: now_millis(),
258                });
259
260                if latest_finalized_round.is_none_or(|r| r < round) {
261                    if latest_notarized_round.is_none_or(|r| r < round) {
262                        state.latest_notarized = None;
263                    }
264
265                    state.latest_finalized = Some(certified);
266                }
267            }
268            _ => {}
269        }
270    }
271}
272
273/// Get current Unix timestamp in milliseconds.
274fn now_millis() -> u64 {
275    SystemTime::now()
276        .duration_since(UNIX_EPOCH)
277        .map(|d| d.as_millis() as u64)
278        .unwrap_or(0)
279}