tempo_commonware_node/feed/
actor.rs1use alloy_primitives::hex;
17use commonware_codec::Encode;
18use commonware_consensus::{
19 simplex::{scheme::bls12381_threshold::vrf::Scheme, types::Activity},
20 types::{Epoch, FixedEpocher, Round, View},
21};
22use commonware_cryptography::{bls12381::primitives::variant::MinSig, ed25519::PublicKey};
23use commonware_macros::select;
24use commonware_runtime::{ContextCell, Handle, Spawner, spawn_cell};
25use commonware_utils::channel::oneshot;
26use eyre::eyre;
27use futures::{FutureExt, StreamExt};
28use std::{
29 collections::BTreeMap,
30 future::Future,
31 pin::Pin,
32 sync::Arc,
33 task::{Context, Poll},
34 time::{SystemTime, UNIX_EPOCH},
35};
36use tempo_node::{
37 TempoFullNode,
38 rpc::consensus::{CertifiedBlock, Event},
39};
40use tracing::{debug, error, info_span, instrument, warn, warn_span};
41
42use super::state::FeedStateHandle;
43use crate::{
44 alias::marshal,
45 consensus::{Digest, block::Block},
46 utils::OptionFuture,
47};
48
49pub(super) type FeedActivity = Activity<Scheme<PublicKey, MinSig>, Digest>;
51
52pub(super) type Receiver = futures::channel::mpsc::UnboundedReceiver<FeedActivity>;
54
55struct PendingSubscription {
59 round: Round,
60 activity: Option<FeedActivity>,
61 block_rx: oneshot::Receiver<Block>,
62}
63
64impl PendingSubscription {
65 fn new(round: Round, activity: FeedActivity, block_rx: oneshot::Receiver<Block>) -> Self {
66 Self {
67 round,
68 activity: Some(activity),
69 block_rx,
70 }
71 }
72}
73
74impl Future for PendingSubscription {
75 type Output = eyre::Result<(Round, FeedActivity, Block)>;
76
77 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
78 match self.block_rx.poll_unpin(cx) {
79 Poll::Ready(Ok(block)) => {
80 let activity = self.activity.take().expect("polled after completion");
81 Poll::Ready(Ok((self.round, activity, block)))
82 }
83 Poll::Ready(Err(_)) => Poll::Ready(Err(eyre::eyre!("block subscription cancelled"))),
84 Poll::Pending => Poll::Pending,
85 }
86 }
87}
88
89pub(crate) struct Actor<TContext> {
90 context: ContextCell<TContext>,
92 receiver: Receiver,
94 state: FeedStateHandle,
96 marshal: marshal::Mailbox,
98 pending: BTreeMap<Round, PendingSubscription>,
101}
102
103impl<TContext: Spawner> Actor<TContext> {
104 pub(crate) fn new(
108 context: TContext,
109 marshal: marshal::Mailbox,
110 epocher: FixedEpocher,
111 execution_node: Arc<TempoFullNode>,
112 receiver: Receiver,
113 state: FeedStateHandle,
114 ) -> Self {
115 state.set_marshal(marshal.clone());
116 state.set_epocher(epocher);
117 state.set_execution_node(execution_node);
118
119 Self {
120 context: ContextCell::new(context),
121 receiver,
122 state,
123 marshal,
124 pending: BTreeMap::new(),
125 }
126 }
127
128 pub(crate) fn start(mut self) -> Handle<()> {
130 spawn_cell!(self.context, self.run())
131 }
132
133 async fn run(mut self) {
138 let reason = loop {
139 let mut oldest = OptionFuture::from(self.pending.pop_first().map(|(_, p)| p));
142
143 select!(
144 result = &mut oldest => {
145 match result {
146 Ok((_, activity, block)) => self.handle_activity(activity, block),
147 Err(error) => warn_span!("feed_actor").in_scope(||
148 warn!(%error, "did not get pending block")
149 ),
150 }
151 },
152
153 activity = self.receiver.next() => {
154 let Some(activity) = activity else {
155 break eyre!("mailbox closed");
156 };
157
158 if let Some(p) = oldest.take() {
159 self.pending.insert(p.round, p);
160 }
161 self.subscribe(activity).await;
162 },
163 );
164 };
165
166 info_span!("feed_actor").in_scope(|| error!(%reason, "shutting down"));
167 }
168
169 async fn subscribe(&mut self, activity: FeedActivity) {
170 let (round, payload) = match &activity {
171 Activity::Notarization(n) => (n.proposal.round, n.proposal.payload),
172 Activity::Finalization(f) => (f.proposal.round, f.proposal.payload),
173 _ => return,
174 };
175
176 match &activity {
180 Activity::Finalization(_) => self.pending.retain(|&r, p| {
181 matches!(&p.activity, Some(Activity::Finalization(_))) || r > round
182 }),
183 Activity::Notarization(_)
184 if self
185 .state
186 .read()
187 .latest_finalized
188 .as_ref()
189 .map(|f| Round::new(Epoch::new(f.epoch), View::new(f.view)))
190 .is_none_or(|f| f < round) => {}
191
192 _ => return,
193 }
194
195 let block_rx = self.marshal.subscribe_by_digest(Some(round), payload).await;
196 let pending = PendingSubscription::new(round, activity, block_rx);
197 self.pending.insert(round, pending);
198 }
199
200 #[instrument(skip_all, fields(activity = ?activity))]
201 fn handle_activity(&self, activity: FeedActivity, consensus_block: Block) {
202 let block = consensus_block.into_inner().into_block();
203 let (round, digest, certificate) = match activity.clone() {
204 Activity::Notarization(notarization) => (
205 notarization.proposal.round,
206 notarization.proposal.payload.0,
207 notarization.encode(),
208 ),
209 Activity::Finalization(finalization) => (
210 finalization.proposal.round,
211 finalization.proposal.payload.0,
212 finalization.encode(),
213 ),
214 _ => return,
215 };
216
217 let certified = CertifiedBlock {
218 epoch: round.epoch().get(),
219 view: round.view().get(),
220 block,
221 digest,
222 certificate: hex::encode(certificate),
223 };
224
225 let mut state = self.state.write();
226 let latest_finalized_round = state
227 .latest_finalized
228 .as_ref()
229 .map(|b| Round::new(Epoch::new(b.epoch), View::new(b.view)));
230 let latest_notarized_round = state
231 .latest_notarized
232 .as_ref()
233 .map(|b| Round::new(Epoch::new(b.epoch), View::new(b.view)));
234
235 match activity {
237 Activity::Notarization(_) => {
238 let _ = self.state.events_tx().send(Event::Notarized {
239 block: certified.clone(),
240 seen: now_millis(),
241 });
242
243 if latest_finalized_round.is_none_or(|r| r < round)
244 && latest_notarized_round.is_none_or(|r| r < round)
245 {
246 state.latest_notarized = Some(certified);
247 }
248 }
249
250 Activity::Finalization(_) => {
251 debug!(
252 subscribers = self.state.events_tx().receiver_count(),
253 "sending finalized event",
254 );
255 let _ = self.state.events_tx().send(Event::Finalized {
256 block: certified.clone(),
257 seen: now_millis(),
258 });
259
260 if latest_finalized_round.is_none_or(|r| r < round) {
261 if latest_notarized_round.is_none_or(|r| r < round) {
262 state.latest_notarized = None;
263 }
264
265 state.latest_finalized = Some(certified);
266 }
267 }
268 _ => {}
269 }
270 }
271}
272
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reports a time before the epoch.
fn now_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}