This thread contains a patchset. You're looking at the original emails,
but you may wish to use the patch review UI.
Review patch
2
[PATCH throttle 1/3] worker: remove factory
It is completely unnecessary actually.
---
throttle_cli/commandworker.py | 146 ++++++++++++++++ ------------------
1 file changed, 69 insertions(+), 77 deletions(-)
diff --git a/throttle_cli/commandworker.py b/throttle_cli/commandworker.py
index b9d7426..017dccb 100644
--- a/throttle_cli/commandworker.py
+++ b/throttle_cli/commandworker.py
@@ -111,7 +111,7 @@ class CommandWorker:
q: Queue[Msg] = Queue()
e = Event()
p = Process(
- target=self.runworkerFactory(),
+ target=self.worker,
args=(q, e, self.timeout, msg.job),
)
p.start()
@@ -185,90 +185,82 @@ class CommandWorker:
return newjob
return job
- def runworkerFactory(self):
- """
- Factory for handling each type of job.
- """
-
- def handlejobs(msg: Msg, e, logger) -> None:
- retry_timeout_index = -1
- error_counter = 0
- if msg.notification:
- retry_sequence = self.retry_sequence
- else:
- retry_sequence = self.retry_sequence_silent
- while True:
- if e.is_set():
- break
- if retry_timeout_index + 1 < len(retry_sequence):
- retry_timeout_index += 1
- success = True
- logger.debug(msg)
- logger.debug(f"running job: {msg.job} with timeout {self.job_timeout}")
- try:
- proc = subprocess.run(
- shlex.split(msg.job),
- capture_output=True,
- timeout=self.job_timeout,
- )
- if proc.returncode != 0:
- success = False
- error_counter += 1
- logger.error(
- f"{proc.returncode=}, {proc.stdout=}, {proc.stderr=}, {error_counter=}"
- )
- if error_counter >= self.notify_on_counter and msg.notification:
- self.sendNotification(
- job=msg.job,
- origin=msg.origin,
- msg=f"c:{error_counter}|{proc.stderr.decode('utf-8')} - {proc.stdout.decode('utf-8')}",
- errcode=proc.returncode,
- )
- error_counter = 0
- except Exception as error:
+ def handlejobs(self, msg: Msg, e, logger) -> None:
+ retry_timeout_index = -1
+ error_counter = 0
+ if msg.notification:
+ retry_sequence = self.retry_sequence
+ else:
+ retry_sequence = self.retry_sequence_silent
+ while True:
+ if e.is_set():
+ break
+ if retry_timeout_index + 1 < len(retry_sequence):
+ retry_timeout_index += 1
+ success = True
+ logger.debug(msg)
+ logger.debug(f"running job: {msg.job} with timeout {self.job_timeout}")
+ try:
+ proc = subprocess.run(
+ shlex.split(msg.job),
+ capture_output=True,
+ timeout=self.job_timeout,
+ )
+ if proc.returncode != 0:
success = False
error_counter += 1
- logger.error(f"{msg.job}'s subprocess failed with {error}")
+ logger.error(
+ f"{proc.returncode=}, {proc.stdout=}, {proc.stderr=}, {error_counter=}"
+ )
if error_counter >= self.notify_on_counter and msg.notification:
self.sendNotification(
job=msg.job,
origin=msg.origin,
- msg=f"subprocess failed with {error}",
+ msg=f"c:{error_counter}|{proc.stderr.decode('utf-8')} - {proc.stdout.decode('utf-8')}",
+ errcode=proc.returncode,
)
error_counter = 0
+ except Exception as error:
+ success = False
+ error_counter += 1
+ logger.error(f"{msg.job}'s subprocess failed with {error}")
+ if error_counter >= self.notify_on_counter and msg.notification:
+ self.sendNotification(
+ job=msg.job,
+ origin=msg.origin,
+ msg=f"subprocess failed with {error}",
+ )
+ error_counter = 0
- if success:
- break
- if e.is_set():
- break
- time.sleep(retry_sequence[retry_timeout_index])
-
- def worker(q, e, timeout, name) -> None:
- self.retry_sequence
- logger_name = f"{name.replace(' ','_')}_worker"
- logger = logging.getLogger(logger_name)
- counter = 0
- cont_counter = 0
- logger.debug(f"starting process for {logger_name}")
+ if success:
+ break
+ if e.is_set():
+ break
+ time.sleep(retry_sequence[retry_timeout_index])
- while True:
- if e.is_set():
- break
- try:
- msg = q.get(timeout=timeout)
- if msg.action == ActionType.RUN:
- counter += 1
- logger.debug(f"start run no: {counter} (CONT: {cont_counter})")
- handlejobs(msg, e, logger)
- logger.debug(f"finish run no: {counter} (CONT: {cont_counter})")
- else:
- cont_counter += 1
- logger.debug(f"handling CONT no. {cont_counter}")
- if msg.next():
- self.q.put(msg)
- except queue.Empty:
- logger.debug(f"closing process for {logger_name}")
- break
- self.q.put(Msg(action=ActionType.CLEAN))
+ def worker(self, q, e, timeout, name) -> None:
+ logger_name = f"{name.replace(' ','_')}_worker"
+ logger = logging.getLogger(logger_name)
+ counter = 0
+ cont_counter = 0
+ logger.debug(f"starting process for {logger_name}")
- return worker
+ while True:
+ if e.is_set():
+ break
+ try:
+ msg = q.get(timeout=timeout)
+ if msg.action == ActionType.RUN:
+ counter += 1
+ logger.debug(f"start run no: {counter} (CONT: {cont_counter})")
+ self.handlejobs(msg, e, logger)
+ logger.debug(f"finish run no: {counter} (CONT: {cont_counter})")
+ else:
+ cont_counter += 1
+ logger.debug(f"handling CONT no. {cont_counter}")
+ if msg.next():
+ self.q.put(msg)
+ except queue.Empty:
+ logger.debug(f"closing process for {logger_name}")
+ break
+ self.q.put(Msg(action=ActionType.CLEAN))
--
2.43.2
[PATCH throttle 2/3] multiprocessing: force forking
Multiprocessing uses fork on linux and spawn on Mac and Windows by
default. The latter doesn't even support forking. The current code
breaks when processes are spawned instead of forked. Enforce the use of
fork. Although the more general approach would be to allow spawning it
would need a bigger refactoring (mostly due to having to make sure
logging works as intended in all spawned processes) and since the
current version uses a unix socket to communicate anyway, even that
would not solve Windows support and throttle runs under WSL so that is
not a big issue.
---
throttle_cli/server.py | 3 ++ -
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/throttle_cli/server.py b/throttle_cli/server.py
index 14c4b73..6864e1b 100644
--- a/throttle_cli/server.py
+++ b/throttle_cli/server.py
@@ -2,7 +2,7 @@ import logging
import os
import socket
import time
- from multiprocessing import Process, Queue, active_children
+ from multiprocessing import Process, Queue, active_children, set_start_method
from pathlib import Path
from typing import Any, Callable
@@ -26,6 +26,7 @@ def ipcworker(socketpath: Path, handleMsg: Callable, handleInfo: Callable) -> No
def start_server(socketpath: Path, loglevel) -> None:
+ set_start_method("fork")
ipcqueue: Queue[Msg] = Queue()
logqueue: Queue[Any] = Queue()
comqueue: Queue[Any] = Queue()
--
2.43.2
[PATCH throttle 3/3] paths: improve cross-platform compatibility
pyxdg had issues on Mac. Use platformdirs which is supposed to be more
robust.
---
poetry.lock | 13 + ------------
pyproject.toml | 2 + -
throttle_cli/cli_client.py | 4 ++ --
throttle_cli/cli_server.py | 4 ++ --
throttle_cli/commandworker.py | 4 ++ --
throttle_cli/loglib.py | 4 ++ --
6 files changed, 10 insertions(+), 21 deletions(-)
diff --git a/poetry.lock b/poetry.lock
index 4fcbbf4..64bb3a8 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -194,17 +194,6 @@ wcwidth = "*"
[package.extras]
tests = ["pytest", "pytest-cov", "pytest-lazy-fixtures"]
- [[package]]
- name = "pyxdg"
- version = "0.28"
- description = "PyXDG contains implementations of freedesktop.org standards in python."
- optional = false
- python-versions = "*"
- files = [
- {file = "pyxdg-0.28-py2.py3-none-any.whl", hash = "sha256:bdaf595999a0178ecea4052b7f4195569c1ff4d344567bccdc12dfdf02d545ab"},
- {file = "pyxdg-0.28.tar.gz", hash = "sha256:3267bb3074e934df202af2ee0868575484108581e6f3cb006af1da35395e88b4"},
- ]
-
[[package]]
name = "ruff"
version = "0.2.2"
@@ -278,4 +267,4 @@ files = [
[metadata]
lock-version = "2.0"
python-versions = ">=3.10"
- content-hash = "3022c95b0d251cf4ca38bea45a3cd73f5e55d08dea22ad8ad44f5667f7865a36"
+ content-hash = "2a27000aee2bf6a27e272775acbde5ab316a3e79947ca76bb1bfc432d15fd0c6"
diff --git a/pyproject.toml b/pyproject.toml
index 2a81200..7a80042 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,8 +11,8 @@ homepage = "https://sr.ht/~ferdinandyb/throttle/"
python = ">=3.10"
jsonrpclib-pelix = "^0.4.3.2"
toml = "^0.10.2"
- pyxdg = "^0.28"
prettytable = "^3.10.0"
+ platformdirs = "^4.2.0"
[tool.poetry.scripts]
throttle = "throttle_cli.cli_client:main"
diff --git a/throttle_cli/cli_client.py b/throttle_cli/cli_client.py
index 4870f4a..de0c202 100644
--- a/throttle_cli/cli_client.py
+++ b/throttle_cli/cli_client.py
@@ -1,6 +1,6 @@
from pathlib import Path
- from xdg import BaseDirectory
+ from platformdirs import user_runtime_dir
from . import __version__
from .arglib import storeJob, storeSilentJob
@@ -55,7 +55,7 @@ def main():
help="Format for printing results.",
)
args, unknownargs = parser.parse_known_args()
- socketpath = Path(BaseDirectory.get_runtime_dir()) / "throttle.sock"
+ socketpath = Path(user_runtime_dir("throttle")) / "throttle.sock"
if args.statistics:
get_info(socketpath, ActionType.STATS, args.format)
return
diff --git a/throttle_cli/cli_server.py b/throttle_cli/cli_server.py
index f7515dc..ba57d36 100644
--- a/throttle_cli/cli_server.py
+++ b/throttle_cli/cli_server.py
@@ -1,7 +1,7 @@
import logging
from pathlib import Path
- from xdg import BaseDirectory
+ from platformdirs import user_runtime_dir
from . import __version__
from .server import start_server
@@ -22,7 +22,7 @@ def main():
"--version", action="version", version=f"throttle {__version__}"
)
args = parser.parse_args()
- socketpath = Path(BaseDirectory.get_runtime_dir()) / "throttle.sock"
+ socketpath = Path(user_runtime_dir("throttle")) / "throttle.sock"
loglevel = logging.INFO
if args.LOGLEVEL:
loglevel = getattr(logging, args.LOGLEVEL)
diff --git a/throttle_cli/commandworker.py b/throttle_cli/commandworker.py
index 017dccb..7b9b0f6 100644
--- a/throttle_cli/commandworker.py
+++ b/throttle_cli/commandworker.py
@@ -12,7 +12,7 @@ from pathlib import Path
from typing import Dict, List
import toml
- from xdg import BaseDirectory
+ from platformdirs import user_config_dir
from .structures import ActionType, Msg
@@ -44,7 +44,7 @@ class CommandWorker:
self.loadConfig()
def loadConfig(self):
- configdir = BaseDirectory.load_first_config("throttle")
+ configdir = user_config_dir("throttle")
if configdir is None:
return
configpath = Path(configdir) / "config.toml"
diff --git a/throttle_cli/loglib.py b/throttle_cli/loglib.py
index b77092b..97edfd5 100644
--- a/throttle_cli/loglib.py
+++ b/throttle_cli/loglib.py
@@ -2,12 +2,12 @@ import logging
import logging.handlers
from pathlib import Path
- from xdg import BaseDirectory
+ from platformdirs import user_log_dir
def consumer(queue):
root = logging.getLogger()
- logfolder = Path(BaseDirectory.xdg_state_home) / "throttle"
+ logfolder = Path(user_log_dir("throttle"))
logfolder.mkdir(parents=True, exist_ok=True)
filehandler = logging.handlers.RotatingFileHandler(
logfolder / "throttle.log", "a", 1024 * 1024 * 10, 3
--
2.43.2