tempo_consensus/follow/upstream/
mod.rs1use commonware_consensus::Reporter;
7use commonware_runtime::{Clock, ContextCell, Metrics, Spawner};
8use tempo_node::rpc::consensus::Event;
9use tokio::sync::mpsc;
10
11use crate::utils::OptionFuture;
12
13mod actor;
14pub mod in_process;
15mod ingress;
16
17pub(crate) use actor::Actor;
18pub use ingress::Mailbox;
19
20pub trait UpstreamActor: Send + 'static {
22 fn start(self, reporter: impl Reporter<Activity = Event>) -> commonware_runtime::Handle<()>;
23}
24
25impl<TContext> UpstreamActor for Actor<TContext>
26where
27 TContext: Clock + Metrics + Spawner,
28{
29 fn start(self, reporter: impl Reporter<Activity = Event>) -> commonware_runtime::Handle<()> {
30 self.start(reporter)
31 }
32}
33
34impl<TContext> UpstreamActor for in_process::Actor<TContext>
35where
36 TContext: Clock + Metrics + Spawner,
37{
38 fn start(self, reporter: impl Reporter<Activity = Event>) -> commonware_runtime::Handle<()> {
39 self.start(reporter)
40 }
41}
42
43pub(crate) fn init<TContext>(
44 context: TContext,
45 config: Config,
46) -> (Actor<TContext>, ingress::Mailbox) {
47 let (tx, rx) = mpsc::unbounded_channel();
48 let mailbox = ingress::Mailbox::new(tx);
49
50 let url = Box::leak(Box::<str>::from(config.upstream_url));
51 let actor = Actor {
52 context: ContextCell::new(context),
53 connection: None,
54 mailbox: rx,
55 url,
56 pending_connect: OptionFuture::none(),
57 pending_stream: OptionFuture::none(),
58 event_stream: actor::inactive_event_stream(),
59 waiters: Vec::new(),
60 };
61
62 (actor, mailbox)
63}
64
/// Settings consumed by `init` when constructing the upstream actor.
pub(crate) struct Config {
    /// URL of the upstream; `init` takes ownership of this string and hands it
    /// to the actor.
    pub(crate) upstream_url: String,
}