// subscription_service/lib.rs
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0
// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
#![forbid(unsafe_code)]
//! Generic pub/sub service framework
use anyhow::Result;
use channel::{
    diem_channel::{self, Receiver, Sender},
    message_queues::QueueStyle,
};
use diem_types::{
    event::EventKey,
    on_chain_config::{ConfigID, OnChainConfigPayload},
};
use std::collections::HashSet;
/// A named subscription that pairs the items a subscriber is interested in
/// with the sending half of the channel used to deliver payloads.
///
/// `T` is the type describing the subscribed items; `U` is the payload type
/// carried by the channel.
pub struct SubscriptionService<T, U> {
    /// Name given to this subscription when it was created.
    pub name: String,
    /// The items this subscription covers.
    subscribed_items: T,
    /// Sending half of the channel; its key type is the unit type `()`.
    sender: Sender<(), U>,
}
impl<T: Clone, U> SubscriptionService<T, U> {
    /// Creates a subscription called `name` that covers `items`.
    ///
    /// Returns the subscription handle together with the receiving end of
    /// the channel that published payloads are pushed into. The channel is
    /// created with `QueueStyle::LIFO` and a size argument of `1`.
    // NOTE(review): presumably the `1` is the per-key queue capacity, so only
    // the most recent payload is retained -- confirm against `diem_channel`.
    pub fn subscribe(name: &str, items: T) -> (Self, Receiver<(), U>) {
        let (tx, rx) = diem_channel::new(QueueStyle::LIFO, 1, None);
        let service = Self {
            name: name.to_owned(),
            subscribed_items: items,
            sender: tx,
        };
        (service, rx)
    }

    /// Pushes `payload` into the subscription's channel under the unit key.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the underlying sender's `push` returns.
    pub fn publish(&mut self, payload: U) -> Result<()> {
        let key = ();
        self.sender.push(key, payload)
    }

    /// Returns a clone of the items this subscription was created with.
    pub fn subscribed_items(&self) -> T { T::clone(&self.subscribed_items) }
}
/// A subscription service for on-chain reconfiguration notifications from
/// state sync.
///
/// This is declared/implemented here instead of in `types/on_chain_config`
/// because when the `subscription_service` crate is a dependency of `types`,
/// the build-dev fails.
pub type ReconfigSubscription =
    SubscriptionService<SubscriptionBundle, OnChainConfigPayload>;
/// The set of on-chain configs and events that a subscription covers.
#[derive(Clone)]
pub struct SubscriptionBundle {
    /// Identifiers of the on-chain configs that are subscribed to.
    pub configs: HashSet<ConfigID>,
    /// Keys of the events that are subscribed to.
    pub events: HashSet<EventKey>,
}
impl SubscriptionBundle {
    pub fn new(configs: Vec<ConfigID>, events: Vec<EventKey>) -> Self {
        let configs = configs.into_iter().collect::<HashSet<_>>();
        let events = events.into_iter().collect::<HashSet<_>>();
        Self { configs, events }
    }
}
impl ReconfigSubscription {
    /// Creates a subscription service named `name` that subscribes to
    /// changes in the configs listed in `configs` and to emission of the
    /// events listed in `events`.
    ///
    /// Returns the subscription service paired with the receiver endpoint
    /// that listens to it.
    pub fn subscribe_all(
        name: &str, configs: Vec<ConfigID>, events: Vec<EventKey>,
    ) -> (Self, Receiver<(), OnChainConfigPayload>) {
        Self::subscribe(name, SubscriptionBundle::new(configs, events))
    }
}