~rjarry/dlrepo


[PATCH dlrepo v2] fs: run cleanup in dedicated background task

Message ID
<20220616134204.15358-1-julien.floret@6wind.com>
DKIM signature
missing
Patch: +122 -272
Before this patch, the cleanup of orphan blobs was done in two places:

(1) by the dlrepo process, in a worker thread during the processing of
    an HTTP DELETE request;

(2) by the cleanup.py script, periodically.

There are some issues with those:

On (1), the cleanup of orphan blobs can take a very long time on a big
database; therefore, we must not wait for its completion before
responding to the HTTP DELETE request, otherwise the request may
time out.

On (2), since the cleanup is run in another process, there are risks of
race conditions with dlrepo, in the (unlikely, but still) case where the
external cleanup process removes an orphan blob that is ready to be
garbage collected, but at the same time the blob is hardlinked again by
the dlrepo process between the test and the removal.
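The window between the test and the removal can be replayed deterministically. The sketch below is purely illustrative (the paths and the `race_once` helper are invented, not dlrepo code); it forces one possible harmful interleaving, where the server's hardlink attempt loses:

```python
import os
import tempfile

# Deterministic replay of the check-then-act window described above.
# The external cleaner's "test" (st_nlink == 1) and "removal" (unlink)
# are two separate steps; if the server acts in between, one side loses.
def race_once(root: str) -> str:
    blob = os.path.join(root, ".blobs", "sha256-abc")
    os.makedirs(os.path.dirname(blob), exist_ok=True)
    with open(blob, "w") as f:
        f.write("artifact data")

    # (1) external cleaner process: the blob looks orphaned...
    assert os.stat(blob).st_nlink == 1

    # (2) external cleaner process: ...so it is garbage collected
    os.unlink(blob)

    # (3) dlrepo process: meanwhile tries to deduplicate a fresh upload
    # against the blob it had just looked up
    try:
        os.link(blob, os.path.join(root, "tag-entry"))
        return "deduplicated"
    except FileNotFoundError:
        return "blob lost"

with tempfile.TemporaryDirectory() as root:
    print(race_once(root))  # → blob lost
```

With a single in-process cleanup task running on the event loop, steps (1) and (3) can no longer interleave like this.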

To address both issues, in dlrepo we create a background task dedicated
to all filesystem cleanup operations (applying the tag cleanup policy
and garbage-collecting orphan blobs). The task is scheduled by the HTTP
DELETE handlers, so it can still run *after* the response has been sent.
To avoid race conditions, the task must run in the main event loop, so
we move the cleanup code outside of the {branch,tag,job}.delete()
methods, which are run in a worker thread. The cleanup task itself
delegates tag deletions to the worker thread; as for the blob garbage
collection, the loop is allowed to be interrupted after each file
scanned.

The cleanup task can also run periodically, via a configurable timer
(24h by default, configurable via the DLREPO_TAG_CLEANUP_PERIOD variable).
It is also run at startup, and it can additionally be triggered by sending
the SIGUSR1 signal to the dlrepo process.

We avoid running multiple cleanup tasks at the same time: when a cleanup
is requested, we cancel any existing timer and we check if a task was
already scheduled. If this is the case, we don't create a new task yet;
instead, we set a "need_cleanup" flag. When the current cleanup task
ends, if "need_cleanup" is set a new task is scheduled, otherwise the
timer is re-armed.

The external cleanup.py script is removed, along with the related
systemd services; it could have been modified to just send a signal to
dlrepo, but there is no real point in doing so, since we now have the
configurable timer in dlrepo itself. Moreover, it would be tricky to
determine in the script to which process to send the signal, for example
if two servers are running on the same machine.
The drawback is that existing deployments may need to be reconfigured,
but since dlrepo is still in development, we think it is acceptable.

Signed-off-by: Julien Floret <julien.floret@6wind.com>
Acked-by: Olivier Matz <olivier.matz@6wind.com>
---
Changes in v2:
- drop external cleanup script
- execute the cleanup policy in the internal background task
- add timer and signal handler

 Makefile                    |   2 -
 README.md                   |   1 -
 debian/dlrepo.install       |   3 -
 debian/dlrepo.manpages      |   1 -
 dlrepo/__main__.py          |  10 +++
 dlrepo/cleanup.py           | 125 ------------------------------------
 dlrepo/fs/__init__.py       | 119 ++++++++++++++++++++++++++--------
 dlrepo/fs/branch.py         |   6 +-
 dlrepo/fs/job.py            |   4 +-
 dlrepo/fs/tag.py            |   6 +-
 dlrepo/views/branch.py      |   1 +
 dlrepo/views/job.py         |   1 +
 dlrepo/views/tag.py         |   1 +
 docs/dlrepo-acls.5.scdoc    |   1 -
 docs/dlrepo-api.7.scdoc     |   1 -
 docs/dlrepo-cleanup.8.scdoc |  56 ----------------
 docs/dlrepo-cli.1.scdoc     |   6 +-
 docs/dlrepo-config.5.scdoc  |   7 +-
 docs/dlrepo-layout.7.scdoc  |   7 +-
 docs/dlrepo.7.scdoc         |   3 +-
 etc/default                 |   1 +
 etc/dlrepo-cleanup.service  |  20 ------
 etc/dlrepo-cleanup.timer    |  10 ---
 etc/dlrepo.service          |   1 -
 setup.py                    |   1 -
 25 files changed, 122 insertions(+), 272 deletions(-)
 delete mode 100644 dlrepo/cleanup.py
 delete mode 100644 docs/dlrepo-cleanup.8.scdoc
 delete mode 100644 etc/dlrepo-cleanup.service
 delete mode 100644 etc/dlrepo-cleanup.timer

diff --git a/Makefile b/Makefile
index 1f2affb89005..f4cb60618d3e 100644
--- a/Makefile
+++ b/Makefile
@@ -92,8 +92,6 @@ install_extras: $(destdir)$(prefix)/share/doc/dlrepo/examples/nginx.conf
 install_services: $(destdir)$(sysconfig)/default/dlrepo
 install_services: $(destdir)$(systemd_units)/dlrepo.socket
 install_services: $(destdir)$(systemd_units)/dlrepo.service
-install_services: $(destdir)$(systemd_units)/dlrepo-cleanup.service
-install_services: $(destdir)$(systemd_units)/dlrepo-cleanup.timer
 
 docs/%: docs/%.scdoc
 	scdoc < $< > $@
diff --git a/README.md b/README.md
index 43a0d62b118f..0a9b5586d99e 100644
--- a/README.md
+++ b/README.md
@@ -34,7 +34,6 @@ make run
 * [dlrepo-acls.5](https://git.sr.ht/~rjarry/dlrepo/tree/main/item/docs/dlrepo-acls.5.scdoc)
 * [dlrepo-api.7](https://git.sr.ht/~rjarry/dlrepo/tree/main/item/docs/dlrepo-api.7.scdoc)
 * [dlrepo-layout.7](https://git.sr.ht/~rjarry/dlrepo/tree/main/item/docs/dlrepo-layout.7.scdoc)
-* [dlrepo-cleanup.8](https://git.sr.ht/~rjarry/dlrepo/tree/main/item/docs/dlrepo-cleanup.8.scdoc)
 
 ## contributing
 
diff --git a/debian/dlrepo.install b/debian/dlrepo.install
index 87d97726c969..00906a5b07b8 100755
--- a/debian/dlrepo.install
+++ b/debian/dlrepo.install
@@ -3,9 +3,6 @@
 etc/default => /etc/default/dlrepo
 etc/dlrepo.service lib/systemd/system
 etc/dlrepo.socket lib/systemd/system
-etc/dlrepo-cleanup.service lib/systemd/system
-etc/dlrepo-cleanup.timer lib/systemd/system
 
 usr/bin/dlrepo
-usr/bin/dlrepo-cleanup
 usr/lib/python3*/dist-packages/
diff --git a/debian/dlrepo.manpages b/debian/dlrepo.manpages
index 8fdb225c9d04..4feeb7fa9eb4 100644
--- a/debian/dlrepo.manpages
+++ b/debian/dlrepo.manpages
@@ -1,6 +1,5 @@
 docs/dlrepo.7
 docs/dlrepo-acls.5
 docs/dlrepo-api.7
-docs/dlrepo-cleanup.8
 docs/dlrepo-config.5
 docs/dlrepo-layout.7
diff --git a/dlrepo/__main__.py b/dlrepo/__main__.py
index cdf3b0bffaf9..50cb3a643b0e 100644
--- a/dlrepo/__main__.py
+++ b/dlrepo/__main__.py
@@ -5,6 +5,7 @@
 import asyncio
 import logging.config
 import os
+import signal
 import socket
 import sys
 
@@ -35,6 +36,14 @@ async def create_semaphore(app: web.Application):
     app["dlrepo_publish_semaphore"] = asyncio.Semaphore(max_requests)
 
 
+# --------------------------------------------------------------------------------------
+async def init_fs_cleanup(app: web.Application):
+    repo = app["dlrepo_artifact_repository"]
+    repo.schedule_cleanup()
+    loop = asyncio.get_running_loop()
+    loop.add_signal_handler(signal.SIGUSR1, repo.schedule_cleanup)
+
+
 # --------------------------------------------------------------------------------------
 def main():
     logging.config.dictConfig(
@@ -79,6 +88,7 @@ def main():
     app["dlrepo_artifact_repository"] = repo
 
     app.on_startup.append(create_semaphore)
+    app.on_startup.append(init_fs_cleanup)
 
     views.add_routes(app)
 
diff --git a/dlrepo/cleanup.py b/dlrepo/cleanup.py
deleted file mode 100644
index 3c9344ee667c..000000000000
--- a/dlrepo/cleanup.py
@@ -1,125 +0,0 @@
-# Copyright (c) 2021 Julien Floret
-# Copyright (c) 2021 Robin Jarry
-# SPDX-License-Identifier: BSD-3-Clause
-
-"""
-Automatically cleanup old tags according to the branch cleanup policy.
-"""
-
-import argparse
-import os
-import pathlib
-import re
-import sys
-import time
-
-from dlrepo.fs import ArtifactRepository
-from dlrepo.fs.tag import Tag
-
-
-# --------------------------------------------------------------------------------------
-def main():
-    parser = argparse.ArgumentParser(description=__doc__)
-    parser.add_argument(
-        "-q",
-        "--quiet",
-        default=False,
-        action="store_true",
-        help="""
-        Only display errors.
-        """,
-    )
-    parser.add_argument(
-        "-p",
-        "--root-path",
-        type=local_dir,
-        default=default_root_path(),
-        help="""
-        The root path of the repository. Default to DLREPO_ROOT_DIR from the
-        environment or from /etc/default/dlrepo.
-        """,
-    )
-    args = parser.parse_args()
-    try:
-        cleanup(args)
-    except Exception as e:
-        print(f"error: {e}", file=sys.stderr)
-        return 1
-    return 0
-
-
-# --------------------------------------------------------------------------------------
-def cleanup(args):
-    repo = ArtifactRepository(args.root_path)
-    start = time.time()
-    deleted = 0
-
-    for branch in repo.get_branches():
-        released_tags = []
-        daily_tags = []
-
-        for tag in branch.get_tags():
-            if tag.is_locked():
-                continue
-            if tag.is_released():
-                released_tags.append(tag)
-            else:
-                daily_tags.append(tag)
-
-        released_tags.sort(key=Tag.creation_date, reverse=True)
-        daily_tags.sort(key=Tag.creation_date, reverse=True)
-
-        policy = branch.get_cleanup_policy()
-        max_daily = policy.get("max_daily_tags", 0)
-        if isinstance(max_daily, int) and max_daily > 0:
-            for tag in daily_tags[max_daily:]:
-                if not args.quiet:
-                    print(f"Deleting daily tag {branch.name}/{tag.name} ...")
-                tag.delete(cleanup_orphans=False)
-                deleted += 1
-
-        max_released = policy.get("max_released_tags", 0)
-        if isinstance(max_released, int) and max_released > 0:
-            for tag in released_tags[max_released:]:
-                if not args.quiet:
-                    print(f"Deleting released tag {branch.name}/{tag.name} ...")
-                tag.delete(force=True, cleanup_orphans=False)
-                deleted += 1
-
-    repo.cleanup_orphan_blobs()
-
-    for user_repo in repo.get_user_repos():
-        user_repo.disk_usage_refresh()
-        user_repo.disk_usage_save()
-
-    if not args.quiet and deleted > 0:
-        print(f"Deleted {deleted} tags in {time.time() - start:.1f}s")
-
-
-# --------------------------------------------------------------------------------------
-def local_dir(value):
-    value = pathlib.Path(value)
-    if not value.is_dir():
-        raise argparse.ArgumentTypeError(f"{value}: No such directory")
-    return value
-
-
-# --------------------------------------------------------------------------------------
-def default_root_path():
-    if "DLREPO_ROOT_DIR" in os.environ:
-        return pathlib.Path(os.environ["DLREPO_ROOT_DIR"])
-
-    default_file = pathlib.Path("/etc/default/dlrepo")
-    if default_file.is_file():
-        match = re.search(
-            r"DLREPO_ROOT_DIR=(.*)", default_file.read_text(encoding="utf-8")
-        )
-        if match:
-            return pathlib.Path(match.group(1).strip().strip("\"'"))
-
-    return pathlib.Path(".")
-
-
-# --------------------------------------------------------------------------------------
-if __name__ == "__main__":
-    sys.exit(main())
diff --git a/dlrepo/fs/__init__.py b/dlrepo/fs/__init__.py
index bc91e3773ce6..32134781caca 100644
--- a/dlrepo/fs/__init__.py
+++ b/dlrepo/fs/__init__.py
@@ -15,6 +15,7 @@ from cachetools import LFUCache
 from .branch import Branch
 from .container import ContainerRegistry
 from .product import Product
+from .tag import Tag
 from .util import CHUNK_SIZE, file_digest, human_readable, parse_digest
 
 
@@ -49,7 +50,7 @@ class AbstractRepository:
     def url_bit(self) -> Optional[str]:
         raise NotImplementedError()
 
-    def cleanup_orphan_blobs(self):
+    def schedule_cleanup(self):
         raise NotImplementedError()
 
     def next_upload(self) -> str:
@@ -89,6 +90,9 @@ class ArtifactRepository(AbstractRepository):
         self.blobs = self._path / ".blobs"
         self.uploads = self._path / ".uploads"
         self.user_repos = LFUCache(maxsize=512)
+        self.cleanup_task = None
+        self.need_cleanup = False
+        self.timer = None
 
     def get_user_repo(self, user: str) -> "UserRepository":
         user = user.lower()
@@ -106,10 +110,92 @@ class ArtifactRepository(AbstractRepository):
     def url(self) -> str:
         return "/"
 
-    def cleanup_orphan_blobs(self):
-        # TODO: call blocking stuff (open, stat, unlink) in a thread?
+    def schedule_cleanup(self):
+        if self.timer:
+            self.timer.cancel()
+            self.timer = None
+        if self.cleanup_task:
+            self.need_cleanup = True
+        else:
+            self.cleanup_task = asyncio.create_task(self._cleanup())
+            self.cleanup_task.add_done_callback(self._cleanup_done_cb)
+
+    async def _cleanup(self):
+        loop = asyncio.get_running_loop()
+        deleted = 0
+
+        for branch in self.get_branches():
+            released_tags = []
+            daily_tags = []
+
+            for tag in branch.get_tags():
+                if tag.is_locked():
+                    continue
+                if tag.is_released():
+                    released_tags.append(tag)
+                else:
+                    daily_tags.append(tag)
+
+            released_tags.sort(key=Tag.creation_date, reverse=True)
+            daily_tags.sort(key=Tag.creation_date, reverse=True)
+
+            policy = branch.get_cleanup_policy()
+            max_daily = policy.get("max_daily_tags", 0)
+            if isinstance(max_daily, int) and max_daily > 0:
+                for tag in daily_tags[max_daily:]:
+                    LOG.debug("Deleting daily tag %s/%s ...", branch.name, tag.name)
+                    await loop.run_in_executor(None, tag.delete)
+                    deleted += 1
+
+            max_released = policy.get("max_released_tags", 0)
+            if isinstance(max_released, int) and max_released > 0:
+                for tag in released_tags[max_released:]:
+                    LOG.debug("Deleting released tag %s/%s ...", branch.name, tag.name)
+                    await loop.run_in_executor(None, lambda: tag.delete(force=True))
+                    deleted += 1
+
+        for user_repo in self.get_user_repos():
+            user_repo.disk_usage_refresh()
+            user_repo.disk_usage_save()
+
+        LOG.debug("Deleted %d tags", deleted)
+
+        await self._cleanup_orphan_blobs()
+
+    ORPHAN_BLOB_LIFETIME = int(os.getenv("DLREPO_ORPHAN_BLOB_LIFETIME", "600"))
+
+    async def _cleanup_orphan_blobs(self):
         for folder in (self.blobs, self.uploads):
-            _cleanup_orphans(folder)
+            if not folder.is_dir():
+                return
+            now = time.time()
+            for root, _, files in os.walk(folder):
+                for f in files:
+                    await asyncio.sleep(0)
+                    f = Path(root, f)
+                    if not f.is_file():
+                        continue
+                    stat = f.stat()
+                    if stat.st_nlink > 1:
+                        continue
+                    if now - stat.st_mtime < self.ORPHAN_BLOB_LIFETIME:
+                        continue
+                    f.unlink()
+
+    CLEANUP_PERIOD = int(os.getenv("DLREPO_TAG_CLEANUP_PERIOD", str(24 * 60 * 60)))
+
+    def _cleanup_done_cb(self, task):
+        self.cleanup_task = None
+        exc = task.exception()
+        if exc:
+            LOG.error("exception in cleanup %r", task, exc_info=exc)
+        if self.need_cleanup:
+            self.schedule_cleanup()
+            self.need_cleanup = False
+        elif not self.timer and self.CLEANUP_PERIOD:
+            self.timer = asyncio.get_running_loop().call_later(
+                self.CLEANUP_PERIOD, self.schedule_cleanup
+            )
 
     def next_upload(self) -> str:
         self.uploads.mkdir(mode=0o755, parents=True, exist_ok=True)
@@ -232,8 +318,8 @@ class UserRepository(AbstractRepository):
     def url(self) -> str:
         return f"/~{self.user}/"
 
-    def cleanup_orphan_blobs(self):
-        self.base.cleanup_orphan_blobs()
+    def schedule_cleanup(self):
+        self.base.schedule_cleanup()
 
     def next_upload(self) -> str:
         return self.base.next_upload()
@@ -355,24 +441,3 @@ async def _check_and_move(
     src.rename(dst)
     dst.chmod(0o644)
     return dst
-
-
-# --------------------------------------------------------------------------------------
-ORPHAN_BLOB_LIFETIME = int(os.getenv("DLREPO_ORPHAN_BLOB_LIFETIME", "600"))
-
-
-def _cleanup_orphans(folder: Path):
-    if not folder.is_dir():
-        return
-    now = time.time()
-    for root, _, files in os.walk(folder):
-        for f in files:
-            f = Path(root, f)
-            if not f.is_file():
-                continue
-            stat = f.stat()
-            if stat.st_nlink > 1:
-                continue
-            if now - stat.st_mtime < ORPHAN_BLOB_LIFETIME:
-                continue
-            f.unlink()
diff --git a/dlrepo/fs/branch.py b/dlrepo/fs/branch.py
index c42646df03a9..1bbca3c139d9 100644
--- a/dlrepo/fs/branch.py
+++ b/dlrepo/fs/branch.py
@@ -67,7 +67,7 @@ class Branch(SubDir):
                 policy[field] = 0
         return policy
 
-    def delete(self, *, force: bool = False, cleanup_orphans: bool = True):
+    def delete(self, *, force: bool = False):
         if not self.exists():
             raise FileNotFoundError()
         for t in self.get_tags():
@@ -76,7 +76,5 @@ class Branch(SubDir):
             if not force and t.is_released():
                 raise OSError(f"Tag {t.name} is released, use force")
         for t in self.get_tags():
-            t.delete(force=force, cleanup_orphans=False)
+            t.delete(force=force)
         self.root().rmtree(self._path)
-        if cleanup_orphans:
-            self.root().cleanup_orphan_blobs()
diff --git a/dlrepo/fs/job.py b/dlrepo/fs/job.py
index 5120f159f5f9..889da5c9249d 100644
--- a/dlrepo/fs/job.py
+++ b/dlrepo/fs/job.py
@@ -187,10 +187,8 @@ class Job(SubDir):
                 .get_version(str(data["version"]))
             )
 
-    def delete(self, *, cleanup_orphans: bool = True):
+    def delete(self):
         if not self.exists():
             raise FileNotFoundError()
         self._cleanup_product_tree()
         self.root().rmtree(self._path)
-        if cleanup_orphans:
-            self.root().cleanup_orphan_blobs()
diff --git a/dlrepo/fs/tag.py b/dlrepo/fs/tag.py
index 40a0bfcf6817..39f48e80598c 100644
--- a/dlrepo/fs/tag.py
+++ b/dlrepo/fs/tag.py
@@ -204,7 +204,7 @@ class Tag(SubDir):
         elif path.is_file():
             path.unlink()
 
-    def delete(self, *, force: bool = False, cleanup_orphans: bool = True):
+    def delete(self, *, force: bool = False):
         if not self.exists():
             raise FileNotFoundError()
         if self.is_locked():
@@ -212,7 +212,5 @@ class Tag(SubDir):
         if not force and self.is_released():
             raise OSError(f"Tag {self.name} is released, use force")
         for j in self.get_jobs():
-            j.delete(cleanup_orphans=False)
+            j.delete()
         self.root().rmtree(self._path)
-        if cleanup_orphans:
-            self.root().cleanup_orphan_blobs()
diff --git a/dlrepo/views/branch.py b/dlrepo/views/branch.py
index 6ff617f7c9b2..95ccfac329f8 100644
--- a/dlrepo/views/branch.py
+++ b/dlrepo/views/branch.py
@@ -122,6 +122,7 @@ class BranchView(BaseView):
         try:
             force = "force" in self.request.query
             await loop.run_in_executor(None, lambda: branch.delete(force=force))
+            branch.root().schedule_cleanup()
             return web.Response()
         except OSError as e:
             raise web.HTTPBadRequest(reason=str(e)) from e
diff --git a/dlrepo/views/job.py b/dlrepo/views/job.py
index 9ad32e43b462..f91cc20221d5 100644
--- a/dlrepo/views/job.py
+++ b/dlrepo/views/job.py
@@ -142,6 +142,7 @@ class JobView(JobArchiveView):
         try:
             job = self._get_job()
             await loop.run_in_executor(None, job.delete)
+            job.root().schedule_cleanup()
         except FileNotFoundError as e:
             raise web.HTTPNotFound() from e
         except OSError as e:
diff --git a/dlrepo/views/tag.py b/dlrepo/views/tag.py
index 2d20bfe6f9a8..1c802025c46c 100644
--- a/dlrepo/views/tag.py
+++ b/dlrepo/views/tag.py
@@ -102,6 +102,7 @@ class TagView(BaseView):
             tag = self._get_tag()
             force = "force" in self.request.query
             await loop.run_in_executor(None, lambda: tag.delete(force=force))
+            tag.root().schedule_cleanup()
             return web.Response()
         except OSError as e:
             raise web.HTTPBadRequest(reason=str(e)) from e
diff --git a/docs/dlrepo-acls.5.scdoc b/docs/dlrepo-acls.5.scdoc
index 1acd274eefec..2a3782768c37 100644
--- a/docs/dlrepo-acls.5.scdoc
+++ b/docs/dlrepo-acls.5.scdoc
@@ -123,7 +123,6 @@ ro /v2/foomoo/arm64/3.0/**
 
 *dlrepo*(7),
 *dlrepo-api*(1),
-*dlrepo-cleanup*(8),
 *dlrepo-cli*(1),
 *dlrepo-config*(5),
 *dlrepo-layout*(7)
diff --git a/docs/dlrepo-api.7.scdoc b/docs/dlrepo-api.7.scdoc
index 5bde72944a04..0632b561e313 100644
--- a/docs/dlrepo-api.7.scdoc
+++ b/docs/dlrepo-api.7.scdoc
@@ -835,7 +835,6 @@ docker push dlrepo.foobaz.org/u/bofh/master/foo-x86-debug:v3.2.3
 
 *dlrepo*(7),
 *dlrepo-acls*(5),
-*dlrepo-cleanup*(8),
 *dlrepo-cli*(1),
 *dlrepo-config*(5),
 *dlrepo-layout*(7)
diff --git a/docs/dlrepo-cleanup.8.scdoc b/docs/dlrepo-cleanup.8.scdoc
deleted file mode 100644
index 11a8da2d1ed2..000000000000
--- a/docs/dlrepo-cleanup.8.scdoc
@@ -1,56 +0,0 @@
-dlrepo-cleanup(8) "" "Administration Manual"
-
-# NAME
-
-*dlrepo-cleanup* -- artifact repository automatic cleanup
-
-# DESCRIPTION
-
-*dlrepo* is an artifact repository. It supports storing build artifacts (binary
-packages, documentation, vm images, container images, etc.) in a structured
-file system tree. It exposes an HTTP API to upload files, delete them, add
-metadata, etc.
-
-This manual describes the automatic cleanup process of _DLREPO_ROOT_DIR_.
-
-# CONFIGURATION
-
-A cleanup policy must be configured per branch. This is done by using the HTTP
-API (or alternatively, by manually creating a _.cleanup_policy_ JSON file in
-the branch folder). See *dlrepo-api*(7) for more details.
-
-By default, only orphan files are deleted from the _.blobs/_ folder. See
-*dlrepo-layout*(7) for more info.
-
-# WHAT IS DELETED?
-
-For all branches in the root repository, the cleanup policy is enforced. It
-implies ordering the released and non-released tags by date and only keeping
-the most recent _max_released_tags_ and _max_daily_tags_ accordingly. If there
-is no cleanup policy, nothing is deleted.
-
-After deleting tags, orphan files are deleted from the _.blobs/_ folder.
-
-User repositories are *not* subject to automatic cleanup.
-
-# EXECUTION
-
-The cleanup operation should be executed with the owner of the
-_DLREPO_ROOT_DIR_ folder.
-
-For convenience, a systemd _dlrepo-cleanup.timer_ unit is provided which
-executes the _dlrepo-cleanup.service_ every day.
-
-# SEE ALSO
-
-*dlrepo*(7),
-*dlrepo-acls*(5),
-*dlrepo-api*(7),
-*dlrepo-cli*(1),
-*dlrepo-config*(5),
-*dlrepo-layout*(7)
-
-# AUTHORS
-
-Created and maintained by Robin Jarry and Julien Floret. For more information,
-development and bug reports, see _https://sr.ht/~rjarry/dlrepo/_.
diff --git a/docs/dlrepo-cli.1.scdoc b/docs/dlrepo-cli.1.scdoc
index db3f56495fb8..ef25de413a79 100644
--- a/docs/dlrepo-cli.1.scdoc
+++ b/docs/dlrepo-cli.1.scdoc
@@ -123,7 +123,7 @@ _PARAM=VALUE_
 ## dlrepo-cli lock [-u] BRANCH TAG [JOB]
 
 Lock a job to prevent further modifications. Or lock a tag so that it is never
-deleted by *dlrepo-cleanup*(8).
+deleted by the cleanup policy.
 
 *-u*, *--unlock*
 	Unlock instead of locking.
@@ -217,8 +217,7 @@ _JOB_
 
 ## dlrepo-cli cleanup-policy [-d MAX_DAILY] [-r MAX_RELEASED] BRANCH
 
-Get or set a branch cleanup policy. This affects what *dlrepo-cleanup*(8) will
-delete from this branch.
+Get or set a branch cleanup policy.
 
 *-d* _NUM_, *--max-daily*=_NUM_
 	Maximum non-released tags to keep.
@@ -234,7 +233,6 @@ _BRANCH_
 *dlrepo*(7),
 *dlrepo-acls*(5),
 *dlrepo-api*(7),
-*dlrepo-cleanup*(7),
 *dlrepo-config*(5),
 *dlrepo-layout*(7)
 
diff --git a/docs/dlrepo-config.5.scdoc b/docs/dlrepo-config.5.scdoc
index 789a59ffb37a..93ce5ad1176d 100644
--- a/docs/dlrepo-config.5.scdoc
+++ b/docs/dlrepo-config.5.scdoc
@@ -50,7 +50,11 @@ running the daemon process.
 	via the *dlrepo-api*(7) if they are older than
 	*DLREPO_ORPHAN_BLOB_LIFETIME*.
 
-	Otherwise, *dlrepo-cleanup*(8) will delete them in a later time.
+	Otherwise, the garbage collection task will delete them in a later time.
+
+*DLREPO_TAG_CLEANUP_PERIOD* (default: _86400_)
+	The maximum period in seconds after which tags will be deleted according
+	to the branch cleanup policy.
 
 *DLREPO_LOG_LEVEL* (default: _WARNING_)
 	Only events that are above that level are sent to the system logs.
@@ -271,7 +275,6 @@ running the daemon process.
*dlrepo*(7),
*dlrepo-acls*(5),
*dlrepo-api*(1),
*dlrepo-cleanup*(8),
*dlrepo-cli*(1),
*dlrepo-layout*(7)

diff --git a/docs/dlrepo-layout.7.scdoc b/docs/dlrepo-layout.7.scdoc
index 49ab60062105..ba1ea7fe13ca 100644
--- a/docs/dlrepo-layout.7.scdoc
+++ b/docs/dlrepo-layout.7.scdoc
@@ -42,7 +42,7 @@ This layout is used for deduplication purposes. When the same file is uploaded
 multiple times in different _{branch}/{tag}/{job}/{format}_ combinations, only
 a hard link is created. When a file in _.blobs/_ only has one link and is older
 than _DLREPO_ORPHAN_BLOB_LIFETIME_ seconds, it is candidate for garbage
-collection by *dlrepo-cleanup*(8).
+collection.
 
 ## .uploads/
 
@@ -62,7 +62,7 @@ Also contains the following reserved files:
 
 _.cleanup_policy_ (optional)
 	This file is in the JSON format and controls how many tags are
-	preserved when running *dlrepo-cleanup*(8). Here is an example of
+	preserved by the cleanup policy. Here is an example of
 	_.cleanup_policy_ file:
 
 	```
@@ -85,7 +85,7 @@ _.released_ (optional)
 	_max_daily_tags_ cleanup policy.
 _.locked_ (optional)
 	Empty file created when a tag is marked as locked. Locked tags are
-	never deleted by *dlrepo-cleanup*(8).
+	never deleted by the cleanup policy.
 _.publish_status_ (optional)
 	Text file containing details about the tag publication status to
 	another dlrepo server.
@@ -211,7 +211,6 @@ _.disk_usage_
 *dlrepo*(7),
 *dlrepo-acls*(5),
 *dlrepo-api*(7),
-*dlrepo-cleanup*(8),
 *dlrepo-cli*(1),
 *dlrepo-config*(5)
 
diff --git a/docs/dlrepo.7.scdoc b/docs/dlrepo.7.scdoc
index b22a78ecdc9e..429c47532471 100644
--- a/docs/dlrepo.7.scdoc
+++ b/docs/dlrepo.7.scdoc
@@ -38,13 +38,12 @@ _dlrepo.service_ automatically and pass the socket to the python daemon.
 
 # GARBAGE COLLECTION
 
-*dlrepo-cleanup*(8) will remove tags that exceed any branch cleanup policy.
+Tags that exceed any branch cleanup policy will be automatically removed.
 
 # SEE ALSO
 
 *dlrepo-acls*(5),
 *dlrepo-api*(7),
-*dlrepo-cleanup*(8),
 *dlrepo-cli*(1),
 *dlrepo-config*(5),
 *dlrepo-layout*(7)
diff --git a/etc/default b/etc/default
index a8de461cf244..2d357c8d8d41 100644
--- a/etc/default
+++ b/etc/default
@@ -32,3 +32,4 @@
 #DLREPO_PUBLISH_URL=
 #DLREPO_PUBLISH_AUTH=
 #DLREPO_PUBLISH_MAX_REQUESTS=1
+#DLREPO_TAG_CLEANUP_PERIOD=86400
diff --git a/etc/dlrepo-cleanup.service b/etc/dlrepo-cleanup.service
deleted file mode 100644
index 1bf416ee1850..000000000000
--- a/etc/dlrepo-cleanup.service
@@ -1,20 +0,0 @@
-# ex: ft=systemd
-[Unit]
-Description=Discard old tags according to each branch cleanup policy.
-Documentation=man:dlrepo-cleanup(8)
-
-[Service]
-Type=oneshot
-EnvironmentFile=-/etc/default/dlrepo
-ExecStart=/usr/bin/dlrepo-cleanup
-User=dlrepo
-SyslogIdentifier=dlrepo-cleanup
-ProtectSystem=strict
-ProtectHome=tmpfs
-ReadWritePaths=/var/lib/dlrepo
-PrivateTmp=true
-PrivateDevices=true
-ProtectKernelTunables=true
-ProtectKernelLogs=true
-ProtectControlGroups=true
-CapabilityBoundingSet=~CAP_SYS_ADMIN
diff --git a/etc/dlrepo-cleanup.timer b/etc/dlrepo-cleanup.timer
deleted file mode 100644
index 5c86760e22ab..000000000000
--- a/etc/dlrepo-cleanup.timer
@@ -1,10 +0,0 @@
-# ex: ft=systemd
-
-[Unit]
-Description=Cleanup tags according to branch cleanup policy.
-
-[Timer]
-OnCalendar=daily
-
-[Install]
-WantedBy=timers.target
diff --git a/etc/dlrepo.service b/etc/dlrepo.service
index 6884d362935b..df0fcf2f0bf3 100644
--- a/etc/dlrepo.service
+++ b/etc/dlrepo.service
@@ -26,4 +26,3 @@ CapabilityBoundingSet=~CAP_SYS_ADMIN
 
 [Install]
 Also=dlrepo.socket
-Also=dlrepo-cleanup.timer
diff --git a/setup.py b/setup.py
index 1340e95ef6a8..140673807b83 100644
--- a/setup.py
+++ b/setup.py
@@ -45,7 +45,6 @@ setuptools.setup(
     entry_points="""
     [console_scripts]
     dlrepo = dlrepo.__main__:main
-    dlrepo-cleanup = dlrepo.cleanup:main
     """,
     scripts=["dlrepo-cli"],
 )
-- 
2.30.2

[dlrepo/patches/.build.yml] build success

builds.sr.ht <builds@sr.ht>
Message ID
<CKRLPX8AVLRT.1WOOQQE91U94G@cirno2>
In-Reply-To
<20220616134204.15358-1-julien.floret@6wind.com> (view parent)
DKIM signature
missing
dlrepo/patches/.build.yml: SUCCESS in 1m19s

[fs: run cleanup in dedicated background task][0] v2 from [Julien Floret][1]

[0]: https://lists.sr.ht/~rjarry/dlrepo/patches/33047
[1]: julien.floret@6wind.com

✓ #781714 SUCCESS dlrepo/patches/.build.yml https://builds.sr.ht/~rjarry/job/781714
Message ID
<CL0WGF1E8C16.2E8CBFJWGODFK@marty>
In-Reply-To
<20220616134204.15358-1-julien.floret@6wind.com> (view parent)
DKIM signature
missing
Hi Julien,

Sorry about the late reply, I got sidetracked by a bunch of things.

Julien Floret, Jun 16, 2022 at 15:42:
> Before this patch, the cleanup of orphan blobs was done in two places:
>
> (1) by the dlrepo process, in a worker thread during the processing of
>     an HTTP DELETE request;
>
> (2) by the cleanup.py script, periodically.
>
> There are some issues with those:
>
> On (1), the cleanup of orphan blobs can take a very long time on a big
> database; therefore, we must not wait for its completion before
> responding to the HTTP DELETE request, otherwise the request may
> time out.
>
> On (2), since the cleanup is run in another process, there are risks of
> race conditions with dlrepo, in the (unlikely, but still) case where the
> external cleanup process removes an orphan blob that is ready to be
> garbage collected, but at the same time the blob is hardlinked again by
> the dlrepo process between the test and the removal.
>
> To address both issues, in dlrepo we create a background task dedicated
> to all filesystem cleanup operations (applying the tag cleanup policy
> and garbage-collecting orphan blobs). The task is scheduled by the HTTP
> DELETE handlers, so it can still run *after* the response has been sent.
> To avoid race conditions, the task must run in the main event loop, so
> we move the cleanup code outside of the {branch,tag,job}.delete()
> methods, which are run in a worker thread. The cleanup task itself
> delegates tag deletions to the worker thread; as for the blob garbage
> collection, the loop is allowed to be interrupted after each file
> scanned.
>
> The cleanup task can also run periodically, via a configurable timer
> (24h by default, configurable via the DLREPO_TAG_CLEANUP_PERIOD variable).
> It is also run at startup, and it can as well be triggered by sending
> the SIGUSR1 signal to the dlrepo process.
>
> We avoid running multiple cleanup tasks at the same time: when a cleanup
> is requested, we cancel any existing timer and we check if a task was
> already scheduled. If this is the case, we don't create a new task yet,
> instead we set a "need_cleanup" flag. When the current cleanup task
> ends, if "need_cleanup" is set a new task is scheduled, otherwise the
> timer is re-armed.

I think this cleanup coroutine should do two different things depending
on how it is called.

1) after deleting a tag or job (responding to HTTP DELETE), it should
   *only* clean up the orphan blobs.

2) upon receipt of SIGUSR1 or when DLREPO_TAG_CLEANUP_PERIOD expires,
   it should delete tags according to each branch cleanup policy,
   refresh each user's quotas *and finally* clean up the orphan blobs.

This could be easily done by adding a parameter somewhere.
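As a rough illustration of that parameter, here is a toy model of the scheduling logic; the `cleanup_tags` argument, the sticky-flag handling and the method bodies are assumptions layered on top of the v2 patch, not code from it:

```python
import asyncio

class Repo:
    """Toy model of ArtifactRepository's cleanup scheduling (names assumed)."""

    def __init__(self):
        self.cleanup_task = None
        self.need_cleanup = False
        self.cleanup_tags = False
        self.log = []

    def schedule_cleanup(self, cleanup_tags: bool = True):
        # Keep the flag sticky: a pending "full" request must not be
        # downgraded by a later blobs-only request.
        self.cleanup_tags = self.cleanup_tags or cleanup_tags
        if self.cleanup_task:
            self.need_cleanup = True
        else:
            self.cleanup_task = asyncio.create_task(self._cleanup())
            self.cleanup_task.add_done_callback(self._cleanup_done_cb)

    async def _cleanup(self):
        cleanup_tags, self.cleanup_tags = self.cleanup_tags, False
        if cleanup_tags:
            self.log.append("tag policies + quotas")  # SIGUSR1/timer path only
        self.log.append("orphan blobs")               # always run last

    def _cleanup_done_cb(self, task):
        self.cleanup_task = None
        if self.need_cleanup:
            self.need_cleanup = False
            self.schedule_cleanup(cleanup_tags=False)

async def demo():
    repo = Repo()
    repo.schedule_cleanup(cleanup_tags=False)  # as a DELETE handler would
    while repo.cleanup_task or repo.need_cleanup:
        await asyncio.sleep(0)
    repo.schedule_cleanup()                    # as SIGUSR1 or the timer would
    while repo.cleanup_task or repo.need_cleanup:
        await asyncio.sleep(0)
    return repo.log

print(asyncio.run(demo()))
# → ['orphan blobs', 'tag policies + quotas', 'orphan blobs']
```

The DELETE-triggered run only garbage-collects blobs, while the signal/timer run also enforces the tag policies first, matching the two call sites described above.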

> The external cleanup.py script is removed, along with the related
> systemd services; it could have been modified to just send a signal to
> dlrepo, but there is no real point in doing so, since we now have the
> configurable timer in dlrepo itself. Moreover, it would be tricky to
> determine in the script to which process to send the signal, for example
> if two servers are running on the same machine.
> The drawback is that existing deployments may need to be reconfigured,
> but since dlrepo is still in development, we think it is acceptable.

Sure, I don't think that is a major issue.

I have a few remarks about the code itself. See below:

> diff --git a/dlrepo/fs/__init__.py b/dlrepo/fs/__init__.py
> index bc91e3773ce6..32134781caca 100644
> --- a/dlrepo/fs/__init__.py
> +++ b/dlrepo/fs/__init__.py
> @@ -15,6 +15,7 @@ from cachetools import LFUCache
>  from .branch import Branch
>  from .container import ContainerRegistry
>  from .product import Product
> +from .tag import Tag
>  from .util import CHUNK_SIZE, file_digest, human_readable, parse_digest
>  
>  
> @@ -49,7 +50,7 @@ class AbstractRepository:
>      def url_bit(self) -> Optional[str]:
>          raise NotImplementedError()
>  
> -    def cleanup_orphan_blobs(self):
> +    def schedule_cleanup(self):
>          raise NotImplementedError()
>  
>      def next_upload(self) -> str:
> @@ -89,6 +90,9 @@ class ArtifactRepository(AbstractRepository):
>          self.blobs = self._path / ".blobs"
>          self.uploads = self._path / ".uploads"
>          self.user_repos = LFUCache(maxsize=512)
> +        self.cleanup_task = None
> +        self.need_cleanup = False
> +        self.timer = None

Perhaps rename self.timer to self.cleanup_timer.
Also, add a boolean, self.cleanup_tags = False.

>  
>      def get_user_repo(self, user: str) -> "UserRepository":
>          user = user.lower()
> @@ -106,10 +110,92 @@ class ArtifactRepository(AbstractRepository):
>      def url(self) -> str:
>          return "/"
>  
> -    def cleanup_orphan_blobs(self):
> -        # TODO: call blocking stuff (open, stat, unlink) in a thread?
> +    def schedule_cleanup(self):

Here, the extra parameter could be added:

       def schedule_cleanup(self, tags=True):
           self.cleanup_tags = self.cleanup_tags or tags
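
Putting that parameter together with the scheduling logic described in the
commit message (cancel the timer, coalesce concurrent requests through the
need_cleanup flag, re-arm the timer when idle), a sketch could look like
this; all names are illustrative, not dlrepo's actual code:

```python
import asyncio

class CleanupScheduler:
    def __init__(self, period=86400):
        self.period = period          # DLREPO_TAG_CLEANUP_PERIOD
        self.cleanup_task = None
        self.cleanup_timer = None
        self.need_cleanup = False
        self.cleanup_tags = False
        self.runs = []                # records each pass, for the demo

    def schedule_cleanup(self, tags=True):
        self.cleanup_tags = self.cleanup_tags or tags
        if self.cleanup_timer is not None:
            self.cleanup_timer.cancel()
            self.cleanup_timer = None
        if self.cleanup_task is not None:
            # a cleanup is already scheduled: remember to run another one
            self.need_cleanup = True
            return
        loop = asyncio.get_running_loop()
        self.cleanup_task = loop.create_task(self._cleanup())

    async def _cleanup(self):
        full, self.cleanup_tags = self.cleanup_tags, False
        self.runs.append("full" if full else "blobs-only")
        await asyncio.sleep(0)        # placeholder for the real cleanup work
        self.cleanup_task = None
        if self.need_cleanup:
            self.need_cleanup = False
            self.schedule_cleanup(tags=False)
        else:
            # idle: re-arm the periodic full-cleanup timer
            self.cleanup_timer = asyncio.get_running_loop().call_later(
                self.period, self.schedule_cleanup)

async def demo():
    s = CleanupScheduler()
    s.schedule_cleanup(tags=False)    # first HTTP DELETE
    s.schedule_cleanup(tags=False)    # second DELETE, coalesced
    await asyncio.sleep(0.01)
    if s.cleanup_timer is not None:
        s.cleanup_timer.cancel()      # let the demo loop exit cleanly
    return s.runs

print(asyncio.run(demo()))
```

The two DELETEs are coalesced into two consecutive blob-only passes, and
the periodic timer is only re-armed once the backlog is drained.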

> diff --git a/dlrepo/views/branch.py b/dlrepo/views/branch.py
> index 6ff617f7c9b2..95ccfac329f8 100644
> --- a/dlrepo/views/branch.py
> +++ b/dlrepo/views/branch.py
> @@ -122,6 +122,7 @@ class BranchView(BaseView):
>          try:
>              force = "force" in self.request.query
>              await loop.run_in_executor(None, lambda: branch.delete(force=force))
> +            branch.root().schedule_cleanup()

               self.repo().schedule_cleanup(False)

>              return web.Response()
>          except OSError as e:
>              raise web.HTTPBadRequest(reason=str(e)) from e
> diff --git a/dlrepo/views/job.py b/dlrepo/views/job.py
> index 9ad32e43b462..f91cc20221d5 100644
> --- a/dlrepo/views/job.py
> +++ b/dlrepo/views/job.py
> @@ -142,6 +142,7 @@ class JobView(JobArchiveView):
>          try:
>              job = self._get_job()
>              await loop.run_in_executor(None, job.delete)
> +            job.root().schedule_cleanup()

               self.repo().schedule_cleanup(False)

>          except FileNotFoundError as e:
>              raise web.HTTPNotFound() from e
>          except OSError as e:
> diff --git a/dlrepo/views/tag.py b/dlrepo/views/tag.py
> index 2d20bfe6f9a8..1c802025c46c 100644
> --- a/dlrepo/views/tag.py
> +++ b/dlrepo/views/tag.py
> @@ -102,6 +102,7 @@ class TagView(BaseView):
>              tag = self._get_tag()
>              force = "force" in self.request.query
>              await loop.run_in_executor(None, lambda: tag.delete(force=force))
> +            tag.root().schedule_cleanup()

               self.repo().schedule_cleanup(False)

>              return web.Response()
>          except OSError as e:
>              raise web.HTTPBadRequest(reason=str(e)) from e

> diff --git a/docs/dlrepo-config.5.scdoc b/docs/dlrepo-config.5.scdoc
> index 789a59ffb37a..93ce5ad1176d 100644
> --- a/docs/dlrepo-config.5.scdoc
> +++ b/docs/dlrepo-config.5.scdoc
> @@ -50,7 +50,11 @@ running the daemon process.
>  	via the *dlrepo-api*(7) if they are older than
>  	*DLREPO_ORPHAN_BLOB_LIFETIME*.
>  
> -	Otherwise, *dlrepo-cleanup*(8) will delete them in a later time.
> +	Otherwise, the garbage collection task will delete them in a later time.
> +
> +*DLREPO_TAG_CLEANUP_PERIOD* (default: _86400_)
> +	The maximum period in seconds after which tags will be deleted according
> +	to the branch cleanup policy.

I think the option description would need rewording if you follow what
I suggested at the beginning.
Message ID
<CAHfJR7iGvysb+rvo-w5ZMONKP=C0boPC-m+w99_bO1R7JZsjaw@mail.gmail.com>
In-Reply-To
<CL0WGF1E8C16.2E8CBFJWGODFK@marty>
DKIM signature
pass
Hi Robin,

On Mon, Jun 27, 2022 at 14:02, Robin Jarry <robin@jarry.cc> wrote:
>
> Hi Julien,
>
> Sorry about the late reply, I got sidetracked by a bunch of things.

No problem, thanks for your review!

> Julien Floret, Jun 16, 2022 at 15:42:
> > Before this patch, the cleanup of orphan blobs was done in two places:
> >
> > (1) by the dlrepo process, in a worker thread during the processing of
> >     an HTTP DELETE request;
> >
> > (2) by the cleanup.py script, periodically.
> >
> > There are some issues with those:
> >
> > On (1), the cleanup of orphan blobs can take a very long time on a big
> > database; therefore, we must not wait for its completion before
> > responding to the HTTP DELETE request, otherwise the request may
> > timeout.
> >
> > On (2), since the cleanup is run in another process, there are risks of
> > race conditions with dlrepo, in the (unlikely, but still) case where the
> > external cleanup process removes an orphan blob that is ready to be
> > garbage collected, but at the same time the blob is hardlinked again by
> > the dlrepo process between the test and the removal.
> >
> > To address both issues, in dlrepo we create a background task dedicated
> > to all filesystem cleanup operations (applying the tag cleanup policy
> > and garbage-collecting orphan blobs). The task is scheduled by the HTTP
> > DELETE handlers, so it can still run *after* the response has been sent.
> > To avoid race conditions, the task must run in the main event loop, so
> > we move the cleanup code outside of the {branch,tag,job}.delete()
> > methods, which are run in a worker thread. The cleanup task itself
> > delegates tag deletions to the worker thread; as for the blob garbage
> > collection, the loop is allowed to be interrupted after each file
> > scanned.
> >
> > The cleanup task can also run periodically, via a configurable timer
> > (24h by default, configurable via the DLREPO_TAG_CLEANUP_PERIOD variable).
> > It is also run at startup, and it can as well be triggered by sending
> > the SIGUSR1 signal to the dlrepo process.
> >
> > We avoid running multiple cleanup tasks at the same time: when a cleanup
> > is requested, we cancel any existing timer and we check if a task was
> > already scheduled. If this is the case, we don't create a new task yet,
> > instead we set a "need_cleanup" flag. When the current cleanup task
> > ends, if "need_cleanup" is set a new task is scheduled, otherwise the
> > timer is re-armed.
>
> I think this cleanup coroutine should do two different things depending
> on how it is called.
>
> 1) after deleting a tag or job (responding to HTTP DELETE), it should
>    *only* clean up the orphan blobs.
>
> 2) upon receipt of SIGUSR1, or when DLREPO_TAG_CLEANUP_PERIOD expires,
>    it should delete tags according to each branch's cleanup policy,
>    refresh each user's quota *and finally* clean up the orphan blobs.
>
> This could be easily done by adding a parameter somewhere.

You're right, there is indeed no need to do the tag cleanup on each HTTP DELETE.
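
For that blob-only path, the cooperative garbage collection described in
the commit message (the loop may be interrupted after each file scanned)
could be sketched roughly like this; illustrative code, not dlrepo's
actual implementation:

```python
import asyncio
import os

# Illustrative sketch: the blob GC runs in the main event loop but yields
# after every file, so a long pass over a big .blobs tree does not starve
# HTTP handlers. The real code would also honor DLREPO_ORPHAN_BLOB_LIFETIME
# before unlinking.
def unlink_if_orphan(path):
    # the nlink test and the unlink happen in one synchronous step, so no
    # other task can hardlink the blob between the check and the removal
    if os.stat(path).st_nlink == 1:
        os.unlink(path)
        return True
    return False

async def gc_orphan_blobs(blobs_dir):
    deleted = []
    for root, _dirs, files in os.walk(blobs_dir):
        for name in files:
            path = os.path.join(root, name)
            if unlink_if_orphan(path):
                deleted.append(path)
            # yield to the event loop after each file scanned
            await asyncio.sleep(0)
    return deleted
```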

> > The external cleanup.py script is removed, along with the related
> > systemd services; it could have been modified to just send a signal to
> > dlrepo, but there is no real point in doing so, since we now have the
> > configurable timer in dlrepo itself. Moreover, it would be tricky to
> > determine in the script to which process to send the signal, for example
> > if two servers are running on the same machine.
> > The drawback is that existing deployments may need to be reconfigured,
> > but since dlrepo is still in development, we think it is acceptable.
>
> Sure, I don't think that is a major issue.
>
> I have a few remarks about the code itself. See below:

Thanks for your remarks. I will send a v3 to address them.