subscription_service/
lib.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8#![forbid(unsafe_code)]
9
10//! Generic pub/sub service framework
11
12use anyhow::Result;
13use channel::{
14    diem_channel::{self, Receiver, Sender},
15    message_queues::QueueStyle,
16};
17use diem_types::{
18    event::EventKey,
19    on_chain_config::{ConfigID, OnChainConfigPayload},
20};
21use std::collections::HashSet;
22
23pub struct SubscriptionService<T, U> {
24    pub name: String,
25    subscribed_items: T,
26    sender: Sender<(), U>,
27}
28
29impl<T: Clone, U> SubscriptionService<T, U> {
30    /// Constructs an subscription object for `items`
31    /// Returns the subscription object, and the receiving end of a channel that
32    /// subscription will be sent to
33    pub fn subscribe(name: &str, items: T) -> (Self, Receiver<(), U>) {
34        let (sender, receiver) = diem_channel::new(QueueStyle::LIFO, 1, None);
35        (
36            Self {
37                name: name.to_string(),
38                sender,
39                subscribed_items: items,
40            },
41            receiver,
42        )
43    }
44
45    pub fn publish(&mut self, payload: U) -> Result<()> {
46        self.sender.push((), payload)
47    }
48
49    pub fn subscribed_items(&self) -> T { self.subscribed_items.clone() }
50}
51
52/// A subscription service for on-chain reconfiguration notifications from state
53/// sync This is declared/implemented here instead of in `types/on_chain_config`
54/// because when `subscription_service` crate is a dependency of `types`, the
55/// build-dev fails
56pub type ReconfigSubscription =
57    SubscriptionService<SubscriptionBundle, OnChainConfigPayload>;
58
59#[derive(Clone)]
60pub struct SubscriptionBundle {
61    pub configs: HashSet<ConfigID>,
62    pub events: HashSet<EventKey>,
63}
64
65impl SubscriptionBundle {
66    pub fn new(configs: Vec<ConfigID>, events: Vec<EventKey>) -> Self {
67        let configs = configs.into_iter().collect::<HashSet<_>>();
68        let events = events.into_iter().collect::<HashSet<_>>();
69
70        Self { configs, events }
71    }
72}
73
74impl ReconfigSubscription {
75    // Creates a subscription service named `name` that subscribes to changes in
76    // configs specified in `configs` and emission of events specified in
77    // `events` Returns (subscription service, endpoint that listens to the
78    // service)
79    pub fn subscribe_all(
80        name: &str, configs: Vec<ConfigID>, events: Vec<EventKey>,
81    ) -> (Self, Receiver<(), OnChainConfigPayload>) {
82        let bundle = SubscriptionBundle::new(configs, events);
83        Self::subscribe(name, bundle)
84    }
85}