~martijnbraam/openatem

proxy: MQTT frontend v1 PROPOSED

Jan Kundrát (2):
  proxy: do not assume all frontends "bind" somewhere
  proxy: add a read-only MQTT frontend

 build-aux/nl.brixit.Switcher.json   |  2 +-
 openswitcher_proxy/__main__.py      |  7 ++--
 openswitcher_proxy/frontend_mqtt.py | 54 +++++++++++++++++++++++++++++
 proxy.toml                          |  7 +++-
 4 files changed, 66 insertions(+), 4 deletions(-)
 create mode 100644 openswitcher_proxy/frontend_mqtt.py

-- 
2.33.0
Export patchset (mbox)
How do I use this?

Copy & paste the following snippet into your terminal to import this patchset into git:

curl -s https://lists.sr.ht/~martijnbraam/openatem/patches/26227/mbox | git am -3
Learn more about email & git

[PATCH 1/2] proxy: do not assume all frontends "bind" somewhere Export this patch

I'm writing a frontend which actually *connects* to an external server,
so let's not make any assumptions here. The only downside is that we
cannot easily print out what section of the config file is getting
handled before we initialize its handler. I think this is better than
calling a parameter for MQTT server's hostname to connect to "bind".

Signed-off-by: Jan Kundrát <jkt@jankundrat.com>
---
 openswitcher_proxy/__main__.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/openswitcher_proxy/__main__.py b/openswitcher_proxy/__main__.py
index 2a0779f..7dd4c07 100644
--- a/openswitcher_proxy/__main__.py
+++ b/openswitcher_proxy/__main__.py
@@ -35,7 +35,7 @@ def run(config_path):
    if 'frontend' in config:
        nthreads['frontend'] = {}
        for frontend in config['frontend']:
            logging.info(f'  frontend: {frontend["type"]} ({frontend["bind"]})')
            logging.info(f'  frontend: {frontend["type"]}')
            if frontend['type'] == 'status':
                t = StatusFrontendThread(frontend, nthreads)
            elif frontend['type'] == 'http-api':
@@ -47,7 +47,7 @@ def run(config_path):
                continue
            t.daemon = True
            threads.append(t)
            nthreads['frontend'][frontend['bind']] = t
            nthreads['frontend'][t.name] = t
            t.start()

    while True:
-- 
2.33.0

[PATCH 2/2] proxy: add a read-only MQTT frontend Export this patch

This should be used carefully because it blindly propagates all fields,
including possibly security-sensitive ones such as streaming keys.
Connection to MQTT is currently neither authenticated not encrypted.

On the other hand, this is super-useful for applications such as tally
lights, yay!

Signed-off-by: Jan Kundrát <jkt@jankundrat.com>
---
 build-aux/nl.brixit.Switcher.json   |  2 +-
 openswitcher_proxy/__main__.py      |  3 ++
 openswitcher_proxy/frontend_mqtt.py | 54 +++++++++++++++++++++++++++++
 proxy.toml                          |  7 +++-
 4 files changed, 64 insertions(+), 2 deletions(-)
 create mode 100644 openswitcher_proxy/frontend_mqtt.py

diff --git a/build-aux/nl.brixit.Switcher.json b/build-aux/nl.brixit.Switcher.json
index ef29655..3e34798 100644
--- a/build-aux/nl.brixit.Switcher.json
@@ -18,7 +18,7 @@
      "name": "hexdump",
      "buildsystem": "simple",
      "build-commands": [
        "pip3 install --prefix=/app hexdump pyusb"
        "pip3 install --prefix=/app hexdump pyusb paho-mqtt"
      ],
      "build-options": {
        "build-args": [
diff --git a/openswitcher_proxy/__main__.py b/openswitcher_proxy/__main__.py
index 7dd4c07..a15289e 100644
--- a/openswitcher_proxy/__main__.py
+++ b/openswitcher_proxy/__main__.py
@@ -7,6 +7,7 @@ import logging
from openswitcher_proxy.frontend_httpapi import HttpApiFrontendThread
from openswitcher_proxy.frontend_status import StatusFrontendThread
from openswitcher_proxy.frontend_tcp import TcpFrontendThread
from openswitcher_proxy.frontend_mqtt import MqttFrontendThread
from openswitcher_proxy.hardware import HardwareThread

logging.basicConfig(
@@ -42,6 +43,8 @@ def run(config_path):
                t = HttpApiFrontendThread(frontend, nthreads)
            elif frontend['type'] == 'tcp':
                t = TcpFrontendThread(frontend, nthreads)
            elif frontend['type'] == 'mqtt':
                t = MqttFrontendThread(frontend, nthreads)
            else:
                logging.error(f'  Unknown frontend type "{frontend["type"]}"')
                continue
diff --git a/openswitcher_proxy/frontend_mqtt.py b/openswitcher_proxy/frontend_mqtt.py
new file mode 100644
index 0000000..c526e52
--- /dev/null
+++ b/openswitcher_proxy/frontend_mqtt.py
@@ -0,0 +1,54 @@
import threading
import logging
import json
from .frontend_httpapi import FieldEncoder
try:
    import paho.mqtt.client as mqtt
except ModuleNotFoundError:
    mqtt = None


class MqttFrontendThread(threading.Thread):
    def __init__(self, config, threadlist):
        threading.Thread.__init__(self)
        if mqtt is None:
            raise NotImplementedError("The paho-mqtt library is not available")
        self.name = 'mqtt.' + str(config['server'])
        self.config = config
        self.threadlist = threadlist
        self.hw_name = self.config['hardware']
        self.switcher = self.threadlist['hardware'][self.hw_name].switcher
        self.client = None
        self.status = 'initializing...'

    def run(self):
        logging.info('MQTT frontend run')
        host, port = self.config['server'].split(':')
        port = int(port)
        self.client = mqtt.Client(client_id=f'atem-{self.name}', userdata=self)
        self.client.on_connect = lambda client, userdata, flags, rc: self.on_mqtt_connect(flags, rc)
        self.client.on_message = lambda client, userdata, msg: self.on_mqtt_message(msg)
        logging.info(f'connecting to {host}:{port}')
        self.client.connect(host, port, keepalive=3)
        self.switcher.on('change', lambda field, value: self.on_switcher_changed(field, value))
        # FIXME: this is racy, I've seen `RuntimeError: dictionary changed size during iteration` once
        for field, value in self.switcher.mixerstate.items():
            self.on_switcher_changed(field, value)
        self.client.loop_forever()

    def on_switcher_changed(self, field, value):
        raw = json.dumps(value, cls=FieldEncoder)
        self.client.publish(f'atem/{self.hw_name}/{field}', raw)

    def on_mqtt_connect(self, flags, rc):
        self.status = 'running'
        logging.info(f'MQTT: connected ({rc})')
        # TODO: enable once on_mqtt_message() works
        # client.subscribe(f'atem/{userdata.hw_name}/#')

    def on_mqtt_message(self, msg):
        # TODO: propagate to the switcher, eventually
        logging.debug(f'MQTT: msg: {msg.topic} {msg.payload}')

    def get_status(self):
        return self.status
diff --git a/proxy.toml b/proxy.toml
index a115e7a..99668fe 100644
--- a/proxy.toml
+++ b/proxy.toml
@@ -29,4 +29,9 @@ bind = ":8083"
auth = true
username = "admin"
password = "password"
hardware = "mini"
\ No newline at end of file
hardware = "mini"
[[frontend]]
type = "mqtt"
server = "localhost:1883"
hardware = "mini"
auth = false
-- 
2.33.0