~lioploum/offpunk-devel

This thread contains a patchset. You're looking at the original emails, but you may wish to use the patch review UI. Review patch
4 2

[PATCH] Added support for client-certificates to netcache and the certs command to manage them.

Details
Message ID
<20240807000618.88755-2-bert@bertlivens.be>
DKIM signature
pass
Download raw message
Patch: +167 -17
Hi everyone,
I've been away for a while (exams, vacation) but I'm back.
Once again, this is a patch that tries to add support for client certificates to offpunk and provides the command to use them. It is based on the most recent version of Offpunk.

More information about what this tries to do and how it works can be found in this previous thread: https://lists.sr.ht/~lioploum/offpunk-devel/<bc5af397-9386-450d-bf2a-45f5cceccedd@bertlivens.be>
Don't hesitate to ask any questions you have.

~~ Bert Livens


Signed-off-by: Bert Livens <bert@bertlivens.be>
---
 CHANGELOG   |   2 +
 netcache.py | 145 ++++++++++++++++++++++++++++++++++++++++++++++------
 offpunk.py  |  37 ++++++++++++++
 3 files changed, 167 insertions(+), 17 deletions(-)
 mode change 100755 => 100644 netcache.py

diff --git a/CHANGELOG b/CHANGELOG
index e24ef8e..1ee9dfd 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -3,6 +3,8 @@
## 2.4 - Unreleased
- Deprecation warning if using Chafa < 1.10
- "open" now accept integer as parameters to open links (suggested by Matthieu Rakotojaona)
- netcache: use client-certificate when going to a url like gemini://username@site.net (by Bert Livens)
- offpunk/netcache: added the "cert" command to list and create client certificates (Bert Livens)

## 2.3 - June 29th 2024
- Wayland clipboard support through wl-clipboard (new suggested dependency)
diff --git a/netcache.py b/netcache.py
old mode 100755
new mode 100644
index 38f001e..d775472
--- a/netcache.py
+++ b/netcache.py
@@ -24,6 +24,9 @@ except ModuleNotFoundError:
try:
    from cryptography import x509
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.hazmat.primitives import serialization
    _HAS_CRYPTOGRAPHY = True
    _BACKEND = default_backend()
except(ModuleNotFoundError,ImportError):
@@ -589,12 +592,101 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
        with open(os.path.join(certcache, fingerprint+".crt"), "wb") as fp:
            fp.write(cert)

def _get_client_certkey(site_id: str, host: str):
    # returns {cert: str, key: str}
    certdir = os.path.join(xdg("data"), "certs", host)
    certf = os.path.join(certdir, "%s.cert" % site_id)
    keyf = os.path.join(certdir, "%s.key" % site_id)
    if not os.path.exists(certf) or not os.path.exists(keyf):
        if host != "":
            split = host.split(".")
            #if len(split) > 2:  # Why not allow a global identity? Maybe I want
                                 # to login to all sites with the same
                                 # certificate.
            return _get_client_certkey(site_id, ".".join(split[1:]))
        return None
    certkey = dict(cert=certf, key=keyf)
    return certkey

def _get_site_ids(url: str):
    newurl = normalize_url(url)
    u = urllib.parse.urlparse(newurl)
    if u.scheme == "gemini" and u.username == None:
        certdir = os.path.join(xdg("data"), "certs")
        netloc_parts = u.netloc.split(".")
        site_ids = []

        for i in range(len(netloc_parts), 0, -1):
            lasti = ".".join(netloc_parts[-i:])
            direc = os.path.join(certdir, lasti)

            for certfile in glob.glob(os.path.join(direc, "*.cert")):
                site_id = certfile.split('/')[-1].split(".")[-2]
                site_ids.append(site_id)
        return site_ids
    else:
        return []

def create_certificate(name: str, days: int, hostname: str):
    key = rsa.generate_private_key(
            public_exponent  = 65537,
            key_size = 2048)
    sitecertdir = os.path.join(xdg("data"), "certs", hostname)
    keyfile = os.path.join(sitecertdir, name+".key")
    # create the directory of it doesn't exist
    os.makedirs(sitecertdir, exist_ok=True)
    with open(keyfile, "wb") as f:
        f.write(key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        ))
    xname = x509.Name([
        x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name),
        ])
    # generate the cert, valid a week ago (timekeeping is hard, let's give it a
    # little margin). issuer and subject are your name
    cert = (x509.CertificateBuilder()
            .subject_name(xname)
            .issuer_name(xname)
            .public_key(key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(datetime.datetime.utcnow() -
                              datetime.timedelta(days=7))
            .not_valid_after(datetime.datetime.utcnow() +
                             datetime.timedelta(days=days))
            .sign(key, hashes.SHA256())
            )
    certfile = os.path.join(sitecertdir, name + ".cert")
    with open(certfile, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))

def get_certs(url: str):
    u = urllib.parse.urlparse(normalize_url(url))
    if u.scheme == "gemini":
        certdir = os.path.join(xdg("data"), "certs")
        netloc_parts = u.netloc.split(".")
        site_ids = []
        if '@' in netloc_parts[0]:
            netloc_parts[0] = netloc_parts[0].split('@')[1]

        for i in range(len(netloc_parts), 0, -1):
            lasti = ".".join(netloc_parts[-i:])
            direc = os.path.join(certdir, lasti)
            for certfile in glob.glob(os.path.join(direc, "*.cert")):
                site_id = certfile.split('/')[-1].split(".")[-2]
                site_ids.append(site_id)
        return site_ids
    else:
        return []

def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\
                    **kwargs):
    cache = None
    newurl = url
    url_parts = urllib.parse.urlparse(url)
    host = url_parts.hostname
    site_id = url_parts.username
    port = url_parts.port or standard_ports["gemini"]
    path = url_parts.path or "/"
    query = url_parts.query
@@ -622,8 +714,16 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
    # Prepare TLS context
    protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
    context = ssl.SSLContext(protocol)
    context.check_hostname=False
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    # When using an identity, use the certificate and key
    if site_id:
        certkey = _get_client_certkey(site_id, host)
        if certkey:
            context.load_cert_chain(certkey["cert"], certkey["key"])
        else:
            print("This identity doesn't exist for this site (or is disabled).")
    # Impose minimum TLS version
    ## In 3.7 and above, this is easy...
    if sys.version_info.minor >= 7:
@@ -665,15 +765,21 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
    _validate_cert(address[4][0], host, cert,automatic_choice="y")
    # Send request and wrap response in a file descriptor
    url = urllib.parse.urlparse(url)
    new_netloc = host
    new_host = host
    #Handle IPV6 hostname
    if ":" in new_netloc:
        new_netloc = "[" + new_netloc + "]"
    if ":" in new_host:
        new_host = "[" + new_host + "]"
    if port != standard_ports["gemini"]:
        new_netloc += ":" + str(port)
    url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
    s.sendall((url + CRLF).encode("UTF-8"))
    f= s.makefile(mode = "rb")
        new_host += ":" + str(port)
    url_no_username = urllib.parse.urlunparse(url._replace(netloc=new_host))

    if site_id:
        url = urllib.parse.urlunparse(url._replace(netloc=site_id+"@"+new_host))
    else:
        url = url_no_username

    s.sendall((url_no_username + CRLF).encode("UTF-8"))
    f = s.makefile(mode = "rb")
    ## end of send_request in AV98
    # Spec dictates <META> should not exceed 1024 bytes,
    # so maximum valid header length is 1027 bytes.
@@ -744,8 +850,7 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
        raise RuntimeError(meta)
    # Client cert
    elif status.startswith("6"):
        error = "Handling certificates for status 6X are not supported by offpunk\n"
        error += "See bug #31 for discussion about the problem"
        error = "You need to provide a client-certificate to access this page."
        raise RuntimeError(error)
    # Invalid status
    elif not status.startswith("2"):
@@ -785,7 +890,7 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
    newurl = url
    path=None
    print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
    #Firt, we look if we have a valid cache, even if offline
    #First, we look if we have a valid cache, even if offline
    #If we are offline, any cache is better than nothing
    if is_cache_valid(url,validity=validity) or (offline and is_cache_valid(url,validity=0)):
        path = get_cache_path(url)
@@ -803,13 +908,13 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
                path = None
            elif scheme in ("http","https"):
                if _DO_HTTP:
                    path=_fetch_http(url,**kwargs)
                    path=_fetch_http(newurl,**kwargs)
                else:
                    print("HTTP requires python-requests")
            elif scheme == "gopher":
                path=_fetch_gopher(url,**kwargs)
                path=_fetch_gopher(newurl,**kwargs)
            elif scheme == "finger":
                path=_fetch_finger(url,**kwargs)
                path=_fetch_finger(newurl,**kwargs)
            elif scheme == "gemini":
                path,newurl=_fetch_gemini(url,**kwargs)
            elif scheme == "spartan":
@@ -819,7 +924,7 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
        except UserAbortException:
            return None, newurl
        except Exception as err:
            cache = set_error(url, err)
            cache = set_error(newurl, err)
            # Print an error message
            # we fail silently when sync_only
            if isinstance(err, socket.gaierror):
@@ -881,12 +986,14 @@ def main():
    
    descri="Netcache is a command-line tool to retrieve, cache and access networked content.\n\
            By default, netcache will returns a cached version of a given URL, downloading it \
            only if not existing. A validity duration, in seconds, can also be given so that \
            netcache downloads the content only if the existing cache is older than the validity."
            only if a cache version doesn't exist. A validity duration, in seconds, can also \
            be given so netcache downloads the content only if the existing cache is older than the validity."
    # Parse arguments
    parser = argparse.ArgumentParser(prog="netcache",description=descri)
    parser.add_argument("--path", action="store_true",
                        help="return path to the cache instead of the content of the cache")
    parser.add_argument("--ids", action="store_true",
                        help="return a list of id’s for the gemini-site instead of the content of the cache")
    parser.add_argument("--offline", action="store_true",
                        help="Do not attempt to download, return cached version or error")
    parser.add_argument("--max-size", type=int,
@@ -908,11 +1015,15 @@ def main():
    for u in args.url:
        if args.offline:
            path = get_cache_path(u)
        elif args.ids:
            ids = _get_site_ids(u)
        else:
            path,url = fetch(u,max_size=args.max_size,timeout=args.timeout,\
                                validity=args.cache_validity)
        if args.path:
            print(path)
        elif args.ids:
            print(ids)
        else:
            with open(path,"r") as f:
                print(f.read())
diff --git a/offpunk.py b/offpunk.py
index 8235566..9f9af1f 100755
--- a/offpunk.py
+++ b/offpunk.py
@@ -89,6 +89,7 @@ _ABBREVS = {
    "bb":   "blackbox",
    "bm":   "bookmarks",
    "book": "bookmarks",
    "cert": "certs",
    "cp":   "copy",
    "f":   "forward",
    "g":    "go",
@@ -815,6 +816,42 @@ Current tour can be listed with `tour ls` and scrubbed with `tour clear`."""
                except IndexError:
                    print("Invalid index %d, skipping." % n)

    @needs_gi
    def do_certs(self, line) -> None:
        """Manage your client certificates (identities) for a site.
        `certs` will display all valid certificates for the current site
        `certs new <name> <days-valid> <url[optional]>` will create a new certificate, if no url is specified, the current open site will be used.
        """
        line = line.strip()
        if not line:
            certs = netcache.get_certs(self.current_url)
            if len(certs) == 0:
                print("There are no certificates available for this site.")
            else:
                if len(certs) == 1:
                    print("The one available certificate for this site is:")
                else:
                    print("The", len(certs) ,"available certificates for this site are:")

                print(*certs)
                print("Use the 'id@site.net' notation to activate a certificate.")
        else:
            lineparts = line.split(' ')
            if lineparts[0] == 'new':
                if len(lineparts) == 4:
                    name = lineparts[1]
                    days = lineparts[2]
                    site = lineparts[3]
                    netcache.create_certificate(name, int(days), site)
                elif len(lineparts) == 3:
                    name = lineparts[1]
                    days = lineparts[2]
                    site = urllib.parse.urlparse(self.current_url)
                    netcache.create_certificate(name, int(days), site.hostname)

                else:
                    print("usage")

    @needs_gi
    def do_mark(self, line):
        """Mark the current item with a single letter.  This letter can then
-- 
2.46.0
Details
Message ID
<172302467198.6.5165820440470505680.399544226@ploum.eu>
In-Reply-To
<20240807000618.88755-2-bert@bertlivens.be> (view parent)
DKIM signature
pass
Download raw message
On 24 aoû 07 02:06, Bert Livens wrote:
>Hi everyone,
>I've been away for a while (exams, vacation) but I'm back.
>Once again, this is a patch that tries to add support for client certificates to offpunk and provides the command to use them. It is based on the most recent version of Offpunk.
>
>More information about what this tries to do and how it works can be found in this previous thread: https://lists.sr.ht/~lioploum/offpunk-devel/<bc5af397-9386-450d-bf2a-45f5cceccedd@bertlivens.be>
>Don't hesitate to ask any questions you have.


Hi Bert,

Very happy to see you back. I still don’t understand why this patch 
doesn’t apply. Everything seems fine. The rejection appears to be in 
hunk #9 but I don’t see any problem.

I copy/paste .rej file. If you can’t spot what’s wrong, maybe could you 
resend your patch without this part so the bulk of the work can be 
applied cleanly. 

You could then submit a small patch to add the "--ids" parameter (or I 
could add it myself manually).


If anybody reading this has any way to see why the patch is failing, I 
would be very curious. It seems that I’ve hit the ceiling of my weak git 
knowledge here.


The .rej file:


diff a/netcache.py b/netcache.py	(rejected hunks)
@@ -881,12 +986,14 @@ def main():

      descri="Netcache is a command-line tool to retrieve, cache and access networked content.\n\
              By default, netcache will returns a cached version of a given URL, downloading it \
-            only if not existing. A validity duration, in seconds, can also be given so that \
-            netcache downloads the content only if the existing cache is older than the validity."
+            only if a cache version doesn't exist. A validity duration, in seconds, can also \
+            be given so netcache downloads the content only if the existing cache is older than the validity."
      # Parse arguments
      parser = argparse.ArgumentParser(prog="netcache",description=descri)
      parser.add_argument("--path", action="store_true",
                          help="return path to the cache instead of the content of the cache")
+    parser.add_argument("--ids", action="store_true",
+                        help="return a list of id’s for the gemini-site instead of the content of the cache")
      parser.add_argument("--offline", action="store_true",
                          help="Do not attempt to download, return cached version or error")
      parser.add_argument("--max-size", type=int,

>
>~~ Bert Livens
>
>
>Signed-off-by: Bert Livens <bert@bertlivens.be>
>---
> CHANGELOG   |   2 +
> netcache.py | 145 ++++++++++++++++++++++++++++++++++++++++++++++------
> offpunk.py  |  37 ++++++++++++++
> 3 files changed, 167 insertions(+), 17 deletions(-)
> mode change 100755 => 100644 netcache.py
>
>diff --git a/CHANGELOG b/CHANGELOG
>index e24ef8e..1ee9dfd 100644
>--- a/CHANGELOG
>+++ b/CHANGELOG
>@@ -3,6 +3,8 @@
> ## 2.4 - Unreleased
> - Deprecation warning if using Chafa < 1.10
> - "open" now accept integer as parameters to open links (suggested by Matthieu Rakotojaona)
>+- netcache: use client-certificate when going to a url like gemini://username@site.net (by Bert Livens)
>+- offpunk/netcache: added the "cert" command to list and create client certificates (Bert Livens)
>
> ## 2.3 - June 29th 2024
> - Wayland clipboard support through wl-clipboard (new suggested dependency)
>diff --git a/netcache.py b/netcache.py
>old mode 100755
>new mode 100644
>index 38f001e..d775472
>--- a/netcache.py
>+++ b/netcache.py
>@@ -24,6 +24,9 @@ except ModuleNotFoundError:
> try:
>     from cryptography import x509
>     from cryptography.hazmat.backends import default_backend
>+    from cryptography.hazmat.primitives import hashes
>+    from cryptography.hazmat.primitives.asymmetric import rsa
>+    from cryptography.hazmat.primitives import serialization
>     _HAS_CRYPTOGRAPHY = True
>     _BACKEND = default_backend()
> except(ModuleNotFoundError,ImportError):
>@@ -589,12 +592,101 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
>         with open(os.path.join(certcache, fingerprint+".crt"), "wb") as fp:
>             fp.write(cert)
>
>+def _get_client_certkey(site_id: str, host: str):
>+    # returns {cert: str, key: str}
>+    certdir = os.path.join(xdg("data"), "certs", host)
>+    certf = os.path.join(certdir, "%s.cert" % site_id)
>+    keyf = os.path.join(certdir, "%s.key" % site_id)
>+    if not os.path.exists(certf) or not os.path.exists(keyf):
>+        if host != "":
>+            split = host.split(".")
>+            #if len(split) > 2:  # Why not allow a global identity? Maybe I want
>+                                 # to login to all sites with the same
>+                                 # certificate.
>+            return _get_client_certkey(site_id, ".".join(split[1:]))
>+        return None
>+    certkey = dict(cert=certf, key=keyf)
>+    return certkey
>+
>+def _get_site_ids(url: str):
>+    newurl = normalize_url(url)
>+    u = urllib.parse.urlparse(newurl)
>+    if u.scheme == "gemini" and u.username == None:
>+        certdir = os.path.join(xdg("data"), "certs")
>+        netloc_parts = u.netloc.split(".")
>+        site_ids = []
>+
>+        for i in range(len(netloc_parts), 0, -1):
>+            lasti = ".".join(netloc_parts[-i:])
>+            direc = os.path.join(certdir, lasti)
>+
>+            for certfile in glob.glob(os.path.join(direc, "*.cert")):
>+                site_id = certfile.split('/')[-1].split(".")[-2]
>+                site_ids.append(site_id)
>+        return site_ids
>+    else:
>+        return []
>+
>+def create_certificate(name: str, days: int, hostname: str):
>+    key = rsa.generate_private_key(
>+            public_exponent  = 65537,
>+            key_size = 2048)
>+    sitecertdir = os.path.join(xdg("data"), "certs", hostname)
>+    keyfile = os.path.join(sitecertdir, name+".key")
>+    # create the directory of it doesn't exist
>+    os.makedirs(sitecertdir, exist_ok=True)
>+    with open(keyfile, "wb") as f:
>+        f.write(key.private_bytes(
>+            encoding=serialization.Encoding.PEM,
>+            format=serialization.PrivateFormat.TraditionalOpenSSL,
>+            encryption_algorithm=serialization.NoEncryption()
>+        ))
>+    xname = x509.Name([
>+        x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name),
>+        ])
>+    # generate the cert, valid a week ago (timekeeping is hard, let's give it a
>+    # little margin). issuer and subject are your name
>+    cert = (x509.CertificateBuilder()
>+            .subject_name(xname)
>+            .issuer_name(xname)
>+            .public_key(key.public_key())
>+            .serial_number(x509.random_serial_number())
>+            .not_valid_before(datetime.datetime.utcnow() -
>+                              datetime.timedelta(days=7))
>+            .not_valid_after(datetime.datetime.utcnow() +
>+                             datetime.timedelta(days=days))
>+            .sign(key, hashes.SHA256())
>+            )
>+    certfile = os.path.join(sitecertdir, name + ".cert")
>+    with open(certfile, "wb") as f:
>+        f.write(cert.public_bytes(serialization.Encoding.PEM))
>+
>+def get_certs(url: str):
>+    u = urllib.parse.urlparse(normalize_url(url))
>+    if u.scheme == "gemini":
>+        certdir = os.path.join(xdg("data"), "certs")
>+        netloc_parts = u.netloc.split(".")
>+        site_ids = []
>+        if '@' in netloc_parts[0]:
>+            netloc_parts[0] = netloc_parts[0].split('@')[1]
>+
>+        for i in range(len(netloc_parts), 0, -1):
>+            lasti = ".".join(netloc_parts[-i:])
>+            direc = os.path.join(certdir, lasti)
>+            for certfile in glob.glob(os.path.join(direc, "*.cert")):
>+                site_id = certfile.split('/')[-1].split(".")[-2]
>+                site_ids.append(site_id)
>+        return site_ids
>+    else:
>+        return []
>+
> def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\
>                     **kwargs):
>     cache = None
>     newurl = url
>     url_parts = urllib.parse.urlparse(url)
>     host = url_parts.hostname
>+    site_id = url_parts.username
>     port = url_parts.port or standard_ports["gemini"]
>     path = url_parts.path or "/"
>     query = url_parts.query
>@@ -622,8 +714,16 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
>     # Prepare TLS context
>     protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
>     context = ssl.SSLContext(protocol)
>-    context.check_hostname=False
>+    context.check_hostname = False
>     context.verify_mode = ssl.CERT_NONE
>+
>+    # When using an identity, use the certificate and key
>+    if site_id:
>+        certkey = _get_client_certkey(site_id, host)
>+        if certkey:
>+            context.load_cert_chain(certkey["cert"], certkey["key"])
>+        else:
>+            print("This identity doesn't exist for this site (or is disabled).")
>     # Impose minimum TLS version
>     ## In 3.7 and above, this is easy...
>     if sys.version_info.minor >= 7:
>@@ -665,15 +765,21 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
>     _validate_cert(address[4][0], host, cert,automatic_choice="y")
>     # Send request and wrap response in a file descriptor
>     url = urllib.parse.urlparse(url)
>-    new_netloc = host
>+    new_host = host
>     #Handle IPV6 hostname
>-    if ":" in new_netloc:
>-        new_netloc = "[" + new_netloc + "]"
>+    if ":" in new_host:
>+        new_host = "[" + new_host + "]"
>     if port != standard_ports["gemini"]:
>-        new_netloc += ":" + str(port)
>-    url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
>-    s.sendall((url + CRLF).encode("UTF-8"))
>-    f= s.makefile(mode = "rb")
>+        new_host += ":" + str(port)
>+    url_no_username = urllib.parse.urlunparse(url._replace(netloc=new_host))
>+
>+    if site_id:
>+        url = urllib.parse.urlunparse(url._replace(netloc=site_id+"@"+new_host))
>+    else:
>+        url = url_no_username
>+
>+    s.sendall((url_no_username + CRLF).encode("UTF-8"))
>+    f = s.makefile(mode = "rb")
>     ## end of send_request in AV98
>     # Spec dictates <META> should not exceed 1024 bytes,
>     # so maximum valid header length is 1027 bytes.
>@@ -744,8 +850,7 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
>         raise RuntimeError(meta)
>     # Client cert
>     elif status.startswith("6"):
>-        error = "Handling certificates for status 6X are not supported by offpunk\n"
>-        error += "See bug #31 for discussion about the problem"
>+        error = "You need to provide a client-certificate to access this page."
>         raise RuntimeError(error)
>     # Invalid status
>     elif not status.startswith("2"):
>@@ -785,7 +890,7 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
>     newurl = url
>     path=None
>     print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
>-    #Firt, we look if we have a valid cache, even if offline
>+    #First, we look if we have a valid cache, even if offline
>     #If we are offline, any cache is better than nothing
>     if is_cache_valid(url,validity=validity) or (offline and is_cache_valid(url,validity=0)):
>         path = get_cache_path(url)
>@@ -803,13 +908,13 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
>                 path = None
>             elif scheme in ("http","https"):
>                 if _DO_HTTP:
>-                    path=_fetch_http(url,**kwargs)
>+                    path=_fetch_http(newurl,**kwargs)
>                 else:
>                     print("HTTP requires python-requests")
>             elif scheme == "gopher":
>-                path=_fetch_gopher(url,**kwargs)
>+                path=_fetch_gopher(newurl,**kwargs)
>             elif scheme == "finger":
>-                path=_fetch_finger(url,**kwargs)
>+                path=_fetch_finger(newurl,**kwargs)
>             elif scheme == "gemini":
>                 path,newurl=_fetch_gemini(url,**kwargs)
>             elif scheme == "spartan":
>@@ -819,7 +924,7 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
>         except UserAbortException:
>             return None, newurl
>         except Exception as err:
>-            cache = set_error(url, err)
>+            cache = set_error(newurl, err)
>             # Print an error message
>             # we fail silently when sync_only
>             if isinstance(err, socket.gaierror):
>@@ -881,12 +986,14 @@ def main():
>
>     descri="Netcache is a command-line tool to retrieve, cache and access networked content.\n\
>             By default, netcache will returns a cached version of a given URL, downloading it \
>-            only if not existing. A validity duration, in seconds, can also be given so that \
>-            netcache downloads the content only if the existing cache is older than the validity."
>+            only if a cache version doesn't exist. A validity duration, in seconds, can also \
>+            be given so netcache downloads the content only if the existing cache is older than the validity."
>     # Parse arguments
>     parser = argparse.ArgumentParser(prog="netcache",description=descri)
>     parser.add_argument("--path", action="store_true",
>                         help="return path to the cache instead of the content of the cache")
>+    parser.add_argument("--ids", action="store_true",
>+                        help="return a list of id’s for the gemini-site instead of the content of the cache")
>     parser.add_argument("--offline", action="store_true",
>                         help="Do not attempt to download, return cached version or error")
>     parser.add_argument("--max-size", type=int,
>@@ -908,11 +1015,15 @@ def main():
>     for u in args.url:
>         if args.offline:
>             path = get_cache_path(u)
>+        elif args.ids:
>+            ids = _get_site_ids(u)
>         else:
>             path,url = fetch(u,max_size=args.max_size,timeout=args.timeout,\
>                                 validity=args.cache_validity)
>         if args.path:
>             print(path)
>+        elif args.ids:
>+            print(ids)
>         else:
>             with open(path,"r") as f:
>                 print(f.read())
>diff --git a/offpunk.py b/offpunk.py
>index 8235566..9f9af1f 100755
>--- a/offpunk.py
>+++ b/offpunk.py
>@@ -89,6 +89,7 @@ _ABBREVS = {
>     "bb":   "blackbox",
>     "bm":   "bookmarks",
>     "book": "bookmarks",
>+    "cert": "certs",
>     "cp":   "copy",
>     "f":   "forward",
>     "g":    "go",
>@@ -815,6 +816,42 @@ Current tour can be listed with `tour ls` and scrubbed with `tour clear`."""
>                 except IndexError:
>                     print("Invalid index %d, skipping." % n)
>
>+    @needs_gi
>+    def do_certs(self, line) -> None:
>+        """Manage your client certificates (identities) for a site.
>+        `certs` will display all valid certificates for the current site
>+        `certs new <name> <days-valid> <url[optional]>` will create a new certificate, if no url is specified, the current open site will be used.
>+        """
>+        line = line.strip()
>+        if not line:
>+            certs = netcache.get_certs(self.current_url)
>+            if len(certs) == 0:
>+                print("There are no certificates available for this site.")
>+            else:
>+                if len(certs) == 1:
>+                    print("The one available certificate for this site is:")
>+                else:
>+                    print("The", len(certs) ,"available certificates for this site are:")
>+
>+                print(*certs)
>+                print("Use the 'id@site.net' notation to activate a certificate.")
>+        else:
>+            lineparts = line.split(' ')
>+            if lineparts[0] == 'new':
>+                if len(lineparts) == 4:
>+                    name = lineparts[1]
>+                    days = lineparts[2]
>+                    site = lineparts[3]
>+                    netcache.create_certificate(name, int(days), site)
>+                elif len(lineparts) == 3:
>+                    name = lineparts[1]
>+                    days = lineparts[2]
>+                    site = urllib.parse.urlparse(self.current_url)
>+                    netcache.create_certificate(name, int(days), site.hostname)
>+
>+                else:
>+                    print("usage")
>+
>     @needs_gi
>     def do_mark(self, line):
>         """Mark the current item with a single letter.  This letter can then
>--
>2.46.0
>
>

-- 
Ploum - Lionel Dricot
Blog: https://www.ploum.net
Livres: https://ploum.net/livres.html

[PATCH v2] Added support for client-certificates to netcache and the certs command to manage them.

Details
Message ID
<20240807230158.88593-1-bert@bertlivens.be>
In-Reply-To
<172302467198.6.5165820440470505680.399544226@ploum.eu> (view parent)
DKIM signature
pass
Download raw message
Patch: +163 -16
This is part one, it doesn't define the flags
 for netcache.py yet so the command won't work.

Signed-off-by: Bert Livens <bert@bertlivens.be>
---
 CHANGELOG   |   2 +
 netcache.py | 140 ++++++++++++++++++++++++++++++++++++++++++++++------
 offpunk.py  |  37 ++++++++++++++
 3 files changed, 163 insertions(+), 16 deletions(-)
 mode change 100755 => 100644 netcache.py

diff --git a/CHANGELOG b/CHANGELOG
index e24ef8e..1ee9dfd 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -3,6 +3,8 @@
## 2.4 - Unreleased
- Deprecation warning if using Chafa < 1.10
- "open" now accept integer as parameters to open links (suggested by Matthieu Rakotojaona)
- netcache: use client-certificate when going to a url like gemini://username@site.net (by Bert Livens)
- offpunk/netcache: added the "cert" command to list and create client certificates (Bert Livens)

## 2.3 - June 29th 2024
- Wayland clipboard support through wl-clipboard (new suggested dependency)
diff --git a/netcache.py b/netcache.py
old mode 100755
new mode 100644
index 38f001e..7640f69
--- a/netcache.py
+++ b/netcache.py
@@ -24,6 +24,9 @@ except ModuleNotFoundError:
try:
    from cryptography import x509
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.hazmat.primitives import serialization
    _HAS_CRYPTOGRAPHY = True
    _BACKEND = default_backend()
except(ModuleNotFoundError,ImportError):
@@ -589,12 +592,101 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
        with open(os.path.join(certcache, fingerprint+".crt"), "wb") as fp:
            fp.write(cert)

def _get_client_certkey(site_id: str, host: str):
    # returns {cert: str, key: str}
    certdir = os.path.join(xdg("data"), "certs", host)
    certf = os.path.join(certdir, "%s.cert" % site_id)
    keyf = os.path.join(certdir, "%s.key" % site_id)
    if not os.path.exists(certf) or not os.path.exists(keyf):
        if host != "":
            split = host.split(".")
            #if len(split) > 2:  # Why not allow a global identity? Maybe I want
                                 # to login to all sites with the same
                                 # certificate.
            return _get_client_certkey(site_id, ".".join(split[1:]))
        return None
    certkey = dict(cert=certf, key=keyf)
    return certkey

def _get_site_ids(url: str):
    newurl = normalize_url(url)
    u = urllib.parse.urlparse(newurl)
    if u.scheme == "gemini" and u.username == None:
        certdir = os.path.join(xdg("data"), "certs")
        netloc_parts = u.netloc.split(".")
        site_ids = []

        for i in range(len(netloc_parts), 0, -1):
            lasti = ".".join(netloc_parts[-i:])
            direc = os.path.join(certdir, lasti)

            for certfile in glob.glob(os.path.join(direc, "*.cert")):
                site_id = certfile.split('/')[-1].split(".")[-2]
                site_ids.append(site_id)
        return site_ids
    else:
        return []

def create_certificate(name: str, days: int, hostname: str):
    key = rsa.generate_private_key(
            public_exponent  = 65537,
            key_size = 2048)
    sitecertdir = os.path.join(xdg("data"), "certs", hostname)
    keyfile = os.path.join(sitecertdir, name+".key")
    # create the directory if it doesn't exist
    os.makedirs(sitecertdir, exist_ok=True)
    with open(keyfile, "wb") as f:
        f.write(key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        ))
    xname = x509.Name([
        x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name),
        ])
    # generate the cert, valid a week ago (timekeeping is hard, let's give it a
    # little margin). issuer and subject are your name
    cert = (x509.CertificateBuilder()
            .subject_name(xname)
            .issuer_name(xname)
            .public_key(key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(datetime.datetime.utcnow() -
                              datetime.timedelta(days=7))
            .not_valid_after(datetime.datetime.utcnow() +
                             datetime.timedelta(days=days))
            .sign(key, hashes.SHA256())
            )
    certfile = os.path.join(sitecertdir, name + ".cert")
    with open(certfile, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))

def get_certs(url: str):
    u = urllib.parse.urlparse(normalize_url(url))
    if u.scheme == "gemini":
        certdir = os.path.join(xdg("data"), "certs")
        netloc_parts = u.netloc.split(".")
        site_ids = []
        if '@' in netloc_parts[0]:
            netloc_parts[0] = netloc_parts[0].split('@')[1]

        for i in range(len(netloc_parts), 0, -1):
            lasti = ".".join(netloc_parts[-i:])
            direc = os.path.join(certdir, lasti)
            for certfile in glob.glob(os.path.join(direc, "*.cert")):
                site_id = certfile.split('/')[-1].split(".")[-2]
                site_ids.append(site_id)
        return site_ids
    else:
        return []

def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\
                    **kwargs):
    cache = None
    newurl = url
    url_parts = urllib.parse.urlparse(url)
    host = url_parts.hostname
    site_id = url_parts.username
    port = url_parts.port or standard_ports["gemini"]
    path = url_parts.path or "/"
    query = url_parts.query
@@ -622,8 +714,16 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
    # Prepare TLS context
    protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
    context = ssl.SSLContext(protocol)
    context.check_hostname=False
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    # When using an identity, use the certificate and key
    if site_id:
        certkey = _get_client_certkey(site_id, host)
        if certkey:
            context.load_cert_chain(certkey["cert"], certkey["key"])
        else:
            print("This identity doesn't exist for this site (or is disabled).")
    # Impose minimum TLS version
    ## In 3.7 and above, this is easy...
    if sys.version_info.minor >= 7:
@@ -665,15 +765,21 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
    _validate_cert(address[4][0], host, cert,automatic_choice="y")
    # Send request and wrap response in a file descriptor
    url = urllib.parse.urlparse(url)
    new_netloc = host
    new_host = host
    #Handle IPV6 hostname
    if ":" in new_netloc:
        new_netloc = "[" + new_netloc + "]"
    if ":" in new_host:
        new_host = "[" + new_host + "]"
    if port != standard_ports["gemini"]:
        new_netloc += ":" + str(port)
    url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
    s.sendall((url + CRLF).encode("UTF-8"))
    f= s.makefile(mode = "rb")
        new_host += ":" + str(port)
    url_no_username = urllib.parse.urlunparse(url._replace(netloc=new_host))

    if site_id:
        url = urllib.parse.urlunparse(url._replace(netloc=site_id+"@"+new_host))
    else:
        url = url_no_username

    s.sendall((url_no_username + CRLF).encode("UTF-8"))
    f = s.makefile(mode = "rb")
    ## end of send_request in AV98
    # Spec dictates <META> should not exceed 1024 bytes,
    # so maximum valid header length is 1027 bytes.
@@ -744,8 +850,7 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
        raise RuntimeError(meta)
    # Client cert
    elif status.startswith("6"):
        error = "Handling certificates for status 6X are not supported by offpunk\n"
        error += "See bug #31 for discussion about the problem"
        error = "You need to provide a client-certificate to access this page."
        raise RuntimeError(error)
    # Invalid status
    elif not status.startswith("2"):
@@ -785,7 +890,7 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
    newurl = url
    path=None
    print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
    #Firt, we look if we have a valid cache, even if offline
    #First, we look if we have a valid cache, even if offline
    #If we are offline, any cache is better than nothing
    if is_cache_valid(url,validity=validity) or (offline and is_cache_valid(url,validity=0)):
        path = get_cache_path(url)
@@ -803,13 +908,13 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
                path = None
            elif scheme in ("http","https"):
                if _DO_HTTP:
                    path=_fetch_http(url,**kwargs)
                    path=_fetch_http(newurl,**kwargs)
                else:
                    print("HTTP requires python-requests")
            elif scheme == "gopher":
                path=_fetch_gopher(url,**kwargs)
                path=_fetch_gopher(newurl,**kwargs)
            elif scheme == "finger":
                path=_fetch_finger(url,**kwargs)
                path=_fetch_finger(newurl,**kwargs)
            elif scheme == "gemini":
                path,newurl=_fetch_gemini(url,**kwargs)
            elif scheme == "spartan":
@@ -819,7 +924,7 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
        except UserAbortException:
            return None, newurl
        except Exception as err:
            cache = set_error(url, err)
            cache = set_error(newurl, err)
            # Print an error message
            # we fail silently when sync_only
            if isinstance(err, socket.gaierror):
@@ -902,17 +1007,20 @@ def main():
    # --validity : returns the date of the cached version, Null if no version
    # --force-download : download and replace cache, even if valid
    args = parser.parse_args()

    param = {}
    
    for u in args.url:
        if args.offline:
            path = get_cache_path(u)
        elif args.ids:
            ids = _get_site_ids(u)
        else:
            path,url = fetch(u,max_size=args.max_size,timeout=args.timeout,\
                                validity=args.cache_validity)
        if args.path:
            print(path)
        elif args.ids:
            print(ids)
        else:
            with open(path,"r") as f:
                print(f.read())
diff --git a/offpunk.py b/offpunk.py
index 8235566..9f9af1f 100755
--- a/offpunk.py
+++ b/offpunk.py
@@ -89,6 +89,7 @@ _ABBREVS = {
    "bb":   "blackbox",
    "bm":   "bookmarks",
    "book": "bookmarks",
    "cert": "certs",
    "cp":   "copy",
    "f":   "forward",
    "g":    "go",
@@ -815,6 +816,42 @@ Current tour can be listed with `tour ls` and scrubbed with `tour clear`."""
                except IndexError:
                    print("Invalid index %d, skipping." % n)

    @needs_gi
    def do_certs(self, line) -> None:
        """Manage your client certificates (identities) for a site.
        `certs` will display all valid certificates for the current site
        `certs new <name> <days-valid> <url[optional]>` will create a new certificate, if no url is specified, the current open site will be used.
        """
        line = line.strip()
        if not line:
            certs = netcache.get_certs(self.current_url)
            if len(certs) == 0:
                print("There are no certificates available for this site.")
            else:
                if len(certs) == 1:
                    print("The one available certificate for this site is:")
                else:
                    print("The", len(certs) ,"available certificates for this site are:")

                print(*certs)
                print("Use the 'id@site.net' notation to activate a certificate.")
        else:
            lineparts = line.split(' ')
            if lineparts[0] == 'new':
                if len(lineparts) == 4:
                    name = lineparts[1]
                    days = lineparts[2]
                    site = lineparts[3]
                    netcache.create_certificate(name, int(days), site)
                elif len(lineparts) == 3:
                    name = lineparts[1]
                    days = lineparts[2]
                    site = urllib.parse.urlparse(self.current_url)
                    netcache.create_certificate(name, int(days), site.hostname)

                else:
                    print("usage")

    @needs_gi
    def do_mark(self, line):
        """Mark the current item with a single letter.  This letter can then
-- 
2.46.0

[PATCH] Part two of the previous commit, adds the --ids option to netcache.

Details
Message ID
<20240807232404.90278-2-bert@bertlivens.be>
In-Reply-To
<20240807230158.88593-1-bert@bertlivens.be> (view parent)
DKIM signature
pass
Download raw message
Patch: +5 -3
So, this is the part that doesn't want to apply. In the hope that it is the problem, I replaced a directional (curly) apostrophe ' with a straight '.
If that doesn't help, it is probably easiest to just copy and paste the code.
(The symbol does appear a few other times in the codebase but maybe those never passed through git send-email.)

Signed-off-by: Bert Livens <bert@bertlivens.be>
---
 netcache.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/netcache.py b/netcache.py
index 7640f69..2d0743a 100644
--- a/netcache.py
+++ b/netcache.py
@@ -986,12 +986,14 @@ def main():
    
    descri="Netcache is a command-line tool to retrieve, cache and access networked content.\n\
            By default, netcache will returns a cached version of a given URL, downloading it \
            only if not existing. A validity duration, in seconds, can also be given so that \
            netcache downloads the content only if the existing cache is older than the validity."
            only if a cache version doesn't exist. A validity duration, in seconds, can also \
            be given so netcache downloads the content only if the existing cache is older than the validity."
    # Parse arguments
    parser = argparse.ArgumentParser(prog="netcache",description=descri)
    parser.add_argument("--path", action="store_true",
                        help="return path to the cache instead of the content of the cache")
    parser.add_argument("--ids", action="store_true",
                        help="return a list of id's for the gemini-site instead of the content of the cache")
    parser.add_argument("--offline", action="store_true",
                        help="Do not attempt to download, return cached version or error")
    parser.add_argument("--max-size", type=int,
@@ -1008,7 +1010,7 @@ def main():
    # --force-download : download and replace cache, even if valid
    args = parser.parse_args()
    param = {}
    

    for u in args.url:
        if args.offline:
            path = get_cache_path(u)
-- 
2.46.0

Re: [PATCH] Part two of the previous commit, adds the --ids option to netcache.

Details
Message ID
<172312025238.7.11748384444016250068.400595352@ploum.eu>
In-Reply-To
<20240807232404.90278-2-bert@bertlivens.be> (view parent)
DKIM signature
pass
Download raw message
Thanks Bert,

It looks like the fault was on my side. When applying the patches
straight out of my inbox, they fail, whereas downloading them from sourcehut
and then applying them works.

The only difference seems to be that, when downloaded, the file has
CRLF line terminators, which is not the case in my inbox.

I don’t know:

- Why there’s such a difference
- If this is what was causing the failure (if yes, why failing only on 
   one section)

Anyway, this means that I’ve managed to apply your patches!


On 24 aoû 08 01:24, Bert Livens wrote:
>So, this is the part that doesn't want to apply. In the hope it is the problem, I replaced a directed ' with a straight '.
>If that doesn't help, it is probably easiest to just copy and paste the code.
>(The symbol does appear a few other times in the codebase but maybe those never passed through git send-email.)
>
>Signed-off-by: Bert Livens <bert@bertlivens.be>
>---
> netcache.py | 8 +++++---
> 1 file changed, 5 insertions(+), 3 deletions(-)
>
>diff --git a/netcache.py b/netcache.py
>index 7640f69..2d0743a 100644
>--- a/netcache.py
>+++ b/netcache.py
>@@ -986,12 +986,14 @@ def main():
>
>     descri="Netcache is a command-line tool to retrieve, cache and access networked content.\n\
>             By default, netcache will returns a cached version of a given URL, downloading it \
>-            only if not existing. A validity duration, in seconds, can also be given so that \
>-            netcache downloads the content only if the existing cache is older than the validity."
>+            only if a cache version doesn't exist. A validity duration, in seconds, can also \
>+            be given so netcache downloads the content only if the existing cache is older than the validity."
>     # Parse arguments
>     parser = argparse.ArgumentParser(prog="netcache",description=descri)
>     parser.add_argument("--path", action="store_true",
>                         help="return path to the cache instead of the content of the cache")
>+    parser.add_argument("--ids", action="store_true",
>+                        help="return a list of id's for the gemini-site instead of the content of the cache")
>     parser.add_argument("--offline", action="store_true",
>                         help="Do not attempt to download, return cached version or error")
>     parser.add_argument("--max-size", type=int,
>@@ -1008,7 +1010,7 @@ def main():
>     # --force-download : download and replace cache, even if valid
>     args = parser.parse_args()
>     param = {}
>-
>+
>     for u in args.url:
>         if args.offline:
>             path = get_cache_path(u)
>--
>2.46.0
>
>

-- 
Ploum - Lionel Dricot
Blog: https://www.ploum.net
Livres: https://ploum.net/livres.html
Reply to thread Export thread (mbox)