Bence Ferdinandy: 3 worker: remove factory multiprocessing: force forking paths: improve cross-platform compatibility 8 files changed, 81 insertions(+), 99 deletions(-)
Copy & paste the following snippet into your terminal to import this patchset into git:
curl -s https://lists.sr.ht/~ferdinandyb/throttle-devel/patches/50875/mbox | git am -3Learn more about email & git
It is completely unnecessary actually. --- throttle_cli/commandworker.py | 146 ++++++++++++++++------------------ 1 file changed, 69 insertions(+), 77 deletions(-) diff --git a/throttle_cli/commandworker.py b/throttle_cli/commandworker.py index b9d7426..017dccb 100644 --- a/throttle_cli/commandworker.py +++ b/throttle_cli/commandworker.py @@ -111,7 +111,7 @@ class CommandWorker: q: Queue[Msg] = Queue() e = Event() p = Process( - target=self.runworkerFactory(), + target=self.worker, args=(q, e, self.timeout, msg.job), ) p.start() @@ -185,90 +185,82 @@ class CommandWorker: return newjob return job - def runworkerFactory(self): - """ - Factory for handling each type of job. - """ - - def handlejobs(msg: Msg, e, logger) -> None: - retry_timeout_index = -1 - error_counter = 0 - if msg.notification: - retry_sequence = self.retry_sequence - else: - retry_sequence = self.retry_sequence_silent - while True: - if e.is_set(): - break - if retry_timeout_index + 1 < len(retry_sequence): - retry_timeout_index += 1 - success = True - logger.debug(msg) - logger.debug(f"running job: {msg.job} with timeout {self.job_timeout}") - try: - proc = subprocess.run( - shlex.split(msg.job), - capture_output=True, - timeout=self.job_timeout, - ) - if proc.returncode != 0: - success = False - error_counter += 1 - logger.error( - f"{proc.returncode=}, {proc.stdout=}, {proc.stderr=}, {error_counter=}" - ) - if error_counter >= self.notify_on_counter and msg.notification: - self.sendNotification( - job=msg.job, - origin=msg.origin, - msg=f"c:{error_counter}|{proc.stderr.decode('utf-8')} - {proc.stdout.decode('utf-8')}", - errcode=proc.returncode, - ) - error_counter = 0 - except Exception as error: + def handlejobs(self, msg: Msg, e, logger) -> None: + retry_timeout_index = -1 + error_counter = 0 + if msg.notification: + retry_sequence = self.retry_sequence + else: + retry_sequence = self.retry_sequence_silent + while True: + if e.is_set(): + break + if retry_timeout_index + 1 < len(retry_sequence): + retry_timeout_index += 1 + success = True + logger.debug(msg) + logger.debug(f"running job: {msg.job} with timeout {self.job_timeout}") + try: + proc = subprocess.run( + shlex.split(msg.job), + capture_output=True, + timeout=self.job_timeout, + ) + if proc.returncode != 0: success = False error_counter += 1 - logger.error(f"{msg.job}'s subprocess failed with {error}") + logger.error( + f"{proc.returncode=}, {proc.stdout=}, {proc.stderr=}, {error_counter=}" + ) if error_counter >= self.notify_on_counter and msg.notification: self.sendNotification( job=msg.job, origin=msg.origin, - msg=f"subprocess failed with {error}", + msg=f"c:{error_counter}|{proc.stderr.decode('utf-8')} - {proc.stdout.decode('utf-8')}", + errcode=proc.returncode, ) error_counter = 0 + except Exception as error: + success = False + error_counter += 1 + logger.error(f"{msg.job}'s subprocess failed with {error}") + if error_counter >= self.notify_on_counter and msg.notification: + self.sendNotification( + job=msg.job, + origin=msg.origin, + msg=f"subprocess failed with {error}", + ) + error_counter = 0 - if success: - break - if e.is_set(): - break - time.sleep(retry_sequence[retry_timeout_index]) - - def worker(q, e, timeout, name) -> None: - self.retry_sequence - logger_name = f"{name.replace(' ','_')}_worker" - logger = logging.getLogger(logger_name) - counter = 0 - cont_counter = 0 - logger.debug(f"starting process for {logger_name}") + if success: + break + if e.is_set(): + break + time.sleep(retry_sequence[retry_timeout_index]) - while True: - if e.is_set(): - break - try: - msg = q.get(timeout=timeout) - if msg.action == ActionType.RUN: - counter += 1 - logger.debug(f"start run no: {counter} (CONT: {cont_counter})") - handlejobs(msg, e, logger) - logger.debug(f"finish run no: {counter} (CONT: {cont_counter})") - else: - cont_counter += 1 - logger.debug(f"handling CONT no. {cont_counter}") - if msg.next(): - self.q.put(msg) - except queue.Empty: - logger.debug(f"closing process for {logger_name}") - break - self.q.put(Msg(action=ActionType.CLEAN)) + def worker(self, q, e, timeout, name) -> None: + logger_name = f"{name.replace(' ','_')}_worker" + logger = logging.getLogger(logger_name) + counter = 0 + cont_counter = 0 + logger.debug(f"starting process for {logger_name}") - return worker + while True: + if e.is_set(): + break + try: + msg = q.get(timeout=timeout) + if msg.action == ActionType.RUN: + counter += 1 + logger.debug(f"start run no: {counter} (CONT: {cont_counter})") + self.handlejobs(msg, e, logger) + logger.debug(f"finish run no: {counter} (CONT: {cont_counter})") + else: + cont_counter += 1 + logger.debug(f"handling CONT no. {cont_counter}") + if msg.next(): + self.q.put(msg) + except queue.Empty: + logger.debug(f"closing process for {logger_name}") + break + self.q.put(Msg(action=ActionType.CLEAN)) -- 2.43.2
Multiprocessing uses fork on linux and spawn on Mac and Windows by default. The latter doesn't even support forking. The current code breaks when processes are spawned instead of forked. Enforce the use of fork. Although the more general approach would be to allow spawning it would need a bigger refactoring (mostly due to having to make sure logging works as intended in all spawned processes) and since the current version uses a unix socket to communicate anyway, even that would not solve Windows support and throttle runs under WSL so that is not a big issue. --- throttle_cli/server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/throttle_cli/server.py b/throttle_cli/server.py index 14c4b73..6864e1b 100644 --- a/throttle_cli/server.py +++ b/throttle_cli/server.py @@ -2,7 +2,7 @@ import logging import os import socket import time -from multiprocessing import Process, Queue, active_children +from multiprocessing import Process, Queue, active_children, set_start_method from pathlib import Path from typing import Any, Callable @@ -26,6 +26,7 @@ def ipcworker(socketpath: Path, handleMsg: Callable, handleInfo: Callable) -> No def start_server(socketpath: Path, loglevel) -> None: + set_start_method("fork") ipcqueue: Queue[Msg] = Queue() logqueue: Queue[Any] = Queue() comqueue: Queue[Any] = Queue() -- 2.43.2
pyxdg had issues on Mac. Use platformdirs which is supposed to be more robust. --- poetry.lock | 13 +------------ pyproject.toml | 2 +- throttle_cli/cli_client.py | 4 ++-- throttle_cli/cli_server.py | 4 ++-- throttle_cli/commandworker.py | 4 ++-- throttle_cli/loglib.py | 4 ++-- 6 files changed, 10 insertions(+), 21 deletions(-) diff --git a/poetry.lock b/poetry.lock index 4fcbbf4..64bb3a8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -194,17 +194,6 @@ wcwidth = "*" [package.extras] tests = ["pytest", "pytest-cov", "pytest-lazy-fixtures"] -[[package]] -name = "pyxdg" -version = "0.28" -description = "PyXDG contains implementations of freedesktop.org standards in python." -optional = false -python-versions = "*" -files = [ - {file = "pyxdg-0.28-py2.py3-none-any.whl", hash = "sha256:bdaf595999a0178ecea4052b7f4195569c1ff4d344567bccdc12dfdf02d545ab"}, - {file = "pyxdg-0.28.tar.gz", hash = "sha256:3267bb3074e934df202af2ee0868575484108581e6f3cb006af1da35395e88b4"}, -] - [[package]] name = "ruff" version = "0.2.2" @@ -278,4 +267,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = ">=3.10" -content-hash = "3022c95b0d251cf4ca38bea45a3cd73f5e55d08dea22ad8ad44f5667f7865a36" +content-hash = "2a27000aee2bf6a27e272775acbde5ab316a3e79947ca76bb1bfc432d15fd0c6" diff --git a/pyproject.toml b/pyproject.toml index 2a81200..7a80042 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,8 +11,8 @@ homepage = "https://sr.ht/~ferdinandyb/throttle/" python = ">=3.10" jsonrpclib-pelix = "^0.4.3.2" toml = "^0.10.2" -pyxdg = "^0.28" prettytable = "^3.10.0" +platformdirs = "^4.2.0" [tool.poetry.scripts] throttle = "throttle_cli.cli_client:main" diff --git a/throttle_cli/cli_client.py b/throttle_cli/cli_client.py index 4870f4a..de0c202 100644 --- a/throttle_cli/cli_client.py +++ b/throttle_cli/cli_client.py @@ -1,6 +1,6 @@ from pathlib import Path -from xdg import BaseDirectory +from platformdirs import user_runtime_dir from . import __version__ from .arglib import storeJob, storeSilentJob @@ -55,7 +55,7 @@ def main(): help="Format for printing results.", ) args, unknownargs = parser.parse_known_args() - socketpath = Path(BaseDirectory.get_runtime_dir()) / "throttle.sock" + socketpath = Path(user_runtime_dir("throttle")) / "throttle.sock" if args.statistics: get_info(socketpath, ActionType.STATS, args.format) return diff --git a/throttle_cli/cli_server.py b/throttle_cli/cli_server.py index f7515dc..ba57d36 100644 --- a/throttle_cli/cli_server.py +++ b/throttle_cli/cli_server.py @@ -1,7 +1,7 @@ import logging from pathlib import Path -from xdg import BaseDirectory +from platformdirs import user_runtime_dir from . import __version__ from .server import start_server @@ -22,7 +22,7 @@ def main(): "--version", action="version", version=f"throttle {__version__}" ) args = parser.parse_args() - socketpath = Path(BaseDirectory.get_runtime_dir()) / "throttle.sock" + socketpath = Path(user_runtime_dir("throttle")) / "throttle.sock" loglevel = logging.INFO if args.LOGLEVEL: loglevel = getattr(logging, args.LOGLEVEL) diff --git a/throttle_cli/commandworker.py b/throttle_cli/commandworker.py index 017dccb..7b9b0f6 100644 --- a/throttle_cli/commandworker.py +++ b/throttle_cli/commandworker.py @@ -12,7 +12,7 @@ from pathlib import Path from typing import Dict, List import toml -from xdg import BaseDirectory +from platformdirs import user_config_dir from .structures import ActionType, Msg @@ -44,7 +44,7 @@ class CommandWorker: self.loadConfig() def loadConfig(self): - configdir = BaseDirectory.load_first_config("throttle") + configdir = user_config_dir("throttle") if configdir is None: return configpath = Path(configdir) / "config.toml" diff --git a/throttle_cli/loglib.py b/throttle_cli/loglib.py index b77092b..97edfd5 100644 --- a/throttle_cli/loglib.py +++ b/throttle_cli/loglib.py @@ -2,12 +2,12 @@ import logging import logging.handlers from pathlib import Path -from xdg import BaseDirectory +from platformdirs import user_log_dir def consumer(queue): root = logging.getLogger() - logfolder = Path(BaseDirectory.xdg_state_home) / "throttle" + logfolder = Path(user_log_dir("throttle")) logfolder.mkdir(parents=True, exist_ok=True) filehandler = logging.handlers.RotatingFileHandler( logfolder / "throttle.log", "a", 1024 * 1024 * 10, 3 -- 2.43.2