~qaul/community

[PATCH 1/2] Refactoring ratman internal channel initialisation

Details
Message ID
<20191024162604.7994-1-kookie@spacekookie.de>
DKIM signature
missing
Download raw message
Patch: +75 -46
---
 Cargo.lock            | 15 ++++++++++++--
 ratman/src/core.rs    | 38 +++++++++++++++++++----------------
 ratman/src/journal.rs | 46 +++++++++++++++++++++++--------------------
 ratman/src/lib.rs     | 22 +++++++++++++++------
 4 files changed, 75 insertions(+), 46 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c090e63..18ba6f2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -61,6 +61,16 @@ name = "cfg-if"
 version = "0.1.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
+[[package]]
+name = "conjoiner-engine"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "heapless 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "postcard 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde 1.0.101 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "crypto-mac"
 version = "0.7.0"
@@ -297,8 +307,7 @@ dependencies = [
 name = "ratman"
 version = "0.0.0"
 dependencies = [
- "heapless 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
- "postcard 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "conjoiner-engine 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "ratman-identity 0.1.0",
  "ratman-netmod 0.1.0",
  "serde 1.0.101 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -318,6 +327,7 @@ dependencies = [
 name = "ratman-netmod"
 version = "0.1.0"
 dependencies = [
+ "conjoiner-engine 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "ratman-identity 0.1.0",
  "serde 1.0.101 (registry+https://github.com/rust-lang/crates.io-index)",
  "twox-hash 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -415,6 +425,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 "checksum byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c3dd8985a7111efc5c80b44e23ecdd8c007de8ade3b96595387e812b957cf5"
 "checksum c2-chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7d64d04786e0f528460fc884753cf8dddcc466be308f6026f8e355c41a0e4101"
 "checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
+"checksum conjoiner-engine 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "af89612071a896a4b0d4825f4804e635e4eb2a0ef62353cad902ab862f173eda"
 "checksum crypto-mac 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4434400df11d95d556bac068ddfedd482915eb18fe8bea89bc80b6e4b1c179e5"
 "checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
 "checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec"
diff --git a/ratman/src/core.rs b/ratman/src/core.rs
index 46468b1..4792a13 100644
--- a/ratman/src/core.rs
+++ b/ratman/src/core.rs
@@ -23,20 +23,17 @@ struct Worker {
     _thread: JoinHandle<()>,
     /// Messages scheduled to be sent
     to_send: Arc<Mutex<Sender<Envelope>>>,
-    /// Queue received Messages
-    received: Arc<Mutex<Receiver<Envelope>>>,
 }
 
 impl Worker {
     /// Start a worker that sends frames and receives them
-    fn start(ifs: EndpointMap) -> Self {
+    fn start(ifs: EndpointMap) -> (Self, Receiver<Envelope>) {
         // Setup sending channel pair
         let (sending, rx) = channel();
         let to_send = Arc::new(Mutex::new(sending));
 
         // Setup receiving channel pair
-        let (tx, recvd) = channel();
-        let received = Arc::new(Mutex::new(recvd));
+        let (tx, received) = channel();
 
         let _thread = thread::spawn(move || loop {
             // Send queued Messages
@@ -55,11 +52,14 @@ impl Worker {
                 }
             });
         });
-        Self {
-            _thread,
-            to_send,
+
+        (
+            Self {
+                _thread,
+                to_send,
+            },
             received,
-        }
+        )
     }
 }
 
@@ -80,16 +80,20 @@ pub(crate) struct Core {
 
 impl Core {
     /// Create a new routing core
-    pub(crate) fn new() -> Self {
+    pub(crate) fn new() -> (Self, Receiver<Envelope>) {
         let ifs = Arc::new(Mutex::new(BTreeMap::new()));
         let routes = Arc::new(Mutex::new(BTreeMap::new()));
-
-        Core {
-            cnt: 0,
-            worker: Worker::start(Arc::clone(&ifs)),
-            routes,
-            ifs,
-        }
+        let (worker, journal) = Worker::start(Arc::clone(&ifs));
+
+        (
+            Core {
+                cnt: 0,
+                worker,
+                routes,
+                ifs,
+            },
+            journal,
+        )
     }
 
     /// Add an interface, assigning it a unique ID
diff --git a/ratman/src/journal.rs b/ratman/src/journal.rs
index f1b3c12..06d96b7 100644
--- a/ratman/src/journal.rs
+++ b/ratman/src/journal.rs
@@ -7,7 +7,7 @@ use netmod::Recipient;
 use std::{
     collections::HashSet,
     sync::{
-        mpsc::{Receiver, Sender},
+        mpsc::{channel, Receiver, Sender},
         Arc, Mutex,
     },
     thread::{self, JoinHandle},
@@ -37,29 +37,33 @@ impl Journal {
     /// back to the routing core `send` logic. Local-addressed
     /// messages will be de-sliced and passed up the stack.
     pub(crate) fn start(
-        discovery: Sender<Message>,
         recv: Receiver<Envelope>,
-        core: Arc<Core>,
-    ) -> Self {
+        core: Arc<Mutex<Core>>,
+    ) -> (Self, Receiver<Message>) {
         let local = Arc::new(Mutex::new(HashSet::new()));
-        Self {
-            local: Arc::clone(&local),
-            worker: thread::spawn(move || loop {
-                let Envelope(id, frame) = recv.recv().unwrap();
-                let local = local.lock().unwrap();
+        let (discovery, d_recv) = channel();
+        (
+            Self {
+                local: Arc::clone(&local),
+                worker: thread::spawn(move || loop {
+                    let Envelope(id, frame) = recv.recv().unwrap();
+                    let local = local.lock().unwrap();
 
-                match frame.recipient.clone() {
-                    Recipient::User(ref u) if local.contains(u) => {
-                        let msg = Slicer::unslice(vec![frame]);
-                        discovery.send(msg).unwrap();
+                    match frame.recipient.clone() {
+                        Recipient::User(ref u) if local.contains(u) => {
+                            // TODO: Implement de-sequencing
+                            let msg = Slicer::unslice(vec![frame]);
+                            discovery.send(msg).unwrap();
+                        }
+                        Recipient::User(ref u) => {
+                            let env = core.lock().unwrap().lookup(u, vec![frame]);
+                            core.lock().unwrap().send(env);
+                        }
+                        Recipient::Flood => core.lock().unwrap().send(vec![Envelope(id, frame)]),
                     }
-                    Recipient::User(ref u) => {
-                        let env = core.lookup(u, vec![frame]);
-                        core.send(env);
-                    }
-                    Recipient::Flood => core.send(vec![Envelope(id, frame)]),
-                }
-            }),
-        }
+                }),
+            },
+            d_recv,
+        )
     }
 }
diff --git a/ratman/src/lib.rs b/ratman/src/lib.rs
index f29758f..2005134 100644
--- a/ratman/src/lib.rs
+++ b/ratman/src/lib.rs
@@ -5,9 +5,9 @@
 
 mod core;
 mod data;
+mod journal;
 mod protocol;
 mod slicer;
-mod journal;
 
 pub use crate::{
     data::{Message, Payload, Signature},
@@ -15,21 +15,31 @@ pub use crate::{
 };
 pub use netmod;
 
-use crate::core::Core;
+use crate::{core::Core, journal::Journal};
 use netmod::Endpoint;
 
+use std::sync::{Arc, Mutex};
+
 /// A `RATMAN` router context
 pub struct Router {
-    core: Core,
+    core: Arc<Mutex<Core>>,
+    journal: Arc<Journal>,
 }
 
 impl Router {
     pub fn new() -> Self {
-        Self { core: Core::new() }
+        let (core, j_rcv) = Some(Core::new())
+            .map(|(c, r)| (Arc::new(Mutex::new(c)), r))
+            .unwrap();
+        let (journal, d_send) = Some(Journal::start(j_rcv, Arc::clone(&core)))
+            .map(|(j, s)| (Arc::new(j), s))
+            .unwrap();
+
+        Self { core, journal }
     }
 
     /// Add an `netmod` endpoint to this router
-    pub fn add_ep(&mut self, ep: impl Endpoint + 'static + Send) {
-        self.core.add_if(ep);
+    pub fn add_ep(&self, ep: impl Endpoint + 'static + Send) {
+        self.core.lock().unwrap().add_if(ep);
     }
 }
-- 
2.23.0