Jan Kundrát (2): proxy: do not assume all frontends "bind" somewhere proxy: add a read-only MQTT frontend build-aux/nl.brixit.Switcher.json | 2 +- openswitcher_proxy/__main__.py | 7 ++-- openswitcher_proxy/frontend_mqtt.py | 54 +++++++++++++++++++++++++++++ proxy.toml | 7 +++- 4 files changed, 66 insertions(+), 4 deletions(-) create mode 100644 openswitcher_proxy/frontend_mqtt.py -- 2.33.0
Copy & paste the following snippet into your terminal to import this patchset into git:
curl -s https://lists.sr.ht/~martijnbraam/openatem/patches/26227/mbox | git am -3Learn more about email & git
I'm writing a frontend which actually *connects* to an external server, so let's not make any assumptions here. The only downside is that we cannot easily print out what section of the config file is getting handled before we initialize its handler. I think this is better than calling a parameter for MQTT server's hostname to connect to "bind". Signed-off-by: Jan Kundrát <jkt@jankundrat.com> --- openswitcher_proxy/__main__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openswitcher_proxy/__main__.py b/openswitcher_proxy/__main__.py index 2a0779f..7dd4c07 100644 --- a/openswitcher_proxy/__main__.py +++ b/openswitcher_proxy/__main__.py @@ -35,7 +35,7 @@ def run(config_path): if 'frontend' in config: nthreads['frontend'] = {} for frontend in config['frontend']: - logging.info(f' frontend: {frontend["type"]} ({frontend["bind"]})') + logging.info(f' frontend: {frontend["type"]}') if frontend['type'] == 'status': t = StatusFrontendThread(frontend, nthreads) elif frontend['type'] == 'http-api': @@ -47,7 +47,7 @@ def run(config_path): continue t.daemon = True threads.append(t) - nthreads['frontend'][frontend['bind']] = t + nthreads['frontend'][t.name] = t t.start() while True: -- 2.33.0
This should be used carefully because it blindly propagates all fields, including possibly security-sensitive ones such as streaming keys. Connection to MQTT is currently neither authenticated not encrypted. On the other hand, this is super-useful for applications such as tally lights, yay! Signed-off-by: Jan Kundrát <jkt@jankundrat.com> --- build-aux/nl.brixit.Switcher.json | 2 +- openswitcher_proxy/__main__.py | 3 ++ openswitcher_proxy/frontend_mqtt.py | 54 +++++++++++++++++++++++++++++ proxy.toml | 7 +++- 4 files changed, 64 insertions(+), 2 deletions(-) create mode 100644 openswitcher_proxy/frontend_mqtt.py diff --git a/build-aux/nl.brixit.Switcher.json b/build-aux/nl.brixit.Switcher.json index ef29655..3e34798 100644 --- a/build-aux/nl.brixit.Switcher.json @@ -18,7 +18,7 @@ "name": "hexdump", "buildsystem": "simple", "build-commands": [ - "pip3 install --prefix=/app hexdump pyusb" + "pip3 install --prefix=/app hexdump pyusb paho-mqtt" ], "build-options": { "build-args": [ diff --git a/openswitcher_proxy/__main__.py b/openswitcher_proxy/__main__.py index 7dd4c07..a15289e 100644 --- a/openswitcher_proxy/__main__.py +++ b/openswitcher_proxy/__main__.py @@ -7,6 +7,7 @@ import logging from openswitcher_proxy.frontend_httpapi import HttpApiFrontendThread from openswitcher_proxy.frontend_status import StatusFrontendThread from openswitcher_proxy.frontend_tcp import TcpFrontendThread +from openswitcher_proxy.frontend_mqtt import MqttFrontendThread from openswitcher_proxy.hardware import HardwareThread logging.basicConfig( @@ -42,6 +43,8 @@ def run(config_path): t = HttpApiFrontendThread(frontend, nthreads) elif frontend['type'] == 'tcp': t = TcpFrontendThread(frontend, nthreads) + elif frontend['type'] == 'mqtt': + t = MqttFrontendThread(frontend, nthreads) else: logging.error(f' Unknown frontend type "{frontend["type"]}"') continue diff --git a/openswitcher_proxy/frontend_mqtt.py b/openswitcher_proxy/frontend_mqtt.py new file mode 100644 index 0000000..c526e52 --- /dev/null +++ b/openswitcher_proxy/frontend_mqtt.py @@ -0,0 +1,54 @@ +import threading +import logging +import json +from .frontend_httpapi import FieldEncoder +try: + import paho.mqtt.client as mqtt +except ModuleNotFoundError: + mqtt = None + + +class MqttFrontendThread(threading.Thread): + def __init__(self, config, threadlist): + threading.Thread.__init__(self) + if mqtt is None: + raise NotImplementedError("The paho-mqtt library is not available") + self.name = 'mqtt.' + str(config['server']) + self.config = config + self.threadlist = threadlist + self.hw_name = self.config['hardware'] + self.switcher = self.threadlist['hardware'][self.hw_name].switcher + self.client = None + self.status = 'initializing...' + + def run(self): + logging.info('MQTT frontend run') + host, port = self.config['server'].split(':') + port = int(port) + self.client = mqtt.Client(client_id=f'atem-{self.name}', userdata=self) + self.client.on_connect = lambda client, userdata, flags, rc: self.on_mqtt_connect(flags, rc) + self.client.on_message = lambda client, userdata, msg: self.on_mqtt_message(msg) + logging.info(f'connecting to {host}:{port}') + self.client.connect(host, port, keepalive=3) + self.switcher.on('change', lambda field, value: self.on_switcher_changed(field, value)) + # FIXME: this is racy, I've seen `RuntimeError: dictionary changed size during iteration` once + for field, value in self.switcher.mixerstate.items(): + self.on_switcher_changed(field, value) + self.client.loop_forever() + + def on_switcher_changed(self, field, value): + raw = json.dumps(value, cls=FieldEncoder) + self.client.publish(f'atem/{self.hw_name}/{field}', raw) + + def on_mqtt_connect(self, flags, rc): + self.status = 'running' + logging.info(f'MQTT: connected ({rc})') + # TODO: enable once on_mqtt_message() works + # client.subscribe(f'atem/{userdata.hw_name}/#') + + def on_mqtt_message(self, msg): + # TODO: propagate to the switcher, eventually + logging.debug(f'MQTT: msg: {msg.topic} {msg.payload}') + + def get_status(self): + return self.status diff --git a/proxy.toml b/proxy.toml index a115e7a..99668fe 100644 --- a/proxy.toml +++ b/proxy.toml @@ -29,4 +29,9 @@ bind = ":8083" auth = true username = "admin" password = "password" -hardware = "mini" \ No newline at end of file +hardware = "mini" +[[frontend]] +type = "mqtt" +server = "localhost:1883" +hardware = "mini" +auth = false -- 2.33.0