Skip to main content

tempo_commonware_node/follow/upstream/
in_process.rs

//! An upstream provider to be used in e2e tests. The [`jsonrpsee`] stack used by
//! the standard websocket based provider requires a tokio runtime, which the tests
//! runtime does not provide.
4
5use std::{sync::Arc, time::Duration};
6
7use commonware_consensus::{Reporter, types::Height};
8use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, spawn_cell};
9use futures::{
10    FutureExt as _, StreamExt as _,
11    stream::{self, BoxStream, Fuse},
12};
13use tempo_node::{
14    TempoFullNode,
15    rpc::consensus::{CertifiedBlock, ConsensusFeed as _, Event, Query},
16};
17use tokio::{
18    select,
19    sync::{mpsc, oneshot},
20};
21use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
22use tracing::{debug, debug_span, info, instrument};
23
24use crate::{feed::FeedStateHandle, utils::OptionFuture};
25
26use super::ingress::{Mailbox, Message};
27
/// Configuration for the in-process upstream [`Actor`].
pub struct Config {
    /// Handle to the in-process execution node.
    // NOTE(review): not referenced by the visible code in this file — presumably
    // held to keep the node alive for the duration of the actor; confirm.
    pub execution_node: Arc<TempoFullNode>,
    /// Shared feed state used to subscribe to consensus events and to answer
    /// finalization queries.
    pub feed: FeedStateHandle,
}
32
33pub fn init<TContext>(context: TContext, config: Config) -> (Actor<TContext>, Mailbox) {
34    let (tx, rx) = mpsc::unbounded_channel();
35    let mailbox = Mailbox::new(tx);
36
37    let actor = Actor {
38        context: ContextCell::new(context),
39        config,
40        event_stream: stream::empty::<Result<Event, BroadcastStreamRecvError>>()
41            .boxed()
42            .fuse(),
43        mailbox: rx,
44        waiters: Vec::new(),
45    };
46    (actor, mailbox)
47}
48
/// Actor bridging the in-process consensus feed to a [`Reporter`].
pub struct Actor<TContext> {
    /// Runtime context; the cell lets `spawn_cell!` take ownership in `start`.
    context: ContextCell<TContext>,
    config: Config,
    /// Stream of consensus events. Starts as an empty placeholder and is
    /// replaced with a real broadcast subscription once the feed is ready.
    event_stream: Fuse<BoxStream<'static, Result<Event, BroadcastStreamRecvError>>>,
    /// Receiving end of the requests sent through the [`Mailbox`].
    mailbox: mpsc::UnboundedReceiver<Message>,

    /// `GetFinalization` requests buffered until the feed subscription is
    /// established; drained (one spawned lookup task each) once connected.
    waiters: Vec<(Height, oneshot::Sender<Option<CertifiedBlock>>)>,
}
57
impl<TContext> Actor<TContext>
where
    TContext: Clock + Metrics + Spawner,
{
    /// Spawns the actor's event loop on the runtime context, forwarding all
    /// consensus events to `reporter`.
    pub(crate) fn start(
        mut self,
        reporter: impl Reporter<Activity = Event>,
    ) -> commonware_runtime::Handle<()> {
        spawn_cell!(self.context, self.run(reporter))
    }

    /// Main event loop: waits for a feed subscription (retrying every second
    /// until the feed state is ready), then forwards consensus events to
    /// `reporter` and services finalization requests from the mailbox.
    async fn run(mut self, mut reporter: impl Reporter<Activity = Event>) {
        let feed = self.config.feed.clone();
        let context = self.context.clone();
        // Future resolving to a broadcast receiver once the feed state becomes
        // available; polls with a 1s backoff until then.
        let mut pending_subscription = OptionFuture::some(
            async move {
                loop {
                    if let Some(subscription) = feed.subscribe().await {
                        break subscription;
                    }
                    info!("feed state not yet ready, retrying in 1s");
                    context.sleep(Duration::from_secs(1)).await;
                }
            }
            .boxed(),
        );
        let mut connected = false;
        loop {
            select!(
                // `biased` polls branches top-to-bottom, so the placeholder
                // event stream is swapped out as soon as the subscription is up.
                biased;

                // NOTE(review): the `is_some()` guard presumably relies on
                // `OptionFuture` clearing itself after completing, so the
                // finished future is never polled again — confirm in `utils`.
                stream = &mut pending_subscription, if pending_subscription.is_some() => {
                    debug_span!("consensus_event_subscription")
                        .in_scope(|| debug!("subscription for consensus events established"));
                    self.event_stream = tokio_stream::wrappers::BroadcastStream::new(stream).boxed().fuse();
                    connected = true;
                }

                Some(event) = self.event_stream.next() => {
                    debug_span!("consensus_event").in_scope(|| debug!(
                        ?event, "received consensus event, forwarding to reporter"
                    ));
                    match event {
                        Ok(event) => reporter.report(event).await,
                        // A lagged broadcast receiver only skips events; log
                        // and keep consuming the stream.
                        Err(BroadcastStreamRecvError::Lagged(events_skipped)) => {
                            debug_span!("subscription").in_scope(|| debug!(
                                events_skipped,
                                "lagged behind and skipped some events",
                            ));
                        },
                    }
                }

                Some(msg) = self.mailbox.recv() => {
                    match msg {
                        super::ingress::Message::GetFinalization { height, response, } => {
                            // Buffered until `connected`; drained below.
                            self.waiters.push((height, response));
                        }
                    }
                }
            );
            // Only answer finalization queries once the feed is known to be
            // up; each lookup runs on its own spawned task so slow lookups
            // don't block the event loop.
            if connected {
                for (height, response) in self.waiters.drain(..) {
                    let feed = self.config.feed.clone();
                    self.context
                        .with_label("get_finalization")
                        .spawn(move |_| get_finalization(feed, height, response));
                }
            }
        }
    }
}
130
131#[instrument(skip_all, fields(%height), err)]
132async fn get_finalization(
133    client: FeedStateHandle,
134    height: Height,
135    response: oneshot::Sender<Option<CertifiedBlock>>,
136) -> eyre::Result<()> {
137    // TODO: right now, the response channel would just drop and an error
138    // emitted here. Should this failure be propagated upstream?
139    let finalization = match client.get_finalization(Query::Height(height.get())).await {
140        tempo_node::rpc::consensus::types::Response::Success(val) => Some(val),
141        tempo_node::rpc::consensus::types::Response::NotReady => {
142            panic!("for in-process execution the feed should be immediately available")
143        }
144        tempo_node::rpc::consensus::types::Response::Missing(_) => None,
145    };
146    response
147        .send(finalization)
148        .map_err(|_| eyre::eyre!("receiver went away"))
149}