Skip to main content

tempo_consensus/follow/upstream/
mod.rs

1//! Actors to communicate with the upstream node.
2//!
3//! Maintains a regular connection to an upstream node over websocket,
4//! or uses `in_process::Actor` as an in-process actor working off of channels.
5
6use commonware_consensus::Reporter;
7use commonware_runtime::{Clock, ContextCell, Metrics, Spawner};
8use tempo_node::rpc::consensus::Event;
9use tokio::sync::mpsc;
10
11use crate::utils::OptionFuture;
12
13mod actor;
14pub mod in_process;
15mod ingress;
16
17pub(crate) use actor::Actor;
18pub use ingress::Mailbox;
19
20/// An actor that can be started with reporters that receive consensus RPC events.
21pub trait UpstreamActor: Send + 'static {
22    fn start(self, reporter: impl Reporter<Activity = Event>) -> commonware_runtime::Handle<()>;
23}
24
25impl<TContext> UpstreamActor for Actor<TContext>
26where
27    TContext: Clock + Metrics + Spawner,
28{
29    fn start(self, reporter: impl Reporter<Activity = Event>) -> commonware_runtime::Handle<()> {
30        self.start(reporter)
31    }
32}
33
34impl<TContext> UpstreamActor for in_process::Actor<TContext>
35where
36    TContext: Clock + Metrics + Spawner,
37{
38    fn start(self, reporter: impl Reporter<Activity = Event>) -> commonware_runtime::Handle<()> {
39        self.start(reporter)
40    }
41}
42
43pub(crate) fn init<TContext>(
44    context: TContext,
45    config: Config,
46) -> (Actor<TContext>, ingress::Mailbox) {
47    let (tx, rx) = mpsc::unbounded_channel();
48    let mailbox = ingress::Mailbox::new(tx);
49
50    let url = Box::leak(Box::<str>::from(config.upstream_url));
51    let actor = Actor {
52        context: ContextCell::new(context),
53        connection: None,
54        mailbox: rx,
55        url,
56        pending_connect: OptionFuture::none(),
57        pending_stream: OptionFuture::none(),
58        event_stream: actor::inactive_event_stream(),
59        waiters: Vec::new(),
60    };
61
62    (actor, mailbox)
63}
64
/// Settings consumed by [`init`] when constructing the upstream actor.
pub(crate) struct Config {
    /// URL of the upstream node to connect to.
    pub(crate) upstream_url: String,
}