[PATCH throttle v1] errors: improve error handling
Export this patch
Add a better error handling, and quitting with cleanup. This should
cleanly handle if the toml config is broken. Keyboard interrupts are
still messy and likely leave the socket file on disk. This also opens up
the possibility for sending a QUIT message to the server via the client
(not implemented yet).
---
throttle_cli/commandworker.py | 15 ++++++++++++++-
throttle_cli/server.py | 22 ++++++++++++++++++----
throttle_cli/structures.py | 1 +
3 files changed, 33 insertions(+), 5 deletions(-)
diff --git a/throttle_cli/commandworker.py b/throttle_cli/commandworker.py
index d451d38..72f793b 100644
--- a/throttle_cli/commandworker.py
+++ b/throttle_cli/commandworker.py
@@ -32,7 +32,9 @@ class CommandWorker:
queue: "Queue[Msg]",
logqueue: "Queue[Any]",
comqueue: "Queue[Any]",
+ killevent,
):
+ self.killevent = killevent
self.manager = manager
self.q = queue
self.logqueue = logqueue
@@ -60,7 +62,13 @@ class CommandWorker:
return
# let's fail if the config is messed up
- config = toml.load(Path(configdir) / "config.toml")
+ try:
+ config = toml.load(Path(configdir) / "config.toml")
+ except ValueError as e:
+ self.logger.error(f"config failed to load: {e}")
+ self.q.put(Msg(action=ActionType.QUIT, origin=f"error in config: {e}"))
+ return
+
self.logger.debug(f"config found: {config}")
if "task_timeout" in config:
self.timeout = config["task_timeout"]
@@ -101,6 +109,11 @@ class CommandWorker:
self.comqueue.put(self.statistics)
case ActionType.STATUS:
self.comqueue.put(self.get_status())
+ case ActionType.QUIT:
+ self.logger.info(f"quitting from origin {msg.origin}")
+ self.logqueue.put(None) # quits logging
+ self.killevent.set()
+ break
def get_status(self):
retval = {}
diff --git a/throttle_cli/server.py b/throttle_cli/server.py
index c0acb6e..2be6d61 100644
--- a/throttle_cli/server.py
+++ b/throttle_cli/server.py
@@ -2,7 +2,14 @@ import logging
import os
import socket
import time
-from multiprocessing import Manager, Process, Queue, active_children, set_start_method
+from multiprocessing import (
+ Event,
+ Manager,
+ Process,
+ Queue,
+ active_children,
+ set_start_method,
+)
from pathlib import Path
from typing import Any, Callable
@@ -27,6 +34,7 @@ def ipcworker(socketpath: Path, handleMsg: Callable, handleInfo: Callable) -> No
def start_server(socketpath: Path, loglevel) -> None:
set_start_method("fork")
+ killevent = Event()
manager = Manager()
ipcqueue: Queue[Msg] = manager.Queue()
logqueue: Queue[Any] = manager.Queue()
@@ -34,7 +42,7 @@ def start_server(socketpath: Path, loglevel) -> None:
loggerp = Process(target=loglib.consumer, args=(logqueue,))
loggerp.start()
- msgworker = CommandWorker(manager, ipcqueue, logqueue, comqueue)
+ msgworker = CommandWorker(manager, ipcqueue, logqueue, comqueue, killevent)
loglib.publisher_config(logqueue, loglevel)
logger = logging.getLogger("server")
logger.debug(os.environ)
@@ -43,7 +51,6 @@ def start_server(socketpath: Path, loglevel) -> None:
ipcqueue.put(Msg(**msg))
def handleInfo(msg):
- print("handling")
ipcqueue.put(Msg(**msg))
return comqueue.get()
@@ -56,5 +63,12 @@ def start_server(socketpath: Path, loglevel) -> None:
p_msg.start()
while True:
time.sleep(1)
- if not active_children():
+ if not active_children() or killevent.is_set():
+ loggerp.terminate()
+ p_ipc.terminate()
+ p_msg.terminate()
+ loggerp.join()
+ p_ipc.join()
+ p_msg.join()
+ os.remove(socketpath)
break
diff --git a/throttle_cli/structures.py b/throttle_cli/structures.py
index f124e4f..cf5c286 100644
--- a/throttle_cli/structures.py
+++ b/throttle_cli/structures.py
@@ -10,6 +10,7 @@ class ActionType(Enum):
CLEAN = auto() # clear up dangling jobs
STATS = auto() # return stats to client
STATUS = auto() # return current status to client
+ QUIT = auto()
@dataclass
--
2.43.2