~rjarry/dlrepo

dlrepo: views: run orphans cleanup in background v1 NEEDS REVISION

Julien Floret: 1
 views: run orphans cleanup in background

 8 files changed, 50 insertions(+), 32 deletions(-)
#770803 .build.yml success
Julien Floret, May 31, 2022 at 11:57:
Next
Robin, all,

Le mar. 31 mai 2022 à 12:22, Robin Jarry <robin@jarry.cc> a écrit :
Next
Julien Floret, Jun 01, 2022 at 10:09:
Next
Export patchset (mbox)
How do I use this?

Copy & paste the following snippet into your terminal to import this patchset into git:

curl -s https://lists.sr.ht/~rjarry/dlrepo/patches/32638/mbox | git am -3
Learn more about email & git

[PATCH dlrepo] views: run orphans cleanup in background Export this patch

The cleanup of orphan blobs can take a very long time on a big database.
We must not wait for it to complete before responding to the
HTTP DELETE request, otherwise the request may timeout.
Let's run the cleanup in a background asyncio task that will run after
the response to the request.
To avoid blocking the event loop, we allow the cleanup loop to be
interrupted after each file scanned.

We avoid running several unnecessary cleanup tasks in parallel:
when a cleanup is requested, we check if a task was already scheduled.
If so, we don't create a new task yet, instead we set a "need_cleanup"
flag. When the first cleanup task ends, if "need_cleanup" is set, we
reschedule a new task.

Signed-off-by: Julien Floret <julien.floret@6wind.com>
Acked-by: Olivier Matz <olivier.matz@6wind.com>
---
 dlrepo/cleanup.py      |  4 +--
 dlrepo/fs/__init__.py  | 59 ++++++++++++++++++++++++++++--------------
 dlrepo/fs/branch.py    |  6 ++---
 dlrepo/fs/job.py       |  4 +--
 dlrepo/fs/tag.py       |  6 ++---
 dlrepo/views/branch.py |  1 +
 dlrepo/views/job.py    |  1 +
 dlrepo/views/tag.py    |  1 +
 8 files changed, 50 insertions(+), 32 deletions(-)

diff --git a/dlrepo/cleanup.py b/dlrepo/cleanup.py
index 3c9344ee667c..c97ee1f5f25f 100644
--- a/dlrepo/cleanup.py
+++ b/dlrepo/cleanup.py
@@ -75,7 +75,7 @@ def cleanup(args):
            for tag in daily_tags[max_daily:]:
                if not args.quiet:
                    print(f"Deleting daily tag {branch.name}/{tag.name} ...")
                tag.delete(cleanup_orphans=False)
                tag.delete()
                deleted += 1

        max_released = policy.get("max_released_tags", 0)
@@ -83,7 +83,7 @@ def cleanup(args):
            for tag in released_tags[max_released:]:
                if not args.quiet:
                    print(f"Deleting released tag {branch.name}/{tag.name} ...")
                tag.delete(force=True, cleanup_orphans=False)
                tag.delete(force=True)
                deleted += 1

    repo.cleanup_orphan_blobs()
diff --git a/dlrepo/fs/__init__.py b/dlrepo/fs/__init__.py
index 9e6212da8519..a95c91c343d6 100644
--- a/dlrepo/fs/__init__.py
+++ b/dlrepo/fs/__init__.py
@@ -8,7 +8,7 @@ import os
from pathlib import Path
import shutil
import time
from typing import Awaitable, Callable, Iterator, Optional
from typing import Awaitable, Callable, Iterator, Optional, Tuple

from cachetools import LFUCache

@@ -86,6 +86,8 @@ class ArtifactRepository(AbstractRepository):
        self.blobs = self._path / ".blobs"
        self.uploads = self._path / ".uploads"
        self.user_repos = LFUCache(maxsize=512)
        self.cleanup_task = None
        self.need_cleanup = False

    def get_user_repo(self, user: str) -> "UserRepository":
        user = user.lower()
@@ -103,10 +105,27 @@ class ArtifactRepository(AbstractRepository):
    def url(self) -> str:
        return "/"

    def _cleanup_done_cb(self, task):
        exc = task.exception()
        if exc:
            LOG.error("exception in cleanup_orphan_blobs %r", task, exc_info=exc)
        if self.need_cleanup:
            self._create_cleanup_task()
            self.need_cleanup = False
        else:
            self.cleanup_task = None

    def _create_cleanup_task(self):
        self.cleanup_task = asyncio.create_task(
            _cleanup_orphans((self.blobs, self.uploads))
        )
        self.cleanup_task.add_done_callback(self._cleanup_done_cb)

    def cleanup_orphan_blobs(self):
        # TODO: call blocking stuff (open, stat, unlink) in a thread?
        for folder in (self.blobs, self.uploads):
            _cleanup_orphans(folder)
        if self.cleanup_task:
            self.need_cleanup = True
        else:
            self._create_cleanup_task()

    def next_upload(self) -> str:
        self.uploads.mkdir(mode=0o755, parents=True, exist_ok=True)
@@ -347,18 +366,20 @@ async def _check_and_move(
ORPHAN_BLOB_LIFETIME = int(os.getenv("DLREPO_ORPHAN_BLOB_LIFETIME", "600"))


def _cleanup_orphans(folder: Path):
    if not folder.is_dir():
        return
    now = time.time()
    for root, _, files in os.walk(folder):
        for f in files:
            f = Path(root, f)
            if not f.is_file():
                continue
            stat = f.stat()
            if stat.st_nlink > 1:
                continue
            if now - stat.st_mtime < ORPHAN_BLOB_LIFETIME:
                continue
            f.unlink()
async def _cleanup_orphans(folders: Tuple[Path]):
    for folder in folders:
        if not folder.is_dir():
            return
        now = time.time()
        for root, _, files in os.walk(folder):
            for f in files:
                await asyncio.sleep(0)
                f = Path(root, f)
                if not f.is_file():
                    continue
                stat = f.stat()
                if stat.st_nlink > 1:
                    continue
                if now - stat.st_mtime < ORPHAN_BLOB_LIFETIME:
                    continue
                f.unlink()
diff --git a/dlrepo/fs/branch.py b/dlrepo/fs/branch.py
index c42646df03a9..1bbca3c139d9 100644
--- a/dlrepo/fs/branch.py
+++ b/dlrepo/fs/branch.py
@@ -67,7 +67,7 @@ class Branch(SubDir):
                policy[field] = 0
        return policy

    def delete(self, *, force: bool = False, cleanup_orphans: bool = True):
    def delete(self, *, force: bool = False):
        if not self.exists():
            raise FileNotFoundError()
        for t in self.get_tags():
@@ -76,7 +76,5 @@ class Branch(SubDir):
            if not force and t.is_released():
                raise OSError(f"Tag {t.name} is released, use force")
        for t in self.get_tags():
            t.delete(force=force, cleanup_orphans=False)
            t.delete(force=force)
        self.root().rmtree(self._path)
        if cleanup_orphans:
            self.root().cleanup_orphan_blobs()
diff --git a/dlrepo/fs/job.py b/dlrepo/fs/job.py
index 5120f159f5f9..889da5c9249d 100644
--- a/dlrepo/fs/job.py
+++ b/dlrepo/fs/job.py
@@ -187,10 +187,8 @@ class Job(SubDir):
                .get_version(str(data["version"]))
            )

    def delete(self, *, cleanup_orphans: bool = True):
    def delete(self):
        if not self.exists():
            raise FileNotFoundError()
        self._cleanup_product_tree()
        self.root().rmtree(self._path)
        if cleanup_orphans:
            self.root().cleanup_orphan_blobs()
diff --git a/dlrepo/fs/tag.py b/dlrepo/fs/tag.py
index 40a0bfcf6817..39f48e80598c 100644
--- a/dlrepo/fs/tag.py
+++ b/dlrepo/fs/tag.py
@@ -204,7 +204,7 @@ class Tag(SubDir):
        elif path.is_file():
            path.unlink()

    def delete(self, *, force: bool = False, cleanup_orphans: bool = True):
    def delete(self, *, force: bool = False):
        if not self.exists():
            raise FileNotFoundError()
        if self.is_locked():
@@ -212,7 +212,5 @@ class Tag(SubDir):
        if not force and self.is_released():
            raise OSError(f"Tag {self.name} is released, use force")
        for j in self.get_jobs():
            j.delete(cleanup_orphans=False)
            j.delete()
        self.root().rmtree(self._path)
        if cleanup_orphans:
            self.root().cleanup_orphan_blobs()
diff --git a/dlrepo/views/branch.py b/dlrepo/views/branch.py
index 6ff617f7c9b2..e4ccf50df5c1 100644
--- a/dlrepo/views/branch.py
+++ b/dlrepo/views/branch.py
@@ -122,6 +122,7 @@ class BranchView(BaseView):
        try:
            force = "force" in self.request.query
            await loop.run_in_executor(None, lambda: branch.delete(force=force))
            branch.root().cleanup_orphan_blobs()
            return web.Response()
        except OSError as e:
            raise web.HTTPBadRequest(reason=str(e)) from e
diff --git a/dlrepo/views/job.py b/dlrepo/views/job.py
index 9ad32e43b462..c413e0fa06fb 100644
--- a/dlrepo/views/job.py
+++ b/dlrepo/views/job.py
@@ -142,6 +142,7 @@ class JobView(JobArchiveView):
        try:
            job = self._get_job()
            await loop.run_in_executor(None, job.delete)
            job.root().cleanup_orphan_blobs()
        except FileNotFoundError as e:
            raise web.HTTPNotFound() from e
        except OSError as e:
diff --git a/dlrepo/views/tag.py b/dlrepo/views/tag.py
index 2d20bfe6f9a8..dccacc6a0166 100644
--- a/dlrepo/views/tag.py
+++ b/dlrepo/views/tag.py
@@ -102,6 +102,7 @@ class TagView(BaseView):
            tag = self._get_tag()
            force = "force" in self.request.query
            await loop.run_in_executor(None, lambda: tag.delete(force=force))
            tag.root().cleanup_orphan_blobs()
            return web.Response()
        except OSError as e:
            raise web.HTTPBadRequest(reason=str(e)) from e
-- 
2.30.2
dlrepo/patches/.build.yml: SUCCESS in 1m23s

[views: run orphans cleanup in background][0] from [Julien Floret][1]

[0]: https://lists.sr.ht/~rjarry/dlrepo/patches/32638
[1]: mailto:julien.floret@6wind.com

✓ #770803 SUCCESS dlrepo/patches/.build.yml https://builds.sr.ht/~rjarry/job/770803
Le mar. 31 mai 2022 à 10:24, Julien Floret <julien.floret@6wind.com> a écrit :