Skip to main content

tempo_commonware_node/follow/upstream/
actor.rs

1use std::{sync::Arc, time::Duration};
2
3use commonware_consensus::{Reporter, types::Height};
4use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, spawn_cell};
5use eyre::{Report, WrapErr as _};
6use futures::{
7    FutureExt as _, StreamExt as _,
8    future::BoxFuture,
9    stream::{BoxStream, Fuse},
10};
11use jsonrpsee::{
12    core::{client, client::Subscription},
13    ws_client::{WsClient, WsClientBuilder},
14};
15use tempo_node::rpc::consensus::{CertifiedBlock, Event, Query, TempoConsensusApiClient};
16use tempo_telemetry_util::display_duration;
17use tokio::{
18    select,
19    sync::{mpsc, oneshot},
20};
21use tracing::{debug, debug_span, instrument, warn, warn_span};
22
23use crate::utils::OptionFuture;
24
/// Boxed, fused stream whose items are either a consensus [`Event`] or the
/// `serde_json::Error` reported in its place.
25type EventStream = Fuse<BoxStream<'static, Result<Event, serde_json::Error>>>;
26
27/// Manages the connection to the upstream node.
28///
29/// This actor holds the websocket connection to the upstream node, reconnecting
30/// it if necessary.
31pub(crate) struct Actor<TContext> {
32    pub(super) context: ContextCell<TContext>,
33    pub(super) connection: Option<Arc<WsClient>>,
34    pub(super) mailbox: mpsc::UnboundedReceiver<super::ingress::Message>,
35    pub(super) url: &'static str,
36    pub(super) pending_connect: OptionFuture<BoxFuture<'static, (u64, eyre::Result<WsClient>)>>,
37    pub(super) pending_stream:
38        OptionFuture<BoxFuture<'static, Result<Subscription<Event>, client::Error>>>,
39    pub(super) event_stream: EventStream,
40    /// Requests for blocks while the actor is trying to establish a connection.
41    pub(super) waiters: Vec<(Height, oneshot::Sender<Option<CertifiedBlock>>)>,
42}
43
44impl<TContext> Actor<TContext>
45where
46    TContext: Clock + Metrics + Spawner,
47{
48    pub(crate) fn start(
49        mut self,
50        reporter: impl Reporter<Activity = Event>,
51    ) -> commonware_runtime::Handle<()> {
52        spawn_cell!(self.context, self.run(reporter))
53    }
54
55    async fn run(mut self, mut reporter: impl Reporter<Activity = Event>) {
56        self.pending_connect.replace({
57            let url = self.url;
58            async move {
59                (
60                    1,
61                    WsClientBuilder::default()
62                        .build(&url)
63                        .await
64                        .map_err(Report::new),
65                )
66            }
67            .boxed()
68        });
69        loop {
70            select!(
71                biased;
72
73                error = OptionFuture::new(self.connection.as_ref().map(|c| c.on_disconnect()))
74                => {
75                    warn_span!("connection").in_scope(|| warn!(
76                        reason = %Report::new(error),
77                        url = self.url,
78                        "connection to upstream node disconnected, attempting reconnect",
79                    ));
80                    self.connection.take();
81                    self.pending_stream.take();
82                    self.pending_connect.replace({
83                        let url = self.url;
84                        async move {
85                             (1, WsClientBuilder::default().build(&url).await.map_err(Report::new))
86                        }.boxed()
87                    });
88                }
89
90                (attempts, client) = &mut self.pending_connect => {
91                    match client {
92                        Ok(client) => {
93                            let client = Arc::new(client);
94                            self.connection.replace(client.clone());
95                            self.pending_stream.replace(async move {
96                                client.subscribe_events().await
97                            }.boxed());
98                        }
99                        Err(reason) => {
100                            let reconnect_in = Duration::from_secs(attempts.saturating_mul(1).min(20));
101                            warn_span!("reconnect").in_scope(|| warn!(
102                                %reason,
103                                attempts,
104                                reconnect_in = %display_duration(reconnect_in),
105                                url = self.url,
106                                "connecting to upstream node failed, attempting again",
107                            ));
108                            self.pending_connect.replace({
109                                let context = self.context.clone();
110                                let url = self.url;
111                                async move {
112                                    context.sleep(reconnect_in).await;
113                                    (1, WsClientBuilder::default().build(&url).await.map_err(Report::new))
114                                }.boxed()
115                            });
116                        }
117                    }
118                }
119
120                stream = &mut self.pending_stream => {
121                    match stream {
122                        Ok(stream) => {
123                        debug_span!("consensus_event_subscription")
124                            .in_scope(|| debug!("subscription for consensus events established"));
125                            self.event_stream = stream.boxed().fuse();
126                        }
127                        Err(error) => {
128                            warn_span!("event_subscription").in_scope(|| warn!(
129                                reason = %Report::new(error),
130                                "failed subscribing to events; retrying"
131                            ));
132                            if let Some(client) = self.connection.clone() {
133                                self.pending_stream.replace(async move {
134                                    client.subscribe_events().await
135                                }.boxed());
136                            }
137                        }
138                    }
139                }
140
141                Some(event) = self.event_stream.next() => {
142                    debug_span!("consensus_event").in_scope(|| debug!(
143                        ?event, "received consensus event, forwarding to reporter"
144                    ));
145                    match event {
146                        Ok(event) => reporter.report(event).await,
147                        Err(error) => warn_span!("event").in_scope(|| warn!(
148                            %error,
149                            "event stream encountered an error",
150                        )),
151                    }
152                }
153
154                Some(msg) = self.mailbox.recv() => {
155                    match msg {
156                        super::ingress::Message::GetFinalization { height, response, } => {
157                            self.waiters.push((height, response));
158                        }
159                    }
160                }
161            );
162
163            if let Some(client) = &self.connection {
164                for (height, response) in self.waiters.drain(..) {
165                    let client = client.clone();
166                    self.context
167                        .with_label("get_finalization")
168                        .spawn(move |_| get_finalization(client, height, response));
169                }
170            }
171        }
172    }
173}
174
175#[instrument(skip_all, fields(%height), err)]
176async fn get_finalization(
177    client: Arc<WsClient>,
178    height: Height,
179    response: oneshot::Sender<Option<CertifiedBlock>>,
180) -> eyre::Result<()> {
181    // TODO: right now, the response channel would just drop and an error
182    // emitted here. Should this failure be propagated upstream?
183    let finalization = client
184        .get_finalization(Query::Height(height.get()))
185        .await
186        .wrap_err("failed getting finalization")?;
187    response
188        .send(Some(finalization))
189        .map_err(|_| eyre::eyre!("receiver went away"))
190}