~ferdinandyb/throttle-devel

throttle: worker: remove factory v2 PROPOSED

Bence Ferdinandy: 4
 worker: remove factory
 multiprocessing: force forking
 paths: improve cross-platform compatibility
 multiprocessing: switch to Manager().Queue()

 10 files changed, 97 insertions(+), 107 deletions(-)
Tested-by: Maarten Aertsen <maarten@nlnetlabs.nl>
Export patchset (mbox)
How do I use this?

Copy & paste the following snippet into your terminal to import this patchset into git:

curl -s https://lists.sr.ht/~ferdinandyb/throttle-devel/patches/50937/mbox | git am -3
Learn more about email & git

[PATCH throttle v2 1/4] worker: remove factory Export this patch

The factory is completely unnecessary: handlejobs and worker can simply
be regular methods of CommandWorker.
---
 throttle_cli/commandworker.py | 146 ++++++++++++++++------------------
 1 file changed, 69 insertions(+), 77 deletions(-)

diff --git a/throttle_cli/commandworker.py b/throttle_cli/commandworker.py
index b9d7426..017dccb 100644
--- a/throttle_cli/commandworker.py
+++ b/throttle_cli/commandworker.py
@@ -111,7 +111,7 @@ class CommandWorker:
            q: Queue[Msg] = Queue()
            e = Event()
            p = Process(
                target=self.runworkerFactory(),
                target=self.worker,
                args=(q, e, self.timeout, msg.job),
            )
            p.start()
@@ -185,90 +185,82 @@ class CommandWorker:
                return newjob
        return job

    def runworkerFactory(self):
        """
        Factory for handling each type of job.
        """

        def handlejobs(msg: Msg, e, logger) -> None:
            retry_timeout_index = -1
            error_counter = 0
            if msg.notification:
                retry_sequence = self.retry_sequence
            else:
                retry_sequence = self.retry_sequence_silent
            while True:
                if e.is_set():
                    break
                if retry_timeout_index + 1 < len(retry_sequence):
                    retry_timeout_index += 1
                success = True
                logger.debug(msg)
                logger.debug(f"running job: {msg.job} with timeout {self.job_timeout}")
                try:
                    proc = subprocess.run(
                        shlex.split(msg.job),
                        capture_output=True,
                        timeout=self.job_timeout,
                    )
                    if proc.returncode != 0:
                        success = False
                        error_counter += 1
                        logger.error(
                            f"{proc.returncode=}, {proc.stdout=}, {proc.stderr=}, {error_counter=}"
                        )
                        if error_counter >= self.notify_on_counter and msg.notification:
                            self.sendNotification(
                                job=msg.job,
                                origin=msg.origin,
                                msg=f"c:{error_counter}|{proc.stderr.decode('utf-8')} - {proc.stdout.decode('utf-8')}",
                                errcode=proc.returncode,
                            )
                            error_counter = 0
                except Exception as error:
    def handlejobs(self, msg: Msg, e, logger) -> None:
        retry_timeout_index = -1
        error_counter = 0
        if msg.notification:
            retry_sequence = self.retry_sequence
        else:
            retry_sequence = self.retry_sequence_silent
        while True:
            if e.is_set():
                break
            if retry_timeout_index + 1 < len(retry_sequence):
                retry_timeout_index += 1
            success = True
            logger.debug(msg)
            logger.debug(f"running job: {msg.job} with timeout {self.job_timeout}")
            try:
                proc = subprocess.run(
                    shlex.split(msg.job),
                    capture_output=True,
                    timeout=self.job_timeout,
                )
                if proc.returncode != 0:
                    success = False
                    error_counter += 1
                    logger.error(f"{msg.job}'s subprocess failed with {error}")
                    logger.error(
                        f"{proc.returncode=}, {proc.stdout=}, {proc.stderr=}, {error_counter=}"
                    )
                    if error_counter >= self.notify_on_counter and msg.notification:
                        self.sendNotification(
                            job=msg.job,
                            origin=msg.origin,
                            msg=f"subprocess failed with {error}",
                            msg=f"c:{error_counter}|{proc.stderr.decode('utf-8')} - {proc.stdout.decode('utf-8')}",
                            errcode=proc.returncode,
                        )
                        error_counter = 0
            except Exception as error:
                success = False
                error_counter += 1
                logger.error(f"{msg.job}'s subprocess failed with {error}")
                if error_counter >= self.notify_on_counter and msg.notification:
                    self.sendNotification(
                        job=msg.job,
                        origin=msg.origin,
                        msg=f"subprocess failed with {error}",
                    )
                    error_counter = 0

                if success:
                    break
                if e.is_set():
                    break
                time.sleep(retry_sequence[retry_timeout_index])

        def worker(q, e, timeout, name) -> None:
            self.retry_sequence
            logger_name = f"{name.replace(' ','_')}_worker"
            logger = logging.getLogger(logger_name)
            counter = 0
            cont_counter = 0
            logger.debug(f"starting process for {logger_name}")
            if success:
                break
            if e.is_set():
                break
            time.sleep(retry_sequence[retry_timeout_index])

            while True:
                if e.is_set():
                    break
                try:
                    msg = q.get(timeout=timeout)
                    if msg.action == ActionType.RUN:
                        counter += 1
                        logger.debug(f"start run no: {counter} (CONT: {cont_counter})")
                        handlejobs(msg, e, logger)
                        logger.debug(f"finish run no: {counter} (CONT: {cont_counter})")
                    else:
                        cont_counter += 1
                        logger.debug(f"handling CONT no. {cont_counter}")
                    if msg.next():
                        self.q.put(msg)
                except queue.Empty:
                    logger.debug(f"closing process for {logger_name}")
                    break
            self.q.put(Msg(action=ActionType.CLEAN))
    def worker(self, q, e, timeout, name) -> None:
        logger_name = f"{name.replace(' ','_')}_worker"
        logger = logging.getLogger(logger_name)
        counter = 0
        cont_counter = 0
        logger.debug(f"starting process for {logger_name}")

        return worker
        while True:
            if e.is_set():
                break
            try:
                msg = q.get(timeout=timeout)
                if msg.action == ActionType.RUN:
                    counter += 1
                    logger.debug(f"start run no: {counter} (CONT: {cont_counter})")
                    self.handlejobs(msg, e, logger)
                    logger.debug(f"finish run no: {counter} (CONT: {cont_counter})")
                else:
                    cont_counter += 1
                    logger.debug(f"handling CONT no. {cont_counter}")
                if msg.next():
                    self.q.put(msg)
            except queue.Empty:
                logger.debug(f"closing process for {logger_name}")
                break
        self.q.put(Msg(action=ActionType.CLEAN))
-- 
2.43.2

[PATCH throttle v2 2/4] multiprocessing: force forking Export this patch

Multiprocessing uses fork on Linux and spawn on macOS and Windows by
default; Windows doesn't even support forking. The current code breaks
when processes are spawned instead of forked, so enforce the use of
fork. The more general approach would be to allow spawning, but that
would need a bigger refactoring (mostly due to having to make sure
logging works as intended in all spawned processes). Since the current
version uses a unix socket to communicate anyway, even that would not
solve Windows support, and throttle runs under WSL, so that is not a big
issue.
---
 throttle_cli/server.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/throttle_cli/server.py b/throttle_cli/server.py
index 14c4b73..6864e1b 100644
--- a/throttle_cli/server.py
+++ b/throttle_cli/server.py
@@ -2,7 +2,7 @@ import logging
import os
import socket
import time
from multiprocessing import Process, Queue, active_children
from multiprocessing import Process, Queue, active_children, set_start_method
from pathlib import Path
from typing import Any, Callable

@@ -26,6 +26,7 @@ def ipcworker(socketpath: Path, handleMsg: Callable, handleInfo: Callable) -> No


def start_server(socketpath: Path, loglevel) -> None:
    set_start_method("fork")
    ipcqueue: Queue[Msg] = Queue()
    logqueue: Queue[Any] = Queue()
    comqueue: Queue[Any] = Queue()
-- 
2.43.2

[PATCH throttle v2 3/4] paths: improve cross-platform compatibility Export this patch

pyxdg had issues on macOS. Use platformdirs, which is supposed to be
more robust.
---
 poetry.lock                   | 13 +------------
 pyproject.toml                |  2 +-
 throttle_cli/cli_client.py    |  4 ++--
 throttle_cli/cli_server.py    |  4 ++--
 throttle_cli/commandworker.py |  4 ++--
 throttle_cli/loglib.py        |  4 ++--
 6 files changed, 10 insertions(+), 21 deletions(-)

diff --git a/poetry.lock b/poetry.lock
index 4fcbbf4..64bb3a8 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -194,17 +194,6 @@ wcwidth = "*"
[package.extras]
tests = ["pytest", "pytest-cov", "pytest-lazy-fixtures"]

[[package]]
name = "pyxdg"
version = "0.28"
description = "PyXDG contains implementations of freedesktop.org standards in python."
optional = false
python-versions = "*"
files = [
    {file = "pyxdg-0.28-py2.py3-none-any.whl", hash = "sha256:bdaf595999a0178ecea4052b7f4195569c1ff4d344567bccdc12dfdf02d545ab"},
    {file = "pyxdg-0.28.tar.gz", hash = "sha256:3267bb3074e934df202af2ee0868575484108581e6f3cb006af1da35395e88b4"},
]

[[package]]
name = "ruff"
version = "0.2.2"
@@ -278,4 +267,4 @@ files = [
[metadata]
lock-version = "2.0"
python-versions = ">=3.10"
content-hash = "3022c95b0d251cf4ca38bea45a3cd73f5e55d08dea22ad8ad44f5667f7865a36"
content-hash = "2a27000aee2bf6a27e272775acbde5ab316a3e79947ca76bb1bfc432d15fd0c6"
diff --git a/pyproject.toml b/pyproject.toml
index 2a81200..7a80042 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,8 +11,8 @@ homepage = "https://sr.ht/~ferdinandyb/throttle/"
python = ">=3.10"
jsonrpclib-pelix = "^0.4.3.2"
toml = "^0.10.2"
pyxdg = "^0.28"
prettytable = "^3.10.0"
platformdirs = "^4.2.0"

[tool.poetry.scripts]
throttle = "throttle_cli.cli_client:main"
diff --git a/throttle_cli/cli_client.py b/throttle_cli/cli_client.py
index 4870f4a..de0c202 100644
--- a/throttle_cli/cli_client.py
+++ b/throttle_cli/cli_client.py
@@ -1,6 +1,6 @@
from pathlib import Path

from xdg import BaseDirectory
from platformdirs import user_runtime_dir

from . import __version__
from .arglib import storeJob, storeSilentJob
@@ -55,7 +55,7 @@ def main():
        help="Format for printing results.",
    )
    args, unknownargs = parser.parse_known_args()
    socketpath = Path(BaseDirectory.get_runtime_dir()) / "throttle.sock"
    socketpath = Path(user_runtime_dir("throttle")) / "throttle.sock"
    if args.statistics:
        get_info(socketpath, ActionType.STATS, args.format)
        return
diff --git a/throttle_cli/cli_server.py b/throttle_cli/cli_server.py
index f7515dc..ba57d36 100644
--- a/throttle_cli/cli_server.py
+++ b/throttle_cli/cli_server.py
@@ -1,7 +1,7 @@
import logging
from pathlib import Path

from xdg import BaseDirectory
from platformdirs import user_runtime_dir

from . import __version__
from .server import start_server
@@ -22,7 +22,7 @@ def main():
        "--version", action="version", version=f"throttle {__version__}"
    )
    args = parser.parse_args()
    socketpath = Path(BaseDirectory.get_runtime_dir()) / "throttle.sock"
    socketpath = Path(user_runtime_dir("throttle")) / "throttle.sock"
    loglevel = logging.INFO
    if args.LOGLEVEL:
        loglevel = getattr(logging, args.LOGLEVEL)
diff --git a/throttle_cli/commandworker.py b/throttle_cli/commandworker.py
index 017dccb..7b9b0f6 100644
--- a/throttle_cli/commandworker.py
+++ b/throttle_cli/commandworker.py
@@ -12,7 +12,7 @@ from pathlib import Path
from typing import Dict, List

import toml
from xdg import BaseDirectory
from platformdirs import user_config_dir

from .structures import ActionType, Msg

@@ -44,7 +44,7 @@ class CommandWorker:
        self.loadConfig()

    def loadConfig(self):
        configdir = BaseDirectory.load_first_config("throttle")
        configdir = user_config_dir("throttle")
        if configdir is None:
            return
        configpath = Path(configdir) / "config.toml"
diff --git a/throttle_cli/loglib.py b/throttle_cli/loglib.py
index b77092b..97edfd5 100644
--- a/throttle_cli/loglib.py
+++ b/throttle_cli/loglib.py
@@ -2,12 +2,12 @@ import logging
import logging.handlers
from pathlib import Path

from xdg import BaseDirectory
from platformdirs import user_log_dir


def consumer(queue):
    root = logging.getLogger()
    logfolder = Path(BaseDirectory.xdg_state_home) / "throttle"
    logfolder = Path(user_log_dir("throttle"))
    logfolder.mkdir(parents=True, exist_ok=True)
    filehandler = logging.handlers.RotatingFileHandler(
        logfolder / "throttle.log", "a", 1024 * 1024 * 10, 3
-- 
2.43.2
Tested-by: Maarten Aertsen <maarten@nlnetlabs.nl>

[PATCH throttle v2 4/4] multiprocessing: switch to Manager().Queue() Export this patch

Apparently multiprocessing.Queue().qsize() is not implemented on macOS
(it raises NotImplementedError). Switch to using
multiprocessing.Manager().Queue(), which does implement qsize().
---
 throttle_cli/commandworker.py | 13 ++++++++++---
 throttle_cli/server.py        | 11 ++++++-----
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/throttle_cli/commandworker.py b/throttle_cli/commandworker.py
index 7b9b0f6..d451d38 100644
--- a/throttle_cli/commandworker.py
+++ b/throttle_cli/commandworker.py
@@ -9,7 +9,7 @@ from dataclasses import dataclass
from multiprocessing import Event, Process, Queue
from multiprocessing.synchronize import Event as SyncEvent
from pathlib import Path
from typing import Dict, List
from typing import Any, Dict, List

import toml
from platformdirs import user_config_dir
@@ -26,7 +26,14 @@ class workeritem:


class CommandWorker:
    def __init__(self, queue: Queue, logqueue: Queue, comqueue: Queue):
    def __init__(
        self,
        manager,
        queue: "Queue[Msg]",
        logqueue: "Queue[Any]",
        comqueue: "Queue[Any]",
    ):
        self.manager = manager
        self.q = queue
        self.logqueue = logqueue
        self.comqueue = comqueue
@@ -108,7 +115,7 @@ class CommandWorker:
        self.statistics["jobs"][msg.job]["total"] += 1
        if msg.job not in self.data or not self.data[msg.job].p.is_alive():
            self.logger.debug(f"{msg.job}: doesn't exist or finished, creating")
            q: Queue[Msg] = Queue()
            q: Queue[Msg] = self.manager.Queue()
            e = Event()
            p = Process(
                target=self.worker,
diff --git a/throttle_cli/server.py b/throttle_cli/server.py
index 6864e1b..c0acb6e 100644
--- a/throttle_cli/server.py
+++ b/throttle_cli/server.py
@@ -2,7 +2,7 @@ import logging
import os
import socket
import time
from multiprocessing import Process, Queue, active_children, set_start_method
from multiprocessing import Manager, Process, Queue, active_children, set_start_method
from pathlib import Path
from typing import Any, Callable

@@ -27,13 +27,14 @@ def ipcworker(socketpath: Path, handleMsg: Callable, handleInfo: Callable) -> No

def start_server(socketpath: Path, loglevel) -> None:
    set_start_method("fork")
    ipcqueue: Queue[Msg] = Queue()
    logqueue: Queue[Any] = Queue()
    comqueue: Queue[Any] = Queue()
    manager = Manager()
    ipcqueue: Queue[Msg] = manager.Queue()
    logqueue: Queue[Any] = manager.Queue()
    comqueue: Queue[Any] = manager.Queue()
    loggerp = Process(target=loglib.consumer, args=(logqueue,))
    loggerp.start()

    msgworker = CommandWorker(ipcqueue, logqueue, comqueue)
    msgworker = CommandWorker(manager, ipcqueue, logqueue, comqueue)
    loglib.publisher_config(logqueue, loglevel)
    logger = logging.getLogger("server")
    logger.debug(os.environ)
-- 
2.43.2