~lioploum/offpunk-devel

Client certificates and a new certificate cache v1 PROPOSED

Bert Livens: 4
 Migration of ssl certificates of gemlogs. Use the file system instead of a database.
 Added support for client-certificates to netcache. When using a url like gemini://username@site.net, the according certificate will be used.
 Added the certs command to list and create client certificates
 Added support for client-certificates to netcache. When using a url like gemini://username@site.net, the according certificate will be used.

 10 files changed, 368 insertions(+), 78 deletions(-)
Hi everyone,

I worked some more on the client certificates and the certificate cache.
- There is now a migration script that gets called. I decided to make it 
a separate script from the cache migration instead of making it a version 2 of that one.

- Using the `certs` command, you can now list the available certificates 
for a gemini capsule or even create new certificates.

Do take a look at it. I use it every day (at least once, to water my 
plant on gemini://astrobotany.mozz.us ) and it does seem to work, but of 
course I have not created a hundred certificates for tens of sites.

I look forward to it being incorporated in offpunk.

Bert Livens
I could send it in three patches (a bit like the commits on my 
fork repo: one for moving the cache to the file system, one for adding 
the functionality for client certificates and one for the certs 
command). I'll look at the documentation for git send-email; in the 
worst case I'll just write the emails with the patches myself (long live 
a simple plain-text workflow).
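For reference, sending those three commits as individual mails with git send-email would look roughly like this (the list address is assumed from the list name here, and the commit range should be adjusted to the actual branch):

```shell
# One-time setup: point send-email at this list
# (address assumed from the sourcehut list name)
git config sendemail.to "~lioploum/offpunk-devel@lists.sr.ht"

# Send the last three commits, each as its own patch mail;
# --annotate opens each mail in $EDITOR for a final check
git send-email --annotate -3 HEAD
```

This keeps the plain-text workflow: each patch arrives as a separate mail that can be reviewed and applied with git am.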
Three patches sounds great. I suggest that you send them one at a time 
so we can review them on this list (I really prefer to review patches in 
neomutt ;-) )
Hi Bert,

I’ve tried the patch and everything seems to work fine. Code looks 
great.

I will merge it and push it to trunk to catch bugs in the long term.

Very good work!
This patch doesn’t apply and, weirdly enough, I can’t find what is 
wrong. git am --show-current-patch=diff doesn’t help here: everything 
looks fine. 

But it doesn’t apply.

Could you try to rebase it with current trunk and see what is different?
As the second patch is not applying, I don’t review this one either. 
Could you resend it after we have merged the second patch?
I’ve found the reason for the bug: the code managed to have a cached 
fingerprint without the certificate associated with that fingerprint.

As a rule of thumb, code should never assume the cache is consistent. I 
would like to discuss the code in netcache.py:516.

This code assumes that if there’s a fingerprint, then the corresponding 
certificate is in the cache. Which is not guaranteed, as I’ve 
discovered. It should check that the certificate really exists before 
anything else.
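The defensive check being suggested can be sketched like this (a minimal illustration with hypothetical names, not the actual netcache.py code; it assumes the per-site fingerprint files and the shared cert_cache directory described in the patch):

```python
import os

def usable_cached_certs(sitedir, certcache):
    """Return only the cached fingerprints whose certificate file
    actually exists in certcache -- never assume the cache is complete."""
    if not os.path.isdir(sitedir):
        return []
    usable = []
    for fingerprint in os.listdir(sitedir):
        crt = os.path.join(certcache, fingerprint + ".crt")
        if os.path.exists(crt):  # verify the certificate really exists
            usable.append(fingerprint)
    return usable
```

A fingerprint whose .crt file is missing is simply skipped, so later code can never try to open a certificate that is not there.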


PS: Also, I’m not a fan of the "for… else…" syntax, I find it lacks 
readability. But that’s probably a detail.

-- Ploum - Lionel Dricot
Blog: https://www.ploum.net
Livres: https://ploum.net/livres.html
I’ve rewritten the code so that:

1. We go through the cached certificates to extract the 
most_frequent_cert and to see if one matches the current one.

2. If we have no match but one valid most_frequent_cert, we run the 
"throws warning" code.

3. If there is no certificate directory or no valid cached certificate, 
we run the "First-Use" routine.


I believe this solves the bug and many others that were not found yet 
(such as having an empty cert_cache).
Export patchset (mbox)
How do I use this?

Copy & paste the following snippet into your terminal to import this patchset into git:

curl -s https://lists.sr.ht/~lioploum/offpunk-devel/patches/53077/mbox | git am -3

[PATCH] Migration of ssl certificates of gemlogs. Use the file system instead of a database.

Signed-off-by: Bert Livens <bert@bertlivens.be>
---
 CHANGELOG         |  1 +
 cert_migration.py | 48 ++++++++++++++++++++++++++++++
 netcache.py       | 74 +++++++++++++++++++++++------------------------
 offutils.py       | 49 +++++++++++++++++++++++++++----
 4 files changed, 129 insertions(+), 43 deletions(-)
 create mode 100644 cert_migration.py
 mode change 100755 => 100644 netcache.py

diff --git a/CHANGELOG b/CHANGELOG
index 31d54fa..095e40b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -10,6 +10,7 @@
- netcache: fix spartan protocol error
- opnk: fix a crash when caching returns None
- ansicat: remove the treshold argument when launching chafa (strange artifacts with new version)
- netcache: moved the certificate cache to the filesystem instead of a database (by Bert Livens)

## 2.2 - February 13th 2023
- cache folder is now configurable through $OFFPUNK_CACHE_PATH environment variable (by prx)
diff --git a/cert_migration.py b/cert_migration.py
new file mode 100644
index 0000000..11134f1
--- /dev/null
+++ b/cert_migration.py
@@ -0,0 +1,48 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2024 Bert Livens
# SPDX-License-Identifier:  AGPL-3.0-only

"""
A script to migrate the offpunk certificate storage to the newest version.

For each new version of offpunk that requires changes to the certificate storage
a migration function should be written, performing a migration from the
immediately previous format.
"""


import sqlite3
import os
import datetime

def upgrade_to_1(data_dir: str, config_dir: str) -> None:
    print("moving from tofu.db to certificates as files")
    db_path = os.path.join(config_dir, "tofu.db")
    db_conn = sqlite3.connect(db_path)
    db_cur = db_conn.cursor()
    db_cur.execute("""
                    SELECT hostname, address, fingerprint, count, first_seen, last_seen
                    FROM cert_cache""")
    certs = db_cur.fetchall()
    data_dir = os.path.join(data_dir, "certs")
    os.makedirs(data_dir, exist_ok=True)
    for hostname, address, fingerprint, count, first_seen, last_seen in certs:
        direc = os.path.join(data_dir, hostname)
        os.makedirs(direc, exist_ok=True)
        certdir = os.path.join(direc, address)
        os.makedirs(certdir, exist_ok=True)
        
        # filename is the fingerprint
        certfile = os.path.join(certdir, str(fingerprint))
        
        # write  count
        with open(certfile, 'w') as file:
            file.write(str(count))
        
        # change creation and modification date of file
        first_seen = datetime.datetime.strptime(first_seen, "%Y-%m-%d %H:%M:%S.%f")
        last_seen = datetime.datetime.strptime(last_seen, "%Y-%m-%d %H:%M:%S.%f")
        os.utime(certfile, (first_seen.timestamp(), last_seen.timestamp()))

    # remove tofu.db
    os.remove(db_path)
diff --git a/netcache.py b/netcache.py
old mode 100755
new mode 100644
index 970de6c..c0a5893
--- a/netcache.py
+++ b/netcache.py
@@ -10,7 +10,6 @@ import ssl
import glob
import datetime
import hashlib
import sqlite3
from ssl import CertificateError
import ansicat
import offutils
@@ -158,7 +157,7 @@ def get_cache_path(url,add_index=True):
    else:
        local = False
        # Convert unicode hostname to punycode using idna RFC3490
        host = parsed.hostname #.encode("idna").decode()
        host = parsed.netloc #.encode("idna").decode()
        port = parsed.port or standard_ports.get(scheme, 0)
        # special gopher selector case
        if scheme == "gopher":
@@ -500,38 +499,36 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
    sha.update(cert)
    fingerprint = sha.hexdigest()

    db_path = os.path.join(xdg("config"), "tofu.db")
    db_conn = sqlite3.connect(db_path)
    db_cur = db_conn.cursor()

    db_cur.execute("""CREATE TABLE IF NOT EXISTS cert_cache
        (hostname text, address text, fingerprint text,
        first_seen date, last_seen date, count integer)""")
    # Have we been here before?
    db_cur.execute("""SELECT fingerprint, first_seen, last_seen, count
        FROM cert_cache
        WHERE hostname=? AND address=?""", (host, address))
    cached_certs = db_cur.fetchall()

    # If so, check for a match
    if cached_certs:
    # The directory of this host and IP-address, e.g.
    # ~/.local/share/offpunk/certs/srht.site/46.23.81.157/
    certdir = os.path.join(xdg("data"), "certs")
    hostdir = os.path.join(certdir, host)
    sitedir = os.path.join(hostdir, address)
    # Have we been here before? (the directory exists)

    if os.path.isdir(sitedir):
        max_count = 0
        most_frequent_cert = None
        for cached_fingerprint, first, last, count in cached_certs:
        files = os.listdir(sitedir)
        count = 0

        for cached_fingerprint in files:
            filepath = os.path.join(sitedir, cached_fingerprint)
            with open(filepath, 'r') as f:
                count = int(f.read())
            if count > max_count:
                max_count = count
                most_frequent_cert = cached_fingerprint
            if fingerprint == cached_fingerprint:
                # Matched!
                db_cur.execute("""UPDATE cert_cache
                    SET last_seen=?, count=?
                    WHERE hostname=? AND address=? AND fingerprint=?""",
                    (now, count+1, host, address, fingerprint))
                db_conn.commit()
                # Increase the counter for this certificate (this also updates
                # the modification time of the file)
                with open(filepath, 'w') as f:
                    f.write(str(count+1))
                break
        else:
            certdir = os.path.join(xdg("config"), "cert_cache")
            with open(os.path.join(certdir, most_frequent_cert+".crt"), "rb") as fp:
            certcache = os.path.join(xdg("config"), "cert_cache")
            with open(os.path.join(certcache, most_frequent_cert+".crt"), "rb") as fp:
                previous_cert = fp.read()
            if _HAS_CRYPTOGRAPHY:
                # Load the most frequently seen certificate to see if it has
@@ -558,25 +555,28 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
            else:
                choice = input("Accept this new certificate? Y/N ").strip().lower()
            if choice in ("y", "yes"):
                db_cur.execute("""INSERT INTO cert_cache
                                VALUES (?, ?, ?, ?, ?, ?)""",
                            (host, address, fingerprint, now, now, 1))
                db_conn.commit()
                with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
                with open(os.path.join(sitedir, fingerprint), "w") as fp:
                    fp.write("1")
                with open(os.path.join(certcache, fingerprint+".crt"), "wb") as fp:
                    fp.write(cert)
            else:
                raise Exception("TOFU Failure!")

    # If not, cache this cert
    else:
        db_cur.execute("""INSERT INTO cert_cache
            VALUES (?, ?, ?, ?, ?, ?)""",
            (host, address, fingerprint, now, now, 1))
        db_conn.commit()
        certdir = os.path.join(xdg("config"), "cert_cache")
        if not os.path.exists(certdir):
        if not os.path.exists(certdir): # XDG_DATA/offpunk/certs
            os.makedirs(certdir)
        with open(os.path.join(certdir, fingerprint+".crt"), "wb") as fp:
        if not os.path.exists(hostdir): # XDG_DATA/offpunk/certs/site.net
            os.makedirs(hostdir)
        if not os.path.exists(sitedir): # XDG_DATA/offpunk/certs/site.net/123.123.123.123
            os.makedirs(sitedir)

        with open(os.path.join(sitedir, fingerprint), "w") as fp:
            fp.write("1")
        certcache = os.path.join(xdg("config"), "cert_cache")
        if not os.path.exists(certcache):
            os.makedirs(certcache)
        with open(os.path.join(certcache, fingerprint+".crt"), "wb") as fp:
            fp.write(cert)

def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\
diff --git a/offutils.py b/offutils.py
index a266a18..61f9940 100644
--- a/offutils.py
+++ b/offutils.py
@@ -1,6 +1,6 @@
#!/bin/python

#This file contains some utilities common to offpunk, ansirenderer and netcache.
#This file contains some utilities common to offpunk, ansicat and netcache.
#Currently, there are the following utilities:
#
# run : run a shell command and get the results with some security
@@ -15,12 +15,14 @@ import urllib.parse
import urllib.parse
import netcache_migration
import netcache
import cert_migration

CACHE_VERSION = 1
CERT_VERSION = 1

# We upgrade the cache only once at startup, hence the UPGRADED variable
# This is only to avoid unecessary checks each time the cache is accessed
UPGRADED=False
# We upgrade the cache only once at startup, hence the CACHE_UPGRADED variable
# This is only to avoid unnecessary checks each time the cache is accessed
CACHE_UPGRADED=False
def upgrade_cache(cache_folder):
    #Let’s read current version of the cache
    version_path = cache_folder + ".version"
@@ -42,7 +44,40 @@ def upgrade_cache(cache_folder):
        with open(version_path,"w") as f:
            f.write(str(current_version))
            f.close()
    UPGRADED=True
    CACHE_UPGRADED=True

CERT_UPGRADED=False

def upgrade_cert(config_folder: str, data_folder: str) -> None:
    # read the current version
    certdata = os.path.join(data_folder, 'certs')
    if not os.path.exists(certdata):
        os.mkdir(certdata)
    version_path = os.path.join(certdata, ".version")
    current_version = 0
    if os.path.exists(version_path):
        current_str = None
        with open(version_path) as f:
            current_str = f.read()
            f.close()
        try:
            current_version = int(current_str)
        except:
            current_version = 0
    else:
        current_version = 0
    #Now, let’s upgrade the certificate storage if needed
    while current_version < CERT_VERSION:
        current_version += 1
        upgrade_func = getattr(cert_migration,"upgrade_to_"+str(current_version))
        upgrade_func(data_folder, config_folder)
        with open(version_path,"w") as f:
            f.write(str(current_version))
            f.close()
    CERT_UPGRADED=True




#get xdg folder. Folder should be "cache", "data" or "config"
def xdg(folder="cache"):
@@ -72,13 +107,15 @@ def xdg(folder="cache"):
    if not _CACHE_PATH.endswith("/"):
        _CACHE_PATH += "/"
    os.makedirs(_CACHE_PATH,exist_ok=True)
    if folder == "cache" and not UPGRADED:
    if folder == "cache" and not CACHE_UPGRADED:
        upgrade_cache(_CACHE_PATH)
    if folder == "cache":
        return _CACHE_PATH
    elif folder == "config":
        return _CONFIG_DIR
    elif folder == "data":
        if not CERT_UPGRADED:
            upgrade_cert(_CONFIG_DIR, _DATA_DIR)
        return _DATA_DIR
    else:
        print("No XDG folder for %s. Check your code."%folder)
-- 
2.45.1

[PATCH] Added support for client-certificates to netcache. When using a url like gemini://username@site.net, the according certificate will be used.

Signed-off-by: Bert Livens <bert@bertlivens.be>
---
 netcache.py | 89 +++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 72 insertions(+), 17 deletions(-)

diff --git a/netcache.py b/netcache.py
index c0a5893..0222d67 100644
--- a/netcache.py
+++ b/netcache.py
@@ -579,12 +579,48 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
        with open(os.path.join(certcache, fingerprint+".crt"), "wb") as fp:
            fp.write(cert)

def _get_client_certkey(site_id: str, host: str):
    # returns {cert: str, key: str}
    certdir = os.path.join(xdg("data"), "certs", host)
    certf = os.path.join(certdir, "%s.cert" % site_id)
    keyf = os.path.join(certdir, "%s.key" % site_id)
    if not os.path.exists(certf) or not os.path.exists(keyf):
        if host != "":
            split = host.split(".")
            #if len(split) > 2:  # Why not allow a global identity? Maybe I want
                                 # to login to all sites with the same
                                 # certificate.
            return _get_client_certkey(site_id, ".".join(split[1:]))
        return None
    certkey = dict(cert=certf, key=keyf)
    return certkey

def _get_site_ids(url: str):
    newurl = normalize_url(url)
    u = urllib.parse.urlparse(newurl)
    if u.scheme == "gemini" and u.username == None:
        certdir = os.path.join(xdg("data"), "certs")
        netloc_parts = u.netloc.split(".")
        site_ids = []

        for i in range(len(netloc_parts), 0, -1):
            lasti = ".".join(netloc_parts[-i:])
            direc = os.path.join(certdir, lasti)

            for certfile in glob.glob(os.path.join(direc, "*.cert")):
                site_id = certfile.split('/')[-1].split(".")[-2]
                site_ids.append(site_id)
        return site_ids
    else:
        return []

def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\
                    **kwargs):
    cache = None
    newurl = url
    url_parts = urllib.parse.urlparse(url)
    host = url_parts.hostname
    site_id = url_parts.username
    port = url_parts.port or standard_ports["gemini"]
    path = url_parts.path or "/"
    query = url_parts.query
@@ -614,6 +650,14 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
    context = ssl.SSLContext(protocol)
    context.check_hostname=False
    context.verify_mode = ssl.CERT_NONE

    # When using an identity, use the certificate and key
    if site_id:
        certkey = _get_client_certkey(site_id, host)
        if certkey:
            context.load_cert_chain(certkey["cert"], certkey["key"])
        else:
            print("This identity doesn't exist for this site (or is disabled).")
    # Impose minimum TLS version
    ## In 3.7 and above, this is easy...
    if sys.version_info.minor >= 7:
@@ -655,15 +699,21 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
    _validate_cert(address[4][0], host, cert,automatic_choice="y")
    # Send request and wrap response in a file descriptor
    url = urllib.parse.urlparse(url)
    new_netloc = host
    new_host = host
    #Handle IPV6 hostname
    if ":" in new_netloc:
        new_netloc = "[" + new_netloc + "]"
    if ":" in new_host:
        new_host = "[" + new_host + "]"
    if port != standard_ports["gemini"]:
        new_netloc += ":" + str(port)
    url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
    s.sendall((url + CRLF).encode("UTF-8"))
    f= s.makefile(mode = "rb")
        new_host += ":" + str(port)
    url_no_username = urllib.parse.urlunparse(url._replace(netloc=new_host))

    if site_id:
        url = urllib.parse.urlunparse(url._replace(netloc=site_id+"@"+new_host))
    else:
        url = url_no_username

    s.sendall((url_no_username + CRLF).encode("UTF-8"))
    f = s.makefile(mode = "rb")
    ## end of send_request in AV98
    # Spec dictates <META> should not exceed 1024 bytes,
    # so maximum valid header length is 1027 bytes.
@@ -734,8 +784,7 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
        raise RuntimeError(meta)
    # Client cert
    elif status.startswith("6"):
        error = "Handling certificates for status 6X are not supported by offpunk\n"
        error += "See bug #31 for discussion about the problem"
        error = "You need to provide a client-certificate to access this page."
        raise RuntimeError(error)
    # Invalid status
    elif not status.startswith("2"):
@@ -775,7 +824,7 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
    newurl = url
    path=None
    print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
    #Firt, we look if we have a valid cache, even if offline
    #First, we look if we have a valid cache, even if offline
    #If we are offline, any cache is better than nothing
    if is_cache_valid(url,validity=validity) or (offline and is_cache_valid(url,validity=0)):
        path = get_cache_path(url)
@@ -793,23 +842,23 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
                path = None
            elif scheme in ("http","https"):
                if _DO_HTTP:
                    path=_fetch_http(url,**kwargs)
                    path=_fetch_http(newurl,**kwargs)
                else:
                    print("HTTP requires python-requests")
            elif scheme == "gopher":
                path=_fetch_gopher(url,**kwargs)
                path=_fetch_gopher(newurl,**kwargs)
            elif scheme == "finger":
                path=_fetch_finger(url,**kwargs)
                path=_fetch_finger(newurl,**kwargs)
            elif scheme == "gemini":
                path,newurl=_fetch_gemini(url,**kwargs)
            elif scheme == "spartan":
                path,newurl=_fetch_spartan(url,**kwargs)
                path,newurl=_fetch_spartan(newurl,**kwargs)
            else:
                print("scheme %s not implemented yet"%scheme)
        except UserAbortException:
            return None, newurl
        except Exception as err:
            cache = set_error(url, err)
            cache = set_error(newurl, err)
            # Print an error message
            # we fail silently when sync_only
            if isinstance(err, socket.gaierror):
@@ -871,12 +920,14 @@ def main():
    
    descri="Netcache is a command-line tool to retrieve, cache and access networked content.\n\
            By default, netcache will returns a cached version of a given URL, downloading it \
            only if not existing. A validity duration, in seconds, can also be given so that \
            netcache downloads the content only if the existing cache is older than the validity."
            only if a cache version doesn't exist. A validity duration, in seconds, can also \
            be given so netcache downloads the content only if the existing cache is older than the validity."
    # Parse arguments
    parser = argparse.ArgumentParser(prog="netcache",description=descri)
    parser.add_argument("--path", action="store_true",
                        help="return path to the cache instead of the content of the cache")
    parser.add_argument("--ids", action="store_true",
                        help="return a list of id’s for the gemini-site instead of the content of the cache")
    parser.add_argument("--offline", action="store_true",
                        help="Do not attempt to download, return cached version or error")
    parser.add_argument("--max-size", type=int,
@@ -898,11 +949,15 @@ def main():
    for u in args.url:
        if args.offline:
            path = get_cache_path(u)
        elif args.ids:
            ids = _get_site_ids(u)
        else:
            path,url = fetch(u,max_size=args.max_size,timeout=args.timeout,\
                                validity=args.cache_validity)
        if args.path:
            print(path)
        elif args.ids:
            print(ids)
        else:
            with open(path,"r") as f:
                print(f.read())
-- 
2.45.1

[PATCH] Added the certs command to list and create client certificates

Signed-off-by: Bert Livens <bert@bertlivens.be>
---
 CHANGELOG   |  1 +
 netcache.py | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 offpunk.py  | 35 +++++++++++++++++++++++++++++++++
 3 files changed, 92 insertions(+)
 mode change 100755 => 100644 offpunk.py

diff --git a/CHANGELOG b/CHANGELOG
index 095e40b..6857ded 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -11,6 +11,7 @@
- opnk: fix a crash when caching returns None
- ansicat: remove the treshold argument when launching chafa (strange artifacts with new version)
- netcache: moved the certificate cache to the filesystem instead of a database (by Bert Livens)
- offpunk/netcache: added the "certs" command to list and create client certificates (Bert Livens)

## 2.2 - February 13th 2023
- cache folder is now configurable through $OFFPUNK_CACHE_PATH environment variable (by prx)
diff --git a/netcache.py b/netcache.py
index 0222d67..98079b8 100644
--- a/netcache.py
+++ b/netcache.py
@@ -24,6 +24,9 @@ except ModuleNotFoundError:
try:
    from cryptography import x509
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.hazmat.primitives import serialization
    _HAS_CRYPTOGRAPHY = True
    _BACKEND = default_backend()
except(ModuleNotFoundError,ImportError):
@@ -614,6 +617,59 @@ def _get_site_ids(url: str):
    else:
        return []

def create_certificate(name: str, days: int, hostname: str):
    key = rsa.generate_private_key(
            public_exponent  = 65537,
            key_size = 2048)
    sitecertdir = os.path.join(xdg("data"), "certs", hostname)
    keyfile = os.path.join(sitecertdir, name+".key")
    # create the directory of it doesn't exist
    os.makedirs(sitecertdir, exist_ok=True)
    with open(keyfile, "wb") as f:
        f.write(key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        ))
    xname = x509.Name([
        x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name),
        ])
    # generate the cert, valid a week ago (timekeeping is hard, let's give it a
    # little margin). issuer and subject are your name
    cert = (x509.CertificateBuilder()
            .subject_name(xname)
            .issuer_name(xname)
            .public_key(key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(datetime.datetime.utcnow() -
                              datetime.timedelta(days=7))
            .not_valid_after(datetime.datetime.utcnow() +
                             datetime.timedelta(days=days))
            .sign(key, hashes.SHA256())
            )
    certfile = os.path.join(sitecertdir, name + ".cert")
    with open(certfile, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))

def get_certs(url: str):
    u = urllib.parse.urlparse(normalize_url(url))
    if u.scheme == "gemini":
        certdir = os.path.join(xdg("data"), "certs")
        netloc_parts = u.netloc.split(".")
        site_ids = []
        if '@' in netloc_parts[0]:
            netloc_parts[0] = netloc_parts[0].split('@')[1]

        for i in range(len(netloc_parts), 0, -1):
            lasti = ".".join(netloc_parts[-i:])
            direc = os.path.join(certdir, lasti)
            for certfile in glob.glob(os.path.join(direc, "*.cert")):
                site_id = certfile.split('/')[-1].split(".")[-2]
                site_ids.append(site_id)
        return site_ids
    else:
        return []

def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\
                    **kwargs):
    cache = None
diff --git a/offpunk.py b/offpunk.py
old mode 100755
new mode 100644
index 118ca0c..cc59019
--- a/offpunk.py
+++ b/offpunk.py
@@ -89,6 +89,7 @@ _ABBREVS = {
    "bb":   "blackbox",
    "bm":   "bookmarks",
    "book": "bookmarks",
    "cert": "certs",
    "cp":   "copy",
    "f":   "forward",
    "g":    "go",
@@ -815,6 +816,39 @@ Current tour can be listed with `tour ls` and scrubbed with `tour clear`."""
                except IndexError:
                    print("Invalid index %d, skipping." % n)

    @needs_gi
    def do_certs(self, line) -> None:
        """Manage your client certificates (identities) for a site.
        `certs` will display all valid certificates for the current site
        `certs new <name> <days-valid> <url[optional]>` will create a new certificate, if no url is specified, the current open site will be used.
        """
        line = line.strip()
        if not line:
            certs = netcache.get_certs(self.current_url)
            if len(certs) == 1:
                print("The one possible certificate for this site is:")
            else:
                print("The", len(certs) ,"possible certificates for this site:")

            print(*certs)
            print("Use the id@site.net notation to activate a certificate.")
        else:
            lineparts = line.split(' ')
            if lineparts[0] == 'new':
                if len(lineparts) == 4:
                    name = lineparts[1]
                    days = lineparts[2]
                    site = lineparts[3]
                    netcache.create_certificate(name, int(days), site)
                elif len(lineparts) == 3:
                    name = lineparts[1]
                    days = lineparts[2]
                    site = urllib.parse.urlparse(self.current_url)
                    netcache.create_certificate(name, int(days), site.hostname)

                else:
                    print("Usage:\ncerts new <name> <days-valid> <url[optional]>\n When no url is specified, the current location will be used.")

    @needs_gi
    def do_mark(self, line):
        """Mark the current item with a single letter.  This letter can then
@@ -904,6 +938,7 @@ Marks are temporary until shutdown (not saved to disk)."""
        output += " - Render Atom/RSS feeds (feedparser)         : " + has(ansicat._DO_FEED)
        output += " - Connect to http/https (requests)           : " + has(netcache._DO_HTTP)
        output += " - Detect text encoding (python-chardet)      : " + has(netcache._HAS_CHARDET)
        output += " - copy to/from clipboard (xsel)              : " + has(_HAS_XSEL)
        output += " - restore last position (less 572+)          : " + has(opnk._LESS_RESTORE_POSITION)
        output += "\n"
        output += "Config directory    : " +  xdg("config") + "\n"
-- 
2.45.1

[PATCH] Added support for client-certificates to netcache. When using a url like gemini://username@site.net, the according certificate will be used.

Let's see if this works. I replaced a ’ by ' in the help of the --ids flag.
There is now also a line in the CHANGELOG.
Signed-off-by: Bert Livens <bert@bertlivens.be>
---
 CHANGELOG   |  1 +
 netcache.py | 92 ++++++++++++++++++++++++++++++++++++++++++-----------
 2 files changed, 75 insertions(+), 18 deletions(-)

diff --git a/CHANGELOG b/CHANGELOG
index 095e40b..5841b03 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -11,6 +11,7 @@
- opnk: fix a crash when caching returns None
- ansicat: remove the treshold argument when launching chafa (strange artifacts with new version)
- netcache: moved the certificate cache to the filesystem instead of a database (by Bert Livens)
- netcache/offpunk: Added support for client-certificates

## 2.2 - February 13th 2023
- cache folder is now configurable through $OFFPUNK_CACHE_PATH environment variable (by prx)
diff --git a/netcache.py b/netcache.py
index c0a5893..6d74abb 100644
--- a/netcache.py
+++ b/netcache.py
@@ -579,12 +579,49 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
         with open(os.path.join(certcache, fingerprint+".crt"), "wb") as fp:
             fp.write(cert)
 
+
+def _get_client_certkey(site_id: str, host: str):
+    # returns {cert: str, key: str}
+    certdir = os.path.join(xdg("data"), "certs", host)
+    certf = os.path.join(certdir, "%s.cert" % site_id)
+    keyf = os.path.join(certdir, "%s.key" % site_id)
+    if not os.path.exists(certf) or not os.path.exists(keyf):
+        if host != "":
+            split = host.split(".")
+            #if len(split) > 2:  # Why not allow a global identity? Maybe I want
+                                 # to login to all sites with the same
+                                 # certificate.
+            return _get_client_certkey(site_id, ".".join(split[1:]))
+        return None
+    certkey = dict(cert=certf, key=keyf)
+    return certkey
+
+def _get_site_ids(url: str):
+    newurl = normalize_url(url)
+    u = urllib.parse.urlparse(newurl)
+    if u.scheme == "gemini" and u.username == None:
+        certdir = os.path.join(xdg("data"), "certs")
+        netloc_parts = u.netloc.split(".")
+        site_ids = []
+
+        for i in range(len(netloc_parts), 0, -1):
+            lasti = ".".join(netloc_parts[-i:])
+            direc = os.path.join(certdir, lasti)
+
+            for certfile in glob.glob(os.path.join(direc, "*.cert")):
+                site_id = certfile.split('/')[-1].split(".")[-2]
+                site_ids.append(site_id)
+        return site_ids
+    else:
+        return []
+
 def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\
                     **kwargs):
     cache = None
     newurl = url
     url_parts = urllib.parse.urlparse(url)
     host = url_parts.hostname
+    site_id = url_parts.username
     port = url_parts.port or standard_ports["gemini"]
     path = url_parts.path or "/"
     query = url_parts.query
@@ -612,8 +649,16 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
     # Prepare TLS context
     protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
     context = ssl.SSLContext(protocol)
-    context.check_hostname=False
+    context.check_hostname = False
     context.verify_mode = ssl.CERT_NONE
+
+    # When using an identity, use the certificate and key
+    if site_id:
+        certkey = _get_client_certkey(site_id, host)
+        if certkey:
+            context.load_cert_chain(certkey["cert"], certkey["key"])
+        else:
+            print("This identity doesn't exist for this site (or is disabled).")
     # Impose minimum TLS version
     ## In 3.7 and above, this is easy...
     if sys.version_info.minor >= 7:
@@ -655,15 +700,21 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
     _validate_cert(address[4][0], host, cert,automatic_choice="y")
     # Send request and wrap response in a file descriptor
     url = urllib.parse.urlparse(url)
-    new_netloc = host
+    new_host = host
     #Handle IPV6 hostname
-    if ":" in new_netloc:
-        new_netloc = "[" + new_netloc + "]"
+    if ":" in new_host:
+        new_host = "[" + new_host + "]"
     if port != standard_ports["gemini"]:
-        new_netloc += ":" + str(port)
-    url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
-    s.sendall((url + CRLF).encode("UTF-8"))
-    f= s.makefile(mode = "rb")
+        new_host += ":" + str(port)
+    url_no_username = urllib.parse.urlunparse(url._replace(netloc=new_host))
+
+    if site_id:
+        url = urllib.parse.urlunparse(url._replace(netloc=site_id+"@"+new_host))
+    else:
+        url = url_no_username
+
+    s.sendall((url_no_username + CRLF).encode("UTF-8"))
+    f = s.makefile(mode = "rb")
     ## end of send_request in AV98
     # Spec dictates <META> should not exceed 1024 bytes,
     # so maximum valid header length is 1027 bytes.
@@ -734,8 +785,7 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
         raise RuntimeError(meta)
     # Client cert
     elif status.startswith("6"):
-        error = "Handling certificates for status 6X are not supported by offpunk\n"
-        error += "See bug #31 for discussion about the problem"
+        error = "You need to provide a client-certificate to access this page."
         raise RuntimeError(error)
     # Invalid status
     elif not status.startswith("2"):
@@ -775,7 +825,7 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
     newurl = url
     path=None
     print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
-    #Firt, we look if we have a valid cache, even if offline
+    #First, we look if we have a valid cache, even if offline
     #If we are offline, any cache is better than nothing
     if is_cache_valid(url,validity=validity) or (offline and is_cache_valid(url,validity=0)):
         path = get_cache_path(url)
@@ -793,23 +843,23 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
                 path = None
             elif scheme in ("http","https"):
                 if _DO_HTTP:
-                    path=_fetch_http(url,**kwargs)
+                    path=_fetch_http(newurl,**kwargs)
                 else:
                     print("HTTP requires python-requests")
             elif scheme == "gopher":
-                path=_fetch_gopher(url,**kwargs)
+                path=_fetch_gopher(newurl,**kwargs)
             elif scheme == "finger":
-                path=_fetch_finger(url,**kwargs)
+                path=_fetch_finger(newurl,**kwargs)
             elif scheme == "gemini":
                 path,newurl=_fetch_gemini(url,**kwargs)
             elif scheme == "spartan":
                 path,newurl=_fetch_spartan(url,**kwargs)
             else:
-                print("scheme %s not implemented yet"%scheme)
+                print("scheme %s not implemented yet")
         except UserAbortException:
             return None, newurl
         except Exception as err:
-            cache = set_error(url, err)
+            cache = set_error(newurl, err)
             # Print an error message
             # we fail silently when sync_only
             if isinstance(err, socket.gaierror):
@@ -871,12 +921,14 @@ def main():
     
     descri="Netcache is a command-line tool to retrieve, cache and access networked content.\n\
             By default, netcache will returns a cached version of a given URL, downloading it \
-            only if not existing. A validity duration, in seconds, can also be given so that \
-            netcache downloads the content only if the existing cache is older than the validity."
+            only if a cache version doesn't exist. A validity duration, in seconds, can also \
+            be given so netcache downloads the content only if the existing cache is older than the validity."
     # Parse arguments
     parser = argparse.ArgumentParser(prog="netcache",description=descri)
     parser.add_argument("--path", action="store_true",
                         help="return path to the cache instead of the content of the cache")
+    parser.add_argument("--ids", action="store_true",
+                        help="return a list of id's for the gemini-site instead of the content of the cache")
     parser.add_argument("--offline", action="store_true",
                         help="Do not attempt to download, return cached version or error")
     parser.add_argument("--max-size", type=int,
@@ -898,11 +950,15 @@ def main():
     for u in args.url:
         if args.offline:
             path = get_cache_path(u)
+        elif args.ids:
+            ids = _get_site_ids(u)
         else:
             path,url = fetch(u,max_size=args.max_size,timeout=args.timeout,\
                                 validity=args.cache_validity)
         if args.path:
             print(path)
+        elif args.ids:
+            print(ids)
         else:
             with open(path,"r") as f:
                 print(f.read())
-- 
2.45.1
I’ve found the reason for the bug: the code managed to have a cached 
fingerprint without the certificate associated with that fingerprint.

As a rule of thumb, code should never assume a cache is present. I would 
like to discuss the code in netcache.py:516.

That code assumes that if there’s a fingerprint, the corresponding 
certificate is in the cache. As I’ve discovered, that is not 
guaranteed. It should check that the certificate really exists before 
anything else.
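A minimal sketch of the defensive check being suggested (the certcache 
directory and the "<fingerprint>.crt" naming follow the _validate_cert 
snippet quoted in the patch above; the helper name is hypothetical):

```python
import os

def cached_cert_path(certcache, fingerprint):
    # Return the path of the cached certificate for this fingerprint,
    # or None when the fingerprint is known but its file is missing.
    path = os.path.join(certcache, fingerprint + ".crt")
    return path if os.path.isfile(path) else None
```

The point is simply that every caller branches on None instead of 
assuming the file exists.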


PS: Also, I’m not a fan of the "for… else…" syntax; I find it hurts 
readability. But that’s probably a detail.
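For reference, the two styles compare like this (a toy example, 
unrelated to the patch itself):

```python
needle, haystack = "b", ["a", "b", "c"]

# With for... else: the else branch runs only if the loop
# finished without hitting a break.
for item in haystack:
    if item == needle:
        found = True
        break
else:
    found = False

# The same logic with an explicit flag, which some find easier to scan.
found_flag = False
for item in haystack:
    if item == needle:
        found_flag = True
        break

assert found == found_flag
```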

-- Ploum - Lionel Dricot
Blog: https://www.ploum.net
Livres: https://ploum.net/livres.html
I’ve rewritten the code so that:

1. We go through the cached certificates to extract the 
most_frequent_cert and to see if one matches the current one.

2. If we have no match but one valid most_frequent_cert, we run the 
"throws warning" code.

3. If there is no certificate directory or no valid cached certificate, 
we run the "First-Use" routine.


I believe this solves the bug and many others that were not found yet 
(such as having an empty cert_cache).
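The three steps above could be sketched like this (the per-host 
directory of "<fingerprint>.crt" files follows the patch; the function 
name and return values are made up for illustration, and the 
most_frequent_cert bookkeeping is left out):

```python
import os

def check_tofu(certdir, host, fingerprint):
    # 3. No certificate directory at all: "First-Use" routine.
    hostdir = os.path.join(certdir, host)
    if not os.path.isdir(hostdir):
        return "first-use"
    # 1. Go through the cached certificates, keeping only non-empty
    #    files, and look for one matching the current fingerprint.
    cached = [f[:-len(".crt")] for f in os.listdir(hostdir)
              if f.endswith(".crt")
              and os.path.getsize(os.path.join(hostdir, f)) > 0]
    # 3 (again). An empty or invalid cert cache also means first use.
    if not cached:
        return "first-use"
    if fingerprint in cached:
        return "match"
    # 2. Valid cached certificates exist but none matches: run the
    #    "throws warning" code.
    return "warn"
```

Treating a present-but-empty cache directory the same as a missing one 
is what avoids the empty cert_cache failure mentioned above.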