Skip to main content

tempo_consensus/follow/upstream/actor.rs

1use std::{sync::Arc, time::Duration};
2
3use commonware_consensus::{Reporter, types::Height};
4use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, spawn_cell};
5use eyre::{Report, WrapErr as _};
6use futures::{
7    FutureExt as _, StreamExt as _,
8    future::{BoxFuture, Either},
9    stream::{self, Fuse, FusedStream},
10};
11use jsonrpsee::{
12    core::{client, client::Subscription},
13    ws_client::{WsClient, WsClientBuilder},
14};
15use rand_08::Rng as _;
16use tempo_node::rpc::consensus::{CertifiedBlock, Event, Query, TempoConsensusApiClient};
17use tempo_telemetry_util::display_duration;
18use tokio::{
19    select,
20    sync::{mpsc, oneshot},
21};
22use tracing::{debug, debug_span, instrument, warn, warn_span};
23
24use crate::utils::OptionFuture;
25
26pub(super) type EventStream =
27    Either<stream::Empty<Result<Event, serde_json::Error>>, Fuse<Subscription<Event>>>;
28
/// Seconds of backoff added for every failed connection attempt.
const RECONNECT_BACKOFF_FACTOR: u64 = 2;
/// Ceiling on the backoff between connection attempts (jitter not included).
const RECONNECT_MAX_BACKOFF: Duration = Duration::from_secs(20);
/// Largest random jitter added on top of the backoff.
const RECONNECT_JITTER: Duration = Duration::from_secs(1);
32
33/// Manages the connection to the upstream node.
34///
35/// This actor holds the websocket connection to the upstream node, reconnecting
36/// it if necessary.
37pub(crate) struct Actor<TContext> {
38    pub(super) context: ContextCell<TContext>,
39    pub(super) connection: Option<Arc<WsClient>>,
40    pub(super) mailbox: mpsc::UnboundedReceiver<super::ingress::Message>,
41    pub(super) url: &'static str,
42    pub(super) pending_connect: OptionFuture<BoxFuture<'static, (u64, eyre::Result<WsClient>)>>,
43    pub(super) pending_stream:
44        OptionFuture<BoxFuture<'static, Result<Subscription<Event>, client::Error>>>,
45    pub(super) event_stream: EventStream,
46    /// Requests for blocks while the actor is trying to establish a connection.
47    pub(super) waiters: Vec<(Height, oneshot::Sender<Option<CertifiedBlock>>)>,
48}
49
50impl<TContext> Actor<TContext>
51where
52    TContext: Clock + Metrics + Spawner,
53{
54    pub(crate) fn start(
55        mut self,
56        reporter: impl Reporter<Activity = Event>,
57    ) -> commonware_runtime::Handle<()> {
58        spawn_cell!(self.context, self.run(reporter))
59    }
60
61    async fn run(mut self, mut reporter: impl Reporter<Activity = Event>) {
62        loop {
63            self.reconnect_or_resubscribe();
64            self.drain_waiters();
65
66            select!(
67                biased;
68
69                (attempts, client) = &mut self.pending_connect => {
70                    match client {
71                        Ok(client) => {
72                            let client = Arc::new(client);
73                            self.connection.replace(client);
74                        }
75                        Err(reason) => {
76                            let reconnect_in = reconnect_delay(attempts);
77                            warn_span!("reconnect").in_scope(|| warn!(
78                                %reason,
79                                attempts,
80                                reconnect_in = %display_duration(reconnect_in),
81                                url = self.url,
82                                "connecting to upstream node failed, attempting again",
83                            ));
84                            self.pending_connect.replace({
85                                let context = self.context.clone();
86                                let url = self.url;
87                                async move {
88                                    context.sleep(reconnect_in).await;
89                                    connect(url, attempts.saturating_add(1)).await
90                                }.boxed()
91                            });
92                        }
93                    }
94                }
95
96                stream = &mut self.pending_stream => {
97                    match stream {
98                        Ok(stream) => {
99                        debug_span!("consensus_event_subscription")
100                            .in_scope(|| debug!("subscription for consensus events established"));
101                            self.event_stream = active_event_stream(stream);
102                        }
103                        Err(error) => {
104                            warn_span!("event_subscription").in_scope(|| warn!(
105                                reason = %Report::new(error),
106                                "failed subscribing to events; reconnecting to upstream node"
107                            ));
108                            self.connection.take();
109                            self.event_stream = inactive_event_stream();
110                        }
111                    }
112                }
113
114                event = self.event_stream.next(), if !self.event_stream.is_terminated() => {
115                    match event {
116                        Some(Ok(event)) => {
117                            debug_span!("consensus_event").in_scope(|| debug!(
118                                ?event, "received consensus event, forwarding to reporter"
119                            ));
120                            reporter.report(event).await;
121                        }
122                        Some(Err(error)) => {
123                            warn_span!("event").in_scope(|| warn!(
124                                %error,
125                                "event stream encountered an error",
126                            ));
127                            self.event_stream = inactive_event_stream();
128                        }
129                        None => {
130                            warn_span!("event_subscription").in_scope(|| warn!(
131                                url = self.url,
132                                "event stream terminated",
133                            ));
134                            self.event_stream = inactive_event_stream();
135                        }
136                    }
137                }
138
139                Some(msg) = self.mailbox.recv() => {
140                    match msg {
141                        super::ingress::Message::GetFinalization { height, response, } => {
142                            self.waiters.push((height, response));
143                        }
144                    }
145                }
146            );
147        }
148    }
149
150    #[instrument(skip_all)]
151    fn reconnect_or_resubscribe(&mut self) {
152        if self.pending_connect.is_some() || self.pending_stream.is_some() {
153            return;
154        }
155
156        let Some(client) = self.connection.clone() else {
157            self.pending_connect.replace(connect(self.url, 1));
158            return;
159        };
160
161        if !self.event_stream.is_terminated() {
162            return;
163        }
164
165        if client.is_connected() {
166            self.pending_stream.replace(subscribe(client));
167        } else {
168            warn!(url = self.url, "upstream client disconnected, reconnecting");
169            self.connection.take();
170            self.pending_connect.replace(connect(self.url, 1));
171        }
172    }
173
174    /// Drains the waiters by fetching the finalizations they are waiting for.
175    ///
176    /// Only executes if a client is present and connected.
177    fn drain_waiters(&mut self) {
178        if self.pending_connect.is_some()
179            || self.pending_stream.is_some()
180            || self.event_stream.is_terminated()
181        {
182            return;
183        }
184
185        let Some(client) = &self.connection else {
186            return;
187        };
188        if !client.is_connected() {
189            return;
190        }
191
192        for (height, response) in self.waiters.drain(..) {
193            let client = client.clone();
194            self.context
195                .with_label("get_finalization")
196                .spawn(move |_| get_finalization(client, height, response));
197        }
198    }
199}
200
201fn connect(url: &'static str, attempts: u64) -> BoxFuture<'static, (u64, eyre::Result<WsClient>)> {
202    async move {
203        (
204            attempts,
205            WsClientBuilder::default()
206                .build(&url)
207                .await
208                .map_err(Report::new),
209        )
210    }
211    .boxed()
212}
213
214fn subscribe(
215    client: Arc<WsClient>,
216) -> BoxFuture<'static, Result<Subscription<Event>, client::Error>> {
217    async move { client.subscribe_events().await }.boxed()
218}
219
220pub(super) fn inactive_event_stream() -> EventStream {
221    Either::Left(stream::empty())
222}
223
224fn active_event_stream(stream: Subscription<Event>) -> EventStream {
225    Either::Right(stream.fuse())
226}
227
228fn reconnect_delay(attempts: u64) -> Duration {
229    reconnect_backoff(attempts) + random_jitter()
230}
231
232fn reconnect_backoff(attempts: u64) -> Duration {
233    let backoff_secs = attempts.saturating_mul(RECONNECT_BACKOFF_FACTOR);
234    let backoff = Duration::from_secs(backoff_secs);
235
236    backoff.min(RECONNECT_MAX_BACKOFF)
237}
238
239fn random_jitter() -> Duration {
240    let max_jitter_millis = RECONNECT_JITTER.as_millis() as u64;
241    Duration::from_millis(rand_08::thread_rng().gen_range(0..=max_jitter_millis))
242}
243
244#[instrument(skip_all, fields(%height), err)]
245async fn get_finalization(
246    client: Arc<WsClient>,
247    height: Height,
248    response: oneshot::Sender<Option<CertifiedBlock>>,
249) -> eyre::Result<()> {
250    // TODO: right now, the response channel would just drop and an error
251    // emitted here. Should this failure be propagated upstream?
252    let finalization = client
253        .get_finalization(Query::Height(height.get()))
254        .await
255        .wrap_err("failed getting finalization")?;
256    response
257        .send(Some(finalization))
258        .map_err(|_| eyre::eyre!("receiver went away"))
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// Backoff grows by `RECONNECT_BACKOFF_FACTOR` seconds per attempt and
    /// stops growing at `RECONNECT_MAX_BACKOFF`.
    #[test]
    fn reconnect_backoff_linearly_increases_and_caps() {
        let cases = [
            (0, Duration::from_secs(0)),
            (1, Duration::from_secs(2)),
            (2, Duration::from_secs(4)),
            (3, Duration::from_secs(6)),
            (4, Duration::from_secs(8)),
            (5, Duration::from_secs(10)),
            (10, RECONNECT_MAX_BACKOFF),
            (u64::MAX, RECONNECT_MAX_BACKOFF),
        ];
        for (attempts, expected) in cases {
            assert_eq!(reconnect_backoff(attempts), expected, "attempts = {attempts}");
        }
    }
}