~qaul/community

[PATCH 1/2] Refactoring ratman internal channel initialisation

Details
Message ID
<20191024162604.7994-1-kookie@spacekookie.de>
DKIM signature
missing
Download raw message
Patch: +75 -46
---
 Cargo.lock            | 15 ++++++++++++--
 ratman/src/core.rs    | 38 +++++++++++++++++++----------------
 ratman/src/journal.rs | 46 +++++++++++++++++++++++--------------------
 ratman/src/lib.rs     | 22 +++++++++++++++------
 4 files changed, 75 insertions(+), 46 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c090e63..18ba6f2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -61,6 +61,16 @@ name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"

[[package]]
name = "conjoiner-engine"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "heapless 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "postcard 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "serde 1.0.101 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "crypto-mac"
version = "0.7.0"
@@ -297,8 +307,7 @@ dependencies = [
name = "ratman"
version = "0.0.0"
dependencies = [
 "heapless 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "postcard 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "conjoiner-engine 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "ratman-identity 0.1.0",
 "ratman-netmod 0.1.0",
 "serde 1.0.101 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -318,6 +327,7 @@ dependencies = [
name = "ratman-netmod"
version = "0.1.0"
dependencies = [
 "conjoiner-engine 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "ratman-identity 0.1.0",
 "serde 1.0.101 (registry+https://github.com/rust-lang/crates.io-index)",
 "twox-hash 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -415,6 +425,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c3dd8985a7111efc5c80b44e23ecdd8c007de8ade3b96595387e812b957cf5"
"checksum c2-chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7d64d04786e0f528460fc884753cf8dddcc466be308f6026f8e355c41a0e4101"
"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
"checksum conjoiner-engine 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "af89612071a896a4b0d4825f4804e635e4eb2a0ef62353cad902ab862f173eda"
"checksum crypto-mac 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4434400df11d95d556bac068ddfedd482915eb18fe8bea89bc80b6e4b1c179e5"
"checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
"checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec"
diff --git a/ratman/src/core.rs b/ratman/src/core.rs
index 46468b1..4792a13 100644
--- a/ratman/src/core.rs
+++ b/ratman/src/core.rs
@@ -23,20 +23,17 @@ struct Worker {
    _thread: JoinHandle<()>,
    /// Messages scheduled to be sent
    to_send: Arc<Mutex<Sender<Envelope>>>,
    /// Queue received Messages
    received: Arc<Mutex<Receiver<Envelope>>>,
}

impl Worker {
    /// Start a worker that sends frames and receives them
    fn start(ifs: EndpointMap) -> Self {
    fn start(ifs: EndpointMap) -> (Self, Receiver<Envelope>) {
        // Setup sending channel pair
        let (sending, rx) = channel();
        let to_send = Arc::new(Mutex::new(sending));

        // Setup receiving channel pair
        let (tx, recvd) = channel();
        let received = Arc::new(Mutex::new(recvd));
        let (tx, received) = channel();

        let _thread = thread::spawn(move || loop {
            // Send queued Messages
@@ -55,11 +52,14 @@ impl Worker {
                }
            });
        });
        Self {
            _thread,
            to_send,

        (
            Self {
                _thread,
                to_send,
            },
            received,
        }
        )
    }
}

@@ -80,16 +80,20 @@ pub(crate) struct Core {

impl Core {
    /// Create a new routing core
    ///
    /// Allocates the `ifs` and `routes` maps as empty `BTreeMap`s behind
    /// `Arc<Mutex<_>>` and starts the `Worker` on a cloned handle to `ifs`.
    /// In the tuple-returning form, the `Receiver<Envelope>` handed back by
    /// `Worker::start` (bound locally as `journal`) is returned next to the
    /// `Core`.
    // NOTE(review): this hunk shows two signatures and two bodies back to
    // back because the diff's +/- markers are missing in this rendering.
    // Presumably the `-> Self` form is the removed side and the
    // tuple-returning form the added side — confirm against the raw message.
    pub(crate) fn new() -> Self {
    pub(crate) fn new() -> (Self, Receiver<Envelope>) {
        let ifs = Arc::new(Mutex::new(BTreeMap::new()));
        let routes = Arc::new(Mutex::new(BTreeMap::new()));

        Core {
            cnt: 0,
            worker: Worker::start(Arc::clone(&ifs)),
            routes,
            ifs,
        }
        // The worker is started before the struct literal so that the
        // receiver half it returns can be split off from the worker handle.
        let (worker, journal) = Worker::start(Arc::clone(&ifs));

        (
            Core {
                cnt: 0,
                worker,
                routes,
                ifs,
            },
            journal,
        )
    }

    /// Add an interface, assigning it a unique ID
diff --git a/ratman/src/journal.rs b/ratman/src/journal.rs
index f1b3c12..06d96b7 100644
--- a/ratman/src/journal.rs
+++ b/ratman/src/journal.rs
@@ -7,7 +7,7 @@ use netmod::Recipient;
use std::{
    collections::HashSet,
    sync::{
        mpsc::{Receiver, Sender},
        mpsc::{channel, Receiver, Sender},
        Arc, Mutex,
    },
    thread::{self, JoinHandle},
@@ -37,29 +37,33 @@ impl Journal {
    /// back to the routing core `send` logic. Local-addressed
    /// messages will be de-sliced and passed up the stack.
    // NOTE(review): the diff's +/- markers are missing in this rendering, so
    // this hunk interleaves two versions of `start`. Presumably the version
    // taking `discovery: Sender<Message>` and `core: Arc<Core>` and returning
    // `Self` is the removed side, and the version taking
    // `core: Arc<Mutex<Core>>` and returning `(Self, Receiver<Message>)` is
    // the added side — confirm against the raw message.
    pub(crate) fn start(
        discovery: Sender<Message>,
        recv: Receiver<Envelope>,
        core: Arc<Core>,
    ) -> Self {
        core: Arc<Mutex<Core>>,
    ) -> (Self, Receiver<Message>) {
        let local = Arc::new(Mutex::new(HashSet::new()));
        Self {
            local: Arc::clone(&local),
            worker: thread::spawn(move || loop {
                let Envelope(id, frame) = recv.recv().unwrap();
                let local = local.lock().unwrap();
        // In the tuple-returning version the discovery channel is created
        // here: the sender moves into the worker closure and the receiver
        // (`d_recv`) is returned to the caller.
        let (discovery, d_recv) = channel();
        (
            Self {
                local: Arc::clone(&local),
                worker: thread::spawn(move || loop {
                    // Blocks until the next envelope arrives; `unwrap`
                    // panics the worker thread if `recv()` returns an error.
                    let Envelope(id, frame) = recv.recv().unwrap();
                    // The guard shadows the `Arc` and is held until the end
                    // of this loop iteration.
                    let local = local.lock().unwrap();

                match frame.recipient.clone() {
                    Recipient::User(ref u) if local.contains(u) => {
                        let msg = Slicer::unslice(vec![frame]);
                        discovery.send(msg).unwrap();
                    match frame.recipient.clone() {
                        // Recipient is in the `local` set: unslice the frame
                        // and send the message on the `discovery` channel.
                        Recipient::User(ref u) if local.contains(u) => {
                            // TODO: Implement de-sequencing
                            let msg = Slicer::unslice(vec![frame]);
                            discovery.send(msg).unwrap();
                        }
                        // Recipient is a user not in `local`: pass the frame
                        // to `lookup` and the result to `send`. The core
                        // mutex is locked separately for each of the two
                        // calls.
                        Recipient::User(ref u) => {
                            let env = core.lock().unwrap().lookup(u, vec![frame]);
                            core.lock().unwrap().send(env);
                        }
                        // Flood frames are re-wrapped with their original
                        // `id` and passed to `send`.
                        Recipient::Flood => core.lock().unwrap().send(vec![Envelope(id, frame)]),
                    }
                    Recipient::User(ref u) => {
                        let env = core.lookup(u, vec![frame]);
                        core.send(env);
                    }
                    Recipient::Flood => core.send(vec![Envelope(id, frame)]),
                }
            }),
        }
                }),
            },
            d_recv,
        )
    }
}
diff --git a/ratman/src/lib.rs b/ratman/src/lib.rs
index f29758f..2005134 100644
--- a/ratman/src/lib.rs
+++ b/ratman/src/lib.rs
@@ -5,9 +5,9 @@

mod core;
mod data;
mod journal;
mod protocol;
mod slicer;
mod journal;

pub use crate::{
    data::{Message, Payload, Signature},
@@ -15,21 +15,31 @@ pub use crate::{
};
pub use netmod;

use crate::core::Core;
use crate::{core::Core, journal::Journal};
use netmod::Endpoint;

use std::sync::{Arc, Mutex};

/// A `RATMAN` router context
// NOTE(review): two `core` fields are listed because the diff's +/- markers
// are missing in this rendering. Presumably `core: Core` is the removed
// field and the `Arc<Mutex<Core>>` / `journal` pair the added ones — confirm
// against the raw message.
pub struct Router {
    core: Core,
    // Routing core behind a shared, mutex-guarded handle.
    core: Arc<Mutex<Core>>,
    // Shared handle to the journal returned by `Journal::start`.
    journal: Arc<Journal>,
}

// NOTE(review): the diff's +/- markers are missing in this rendering, so
// each method below shows its removed and added lines back to back. Confirm
// which side each line belongs to against the raw message.
impl Router {
    /// Create a router.
    ///
    /// Builds the `Core`, wraps it in `Arc<Mutex<_>>`, then starts the
    /// `Journal` with the receiver returned by `Core::new` and a clone of
    /// the core handle.
    pub fn new() -> Self {
        // Presumably the removed pre-patch body.
        Self { core: Core::new() }
        // `Some(..).map(..).unwrap()` is only used to transform the returned
        // tuple in one expression; the value is always `Some`, so the
        // `unwrap` cannot fail.
        let (core, j_rcv) = Some(Core::new())
            .map(|(c, r)| (Arc::new(Mutex::new(c)), r))
            .unwrap();
        // `d_send` is bound but not used again in this function. Going by the
        // tuple return type shown for `Journal::start`, it would be a
        // `Receiver<Message>` despite its name — TODO confirm.
        let (journal, d_send) = Some(Journal::start(j_rcv, Arc::clone(&core)))
            .map(|(j, s)| (Arc::new(j), s))
            .unwrap();

        Self { core, journal }
    }

    /// Add a `netmod` endpoint to this router
    ///
    /// The `&self` form locks the core mutex and passes `ep` to `add_if`;
    /// `unwrap` panics if the lock call returns an error.
    pub fn add_ep(&mut self, ep: impl Endpoint + 'static + Send) {
        self.core.add_if(ep);
    pub fn add_ep(&self, ep: impl Endpoint + 'static + Send) {
        self.core.lock().unwrap().add_if(ep);
    }
}
-- 
2.23.0