Hi everyone,
I've been away for a while (exams, vacation) but I'm back.
Once again, this is a patch that tries to add support for client certificates to offpunk and provides the command to use them. It is based on the most recent version of Offpunk.
More information about what this tries to do and how it works can be found in this previous thread: https://lists.sr.ht/~lioploum/offpunk-devel/<bc5af397-9386-450d-bf2a-45f5cceccedd@bertlivens.be>
Don't hesitate to ask any questions you have.
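For the impatient, usage looks roughly like this (the site name is only an example):

    certs                            # list identities available for the current site
    certs new myid 365 example.org   # create identity "myid", valid 365 days
    go gemini://myid@example.org/    # activate it with the user@host notation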
~~ Bert Livens
Signed-off-by: Bert Livens <bert@bertlivens.be>
---
 CHANGELOG   |   2 +
 netcache.py | 145 ++++++++++++++++++++++++++++++++++++++++++++++------
 offpunk.py  |  37 ++++++++++++++
3 files changed, 167 insertions(+), 17 deletions(-)
mode change 100755 => 100644 netcache.py
diff --git a/CHANGELOG b/CHANGELOG
index e24ef8e..1ee9dfd 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -3,6 +3,8 @@
## 2.4 - Unreleased
- Deprecation warning if using Chafa < 1.10
- "open" now accept integer as parameters to open links (suggested by Matthieu Rakotojaona)
+- netcache: use client-certificate when going to a url like gemini://username@site.net (by Bert Livens)
+- offpunk/netcache: added the "cert" command to list and create client certificates (Bert Livens)

 ## 2.3 - June 29th 2024
- Wayland clipboard support through wl-clipboard (new suggested dependency)
diff --git a/netcache.py b/netcache.py
old mode 100755
new mode 100644
index 38f001e..d775472
--- a/netcache.py
+++ b/netcache.py
@@ -24,6 +24,9 @@ except ModuleNotFoundError:
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
+    from cryptography.hazmat.primitives import hashes
+    from cryptography.hazmat.primitives.asymmetric import rsa
+    from cryptography.hazmat.primitives import serialization
     _HAS_CRYPTOGRAPHY = True
_BACKEND = default_backend()
except(ModuleNotFoundError,ImportError):
@@ -589,12 +592,101 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
with open(os.path.join(certcache, fingerprint+".crt"), "wb") as fp:
fp.write(cert)

+def _get_client_certkey(site_id: str, host: str):
+    # returns {cert: str, key: str}
+    certdir = os.path.join(xdg("data"), "certs", host)
+    certf = os.path.join(certdir, "%s.cert" % site_id)
+    keyf = os.path.join(certdir, "%s.key" % site_id)
+    if not os.path.exists(certf) or not os.path.exists(keyf):
+        if host != "":
+            split = host.split(".")
+            #if len(split) > 2:  # Why not allow a global identity? Maybe I want
+                                 # to login to all sites with the same
+                                 # certificate.
+            return _get_client_certkey(site_id, ".".join(split[1:]))
+        return None
+    certkey = dict(cert=certf, key=keyf)
+    return certkey
+
+def _get_site_ids(url: str):
+    newurl = normalize_url(url)
+    u = urllib.parse.urlparse(newurl)
+    if u.scheme == "gemini" and u.username == None:
+        certdir = os.path.join(xdg("data"), "certs")
+        netloc_parts = u.netloc.split(".")
+        site_ids = []
+
+        for i in range(len(netloc_parts), 0, -1):
+            lasti = ".".join(netloc_parts[-i:])
+            direc = os.path.join(certdir, lasti)
+
+            for certfile in glob.glob(os.path.join(direc, "*.cert")):
+                site_id = certfile.split('/')[-1].split(".")[-2]
+                site_ids.append(site_id)
+        return site_ids
+    else:
+        return []
+
+def create_certificate(name: str, days: int, hostname: str):
+    key = rsa.generate_private_key(
+            public_exponent = 65537,
+            key_size = 2048)
+    sitecertdir = os.path.join(xdg("data"), "certs", hostname)
+    keyfile = os.path.join(sitecertdir, name+".key")
+    # create the directory if it doesn't exist
+    os.makedirs(sitecertdir, exist_ok=True)
+    with open(keyfile, "wb") as f:
+        f.write(key.private_bytes(
+            encoding=serialization.Encoding.PEM,
+            format=serialization.PrivateFormat.TraditionalOpenSSL,
+            encryption_algorithm=serialization.NoEncryption()
+        ))
+    xname = x509.Name([
+        x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name),
+    ])
+    # generate the cert, valid a week ago (timekeeping is hard, let's give it a
+    # little margin). issuer and subject are your name
+    cert = (x509.CertificateBuilder()
+            .subject_name(xname)
+            .issuer_name(xname)
+            .public_key(key.public_key())
+            .serial_number(x509.random_serial_number())
+            .not_valid_before(datetime.datetime.utcnow() -
+                              datetime.timedelta(days=7))
+            .not_valid_after(datetime.datetime.utcnow() +
+                             datetime.timedelta(days=days))
+            .sign(key, hashes.SHA256())
+            )
+    certfile = os.path.join(sitecertdir, name + ".cert")
+    with open(certfile, "wb") as f:
+        f.write(cert.public_bytes(serialization.Encoding.PEM))
+
+def get_certs(url: str):
+    u = urllib.parse.urlparse(normalize_url(url))
+    if u.scheme == "gemini":
+        certdir = os.path.join(xdg("data"), "certs")
+        netloc_parts = u.netloc.split(".")
+        site_ids = []
+        if '@' in netloc_parts[0]:
+            netloc_parts[0] = netloc_parts[0].split('@')[1]
+
+        for i in range(len(netloc_parts), 0, -1):
+            lasti = ".".join(netloc_parts[-i:])
+            direc = os.path.join(certdir, lasti)
+            for certfile in glob.glob(os.path.join(direc, "*.cert")):
+                site_id = certfile.split('/')[-1].split(".")[-2]
+                site_ids.append(site_id)
+        return site_ids
+    else:
+        return []
+
 def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\
**kwargs):
cache = None
newurl = url
url_parts = urllib.parse.urlparse(url)
host = url_parts.hostname
+    site_id = url_parts.username
     port = url_parts.port or standard_ports["gemini"]
path = url_parts.path or "/"
query = url_parts.query
@@ -622,8 +714,16 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
# Prepare TLS context
protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
context = ssl.SSLContext(protocol)
-    context.check_hostname=False
+    context.check_hostname = False
     context.verify_mode = ssl.CERT_NONE
+
+    # When using an identity, use the certificate and key
+    if site_id:
+        certkey = _get_client_certkey(site_id, host)
+        if certkey:
+            context.load_cert_chain(certkey["cert"], certkey["key"])
+        else:
+            print("This identity doesn't exist for this site (or is disabled).")
     # Impose minimum TLS version
## In 3.7 and above, this is easy...
if sys.version_info.minor >= 7:
@@ -665,15 +765,21 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
_validate_cert(address[4][0], host, cert,automatic_choice="y")
# Send request and wrap response in a file descriptor
url = urllib.parse.urlparse(url)
-    new_netloc = host
+    new_host = host
     #Handle IPV6 hostname
-    if ":" in new_netloc:
-        new_netloc = "[" + new_netloc + "]"
+    if ":" in new_host:
+        new_host = "[" + new_host + "]"
     if port != standard_ports["gemini"]:
-        new_netloc += ":" + str(port)
-    url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
-    s.sendall((url + CRLF).encode("UTF-8"))
-    f= s.makefile(mode = "rb")
+        new_host += ":" + str(port)
+    url_no_username = urllib.parse.urlunparse(url._replace(netloc=new_host))
+
+    if site_id:
+        url = urllib.parse.urlunparse(url._replace(netloc=site_id+"@"+new_host))
+    else:
+        url = url_no_username
+
+    s.sendall((url_no_username + CRLF).encode("UTF-8"))
+    f = s.makefile(mode = "rb")
     ## end of send_request in AV98
# Spec dictates <META> should not exceed 1024 bytes,
# so maximum valid header length is 1027 bytes.
@@ -744,8 +850,7 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
raise RuntimeError(meta)
# Client cert
elif status.startswith("6"):
- error = "Handling certificates for status 6X are not supported by offpunk\n"- error += "See bug #31 for discussion about the problem"+ error = "You need to provide a client-certificate to access this page." raise RuntimeError(error)
# Invalid status
elif not status.startswith("2"):
@@ -785,7 +890,7 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
newurl = url
path=None
print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
-    #Firt, we look if we have a valid cache, even if offline
+    #First, we look if we have a valid cache, even if offline
     #If we are offline, any cache is better than nothing
if is_cache_valid(url,validity=validity) or (offline and is_cache_valid(url,validity=0)):
path = get_cache_path(url)
@@ -803,13 +908,13 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
path = None
elif scheme in ("http","https"):
if _DO_HTTP:
-                path=_fetch_http(url,**kwargs)
+                path=_fetch_http(newurl,**kwargs)
             else:
print("HTTP requires python-requests")
elif scheme == "gopher":
-            path=_fetch_gopher(url,**kwargs)
+            path=_fetch_gopher(newurl,**kwargs)
         elif scheme == "finger":
-            path=_fetch_finger(url,**kwargs)
+            path=_fetch_finger(newurl,**kwargs)
         elif scheme == "gemini":
path,newurl=_fetch_gemini(url,**kwargs)
elif scheme == "spartan":
@@ -819,7 +924,7 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
except UserAbortException:
return None, newurl
except Exception as err:
-        cache = set_error(url, err)
+        cache = set_error(newurl, err)
         # Print an error message
# we fail silently when sync_only
if isinstance(err, socket.gaierror):
@@ -881,12 +986,14 @@ def main():
descri="Netcache is a command-line tool to retrieve, cache and access networked content.\n\
By default, netcache will returns a cached version of a given URL, downloading it \
-    only if not existing. A validity duration, in seconds, can also be given so that \
-    netcache downloads the content only if the existing cache is older than the validity."
+    only if a cache version doesn't exist. A validity duration, in seconds, can also \
+    be given so netcache downloads the content only if the existing cache is older than the validity."
     # Parse arguments
parser = argparse.ArgumentParser(prog="netcache",description=descri)
parser.add_argument("--path", action="store_true",
help="return path to the cache instead of the content of the cache")
+ parser.add_argument("--ids", action="store_true",+ help="return a list of id’s for the gemini-site instead of the content of the cache") parser.add_argument("--offline", action="store_true",
help="Do not attempt to download, return cached version or error")
parser.add_argument("--max-size", type=int,
@@ -908,11 +1015,15 @@ def main():
for u in args.url:
if args.offline:
path = get_cache_path(u)
+        elif args.ids:
+            ids = _get_site_ids(u)
         else:
path,url = fetch(u,max_size=args.max_size,timeout=args.timeout,\
validity=args.cache_validity)
if args.path:
print(path)
+        elif args.ids:
+            print(ids)
         else:
with open(path,"r") as f:
print(f.read())
diff --git a/offpunk.py b/offpunk.py
index 8235566..9f9af1f 100755
--- a/offpunk.py
+++ b/offpunk.py
@@ -89,6 +89,7 @@ _ABBREVS = {
"bb": "blackbox",
"bm": "bookmarks",
"book": "bookmarks",
+ "cert": "certs", "cp": "copy",
"f": "forward",
"g": "go",
@@ -815,6 +816,42 @@ Current tour can be listed with `tour ls` and scrubbed with `tour clear`."""
except IndexError:
print("Invalid index %d, skipping." % n)

+    @needs_gi
+    def do_certs(self, line) -> None:
+        """Manage your client certificates (identities) for a site.
+        `certs` will display all valid certificates for the current site
+        `certs new <name> <days-valid> <url[optional]>` will create a new certificate, if no url is specified, the current open site will be used.
+        """
+        line = line.strip()
+        if not line:
+            certs = netcache.get_certs(self.current_url)
+            if len(certs) == 0:
+                print("There are no certificates available for this site.")
+            else:
+                if len(certs) == 1:
+                    print("The one available certificate for this site is:")
+                else:
+                    print("The", len(certs) ,"available certificates for this site are:")
+
+                print(*certs)
+                print("Use the 'id@site.net' notation to activate a certificate.")
+        else:
+            lineparts = line.split(' ')
+            if lineparts[0] == 'new':
+                if len(lineparts) == 4:
+                    name = lineparts[1]
+                    days = lineparts[2]
+                    site = lineparts[3]
+                    netcache.create_certificate(name, int(days), site)
+                elif len(lineparts) == 3:
+                    name = lineparts[1]
+                    days = lineparts[2]
+                    site = urllib.parse.urlparse(self.current_url)
+                    netcache.create_certificate(name, int(days), site.hostname)
+
+            else:
+                print("usage")
+
     @needs_gi
def do_mark(self, line):
"""Mark the current item with a single letter. This letter can then
--
2.46.0
On 24 Aug 07 02:06, Bert Livens wrote:
>Hi everyone,>I've been away for a while (exams, vacation) but I'm back.>Once again, this is a patch that tries to add support for client certificates to offpunk and provides the command to use them. It is based on the most recent version of Offpunk.>>More information about what this tries to do and how it works can be found in this previous thread: https://lists.sr.ht/~lioploum/offpunk-devel/<bc5af397-9386-450d-bf2a-45f5cceccedd@bertlivens.be>>Don't hesitate to ask any questions you have.
Hi Bert,
Very happy to see you back. I still don’t understand why this patch
doesn’t apply. Everything seems fine. The rejection appears to be in
hunk #9 but I don’t see any problem.
I’ve copy/pasted the .rej file below. If you can’t spot what’s wrong, maybe
you could resend your patch without this part so the bulk of the work can be
applied cleanly.
You could then submit a small patch to add the "--ids" parameter (or I
could add it myself manually).
If anybody reading this can see why the patch is failing, I would be very
curious. It seems that I’ve hit the ceiling of my weak git knowledge here.
The .rej file:
diff a/netcache.py b/netcache.py (rejected hunks)
@@ -881,12 +986,14 @@ def main():
descri="Netcache is a command-line tool to retrieve, cache and access networked content.\n\
By default, netcache will returns a cached version of a given URL, downloading it \
- only if not existing. A validity duration, in seconds, can also be given so that \
- netcache downloads the content only if the existing cache is older than the validity."
+ only if a cache version doesn't exist. A validity duration, in seconds, can also \
+ be given so netcache downloads the content only if the existing cache is older than the validity."
# Parse arguments
parser = argparse.ArgumentParser(prog="netcache",description=descri)
parser.add_argument("--path", action="store_true",
help="return path to the cache instead of the content of the cache")
+ parser.add_argument("--ids", action="store_true",
+ help="return a list of id’s for the gemini-site instead of the content of the cache")
parser.add_argument("--offline", action="store_true",
help="Do not attempt to download, return cached version or error")
parser.add_argument("--max-size", type=int,
>>~~ Bert Livens>>>Signed-off-by: Bert Livens <bert@bertlivens.be>>---> CHANGELOG | 2 +> netcache.py | 145 ++++++++++++++++++++++++++++++++++++++++++++++------> offpunk.py | 37 ++++++++++++++> 3 files changed, 167 insertions(+), 17 deletions(-)> mode change 100755 => 100644 netcache.py>>diff --git a/CHANGELOG b/CHANGELOG>index e24ef8e..1ee9dfd 100644>--- a/CHANGELOG>+++ b/CHANGELOG>@@ -3,6 +3,8 @@> ## 2.4 - Unreleased> - Deprecation warning if using Chafa < 1.10> - "open" now accept integer as parameters to open links (suggested by Matthieu Rakotojaona)>+- netcache: use client-certificate when going to a url like gemini://username@site.net (by Bert Livens)>+- offpunk/netcache: added the "cert" command to list and create client certificates (Bert Livens)>> ## 2.3 - June 29th 2024> - Wayland clipboard support through wl-clipboard (new suggested dependency)>diff --git a/netcache.py b/netcache.py>old mode 100755>new mode 100644>index 38f001e..d775472>--- a/netcache.py>+++ b/netcache.py>@@ -24,6 +24,9 @@ except ModuleNotFoundError:> try:> from cryptography import x509> from cryptography.hazmat.backends import default_backend>+ from cryptography.hazmat.primitives import hashes>+ from cryptography.hazmat.primitives.asymmetric import rsa>+ from cryptography.hazmat.primitives import serialization> _HAS_CRYPTOGRAPHY = True> _BACKEND = default_backend()> except(ModuleNotFoundError,ImportError):>@@ -589,12 +592,101 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non> with open(os.path.join(certcache, fingerprint+".crt"), "wb") as fp:> fp.write(cert)>>+def _get_client_certkey(site_id: str, host: str):>+ # returns {cert: str, key: str}>+ certdir = os.path.join(xdg("data"), "certs", host)>+ certf = os.path.join(certdir, "%s.cert" % site_id)>+ keyf = os.path.join(certdir, "%s.key" % site_id)>+ if not os.path.exists(certf) or not os.path.exists(keyf):>+ if host != "":>+ split = host.split(".")>+ #if len(split) > 2: # Why not allow a global identity? Maybe I want>+ # to login to all sites with the same>+ # certificate.>+ return _get_client_certkey(site_id, ".".join(split[1:]))>+ return None>+ certkey = dict(cert=certf, key=keyf)>+ return certkey>+>+def _get_site_ids(url: str):>+ newurl = normalize_url(url)>+ u = urllib.parse.urlparse(newurl)>+ if u.scheme == "gemini" and u.username == None:>+ certdir = os.path.join(xdg("data"), "certs")>+ netloc_parts = u.netloc.split(".")>+ site_ids = []>+>+ for i in range(len(netloc_parts), 0, -1):>+ lasti = ".".join(netloc_parts[-i:])>+ direc = os.path.join(certdir, lasti)>+>+ for certfile in glob.glob(os.path.join(direc, "*.cert")):>+ site_id = certfile.split('/')[-1].split(".")[-2]>+ site_ids.append(site_id)>+ return site_ids>+ else:>+ return []>+>+def create_certificate(name: str, days: int, hostname: str):>+ key = rsa.generate_private_key(>+ public_exponent = 65537,>+ key_size = 2048)>+ sitecertdir = os.path.join(xdg("data"), "certs", hostname)>+ keyfile = os.path.join(sitecertdir, name+".key")>+ # create the directory of it doesn't exist>+ os.makedirs(sitecertdir, exist_ok=True)>+ with open(keyfile, "wb") as f:>+ f.write(key.private_bytes(>+ encoding=serialization.Encoding.PEM,>+ format=serialization.PrivateFormat.TraditionalOpenSSL,>+ encryption_algorithm=serialization.NoEncryption()>+ ))>+ xname = x509.Name([>+ x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name),>+ ])>+ # generate the cert, valid a week ago (timekeeping is hard, let's give it a>+ # little margin). 
issuer and subject are your name>+ cert = (x509.CertificateBuilder()>+ .subject_name(xname)>+ .issuer_name(xname)>+ .public_key(key.public_key())>+ .serial_number(x509.random_serial_number())>+ .not_valid_before(datetime.datetime.utcnow() ->+ datetime.timedelta(days=7))>+ .not_valid_after(datetime.datetime.utcnow() +>+ datetime.timedelta(days=days))>+ .sign(key, hashes.SHA256())>+ )>+ certfile = os.path.join(sitecertdir, name + ".cert")>+ with open(certfile, "wb") as f:>+ f.write(cert.public_bytes(serialization.Encoding.PEM))>+>+def get_certs(url: str):>+ u = urllib.parse.urlparse(normalize_url(url))>+ if u.scheme == "gemini":>+ certdir = os.path.join(xdg("data"), "certs")>+ netloc_parts = u.netloc.split(".")>+ site_ids = []>+ if '@' in netloc_parts[0]:>+ netloc_parts[0] = netloc_parts[0].split('@')[1]>+>+ for i in range(len(netloc_parts), 0, -1):>+ lasti = ".".join(netloc_parts[-i:])>+ direc = os.path.join(certdir, lasti)>+ for certfile in glob.glob(os.path.join(direc, "*.cert")):>+ site_id = certfile.split('/')[-1].split(".")[-2]>+ site_ids.append(site_id)>+ return site_ids>+ else:>+ return []>+> def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\> **kwargs):> cache = None> newurl = url> url_parts = urllib.parse.urlparse(url)> host = url_parts.hostname>+ site_id = url_parts.username> port = url_parts.port or standard_ports["gemini"]> path = url_parts.path or "/"> query = url_parts.query>@@ -622,8 +714,16 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce> # Prepare TLS context> protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2> context = ssl.SSLContext(protocol)>- context.check_hostname=False>+ context.check_hostname = False> context.verify_mode = ssl.CERT_NONE>+>+ # When using an identity, use the certificate and key>+ if site_id:>+ certkey = _get_client_certkey(site_id, host)>+ if certkey:>+ context.load_cert_chain(certkey["cert"], certkey["key"])>+ else:>+ print("This identity doesn't exist for this site (or is disabled).")> # Impose minimum TLS version> ## In 3.7 and above, this is easy...> if sys.version_info.minor >= 7:>@@ -665,15 +765,21 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce> _validate_cert(address[4][0], host, cert,automatic_choice="y")> # Send request and wrap response in a file descriptor> url = urllib.parse.urlparse(url)>- new_netloc = host>+ new_host = host> #Handle IPV6 hostname>- if ":" in new_netloc:>- new_netloc = "[" + new_netloc + "]">+ if ":" in new_host:>+ new_host = "[" + new_host + "]"> if port != standard_ports["gemini"]:>- new_netloc += ":" + str(port)>- url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))>- s.sendall((url + CRLF).encode("UTF-8"))>- f= s.makefile(mode = "rb")>+ new_host += ":" + str(port)>+ url_no_username = urllib.parse.urlunparse(url._replace(netloc=new_host))>+>+ if site_id:>+ url = urllib.parse.urlunparse(url._replace(netloc=site_id+"@"+new_host))>+ else:>+ url = url_no_username>+>+ s.sendall((url_no_username + CRLF).encode("UTF-8"))>+ f = s.makefile(mode = "rb")> ## end of send_request in AV98> # Spec dictates <META> should not exceed 1024 bytes,> # so maximum valid header length is 1027 bytes.>@@ -744,8 +850,7 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce> raise RuntimeError(meta)> # Client cert> elif status.startswith("6"):>- error = "Handling certificates for status 6X are not supported by offpunk\n">- error += "See bug #31 for 
discussion about the problem">+ error = "You need to provide a client-certificate to access this page."> raise RuntimeError(error)> # Invalid status> elif not status.startswith("2"):>@@ -785,7 +890,7 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val> newurl = url> path=None> print_error = "print_error" in kwargs.keys() and kwargs["print_error"]>- #Firt, we look if we have a valid cache, even if offline>+ #First, we look if we have a valid cache, even if offline> #If we are offline, any cache is better than nothing> if is_cache_valid(url,validity=validity) or (offline and is_cache_valid(url,validity=0)):> path = get_cache_path(url)>@@ -803,13 +908,13 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val> path = None> elif scheme in ("http","https"):> if _DO_HTTP:>- path=_fetch_http(url,**kwargs)>+ path=_fetch_http(newurl,**kwargs)> else:> print("HTTP requires python-requests")> elif scheme == "gopher":>- path=_fetch_gopher(url,**kwargs)>+ path=_fetch_gopher(newurl,**kwargs)> elif scheme == "finger":>- path=_fetch_finger(url,**kwargs)>+ path=_fetch_finger(newurl,**kwargs)> elif scheme == "gemini":> path,newurl=_fetch_gemini(url,**kwargs)> elif scheme == "spartan":>@@ -819,7 +924,7 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val> except UserAbortException:> return None, newurl> except Exception as err:>- cache = set_error(url, err)>+ cache = set_error(newurl, err)> # Print an error message> # we fail silently when sync_only> if isinstance(err, socket.gaierror):>@@ -881,12 +986,14 @@ def main():>> descri="Netcache is a command-line tool to retrieve, cache and access networked content.\n\> By default, netcache will returns a cached version of a given URL, downloading it \>- only if not existing. A validity duration, in seconds, can also be given so that \>- netcache downloads the content only if the existing cache is older than the validity.">+ only if a cache version doesn't exist. A validity duration, in seconds, can also \>+ be given so netcache downloads the content only if the existing cache is older than the validity."> # Parse arguments> parser = argparse.ArgumentParser(prog="netcache",description=descri)> parser.add_argument("--path", action="store_true",> help="return path to the cache instead of the content of the cache")>+ parser.add_argument("--ids", action="store_true",>+ help="return a list of id’s for the gemini-site instead of the content of the cache")> parser.add_argument("--offline", action="store_true",> help="Do not attempt to download, return cached version or error")> parser.add_argument("--max-size", type=int,>@@ -908,11 +1015,15 @@ def main():> for u in args.url:> if args.offline:> path = get_cache_path(u)>+ elif args.ids:>+ ids = _get_site_ids(u)> else:> path,url = fetch(u,max_size=args.max_size,timeout=args.timeout,\> validity=args.cache_validity)> if args.path:> print(path)>+ elif args.ids:>+ print(ids)> else:> with open(path,"r") as f:> print(f.read())>diff --git a/offpunk.py b/offpunk.py>index 8235566..9f9af1f 100755>--- a/offpunk.py>+++ b/offpunk.py>@@ -89,6 +89,7 @@ _ABBREVS = {> "bb": "blackbox",> "bm": "bookmarks",> "book": "bookmarks",>+ "cert": "certs",> "cp": "copy",> "f": "forward",> "g": "go",>@@ -815,6 +816,42 @@ Current tour can be listed with `tour ls` and scrubbed with `tour clear`."""> except IndexError:> print("Invalid index %d, skipping." 
% n)>>+ @needs_gi>+ def do_certs(self, line) -> None:>+ """Manage your client certificates (identities) for a site.>+ `certs` will display all valid certificates for the current site>+ `certs new <name> <days-valid> <url[optional]>` will create a new certificate, if no url is specified, the current open site will be used.>+ """>+ line = line.strip()>+ if not line:>+ certs = netcache.get_certs(self.current_url)>+ if len(certs) == 0:>+ print("There are no certificates available for this site.")>+ else:>+ if len(certs) == 1:>+ print("The one available certificate for this site is:")>+ else:>+ print("The", len(certs) ,"available certificates for this site are:")>+>+ print(*certs)>+ print("Use the 'id@site.net' notation to activate a certificate.")>+ else:>+ lineparts = line.split(' ')>+ if lineparts[0] == 'new':>+ if len(lineparts) == 4:>+ name = lineparts[1]>+ days = lineparts[2]>+ site = lineparts[3]>+ netcache.create_certificate(name, int(days), site)>+ elif len(lineparts) == 3:>+ name = lineparts[1]>+ days = lineparts[2]>+ site = urllib.parse.urlparse(self.current_url)>+ netcache.create_certificate(name, int(days), site.hostname)>+>+ else:>+ print("usage")>+> @needs_gi> def do_mark(self, line):> """Mark the current item with a single letter. This letter can then>-->2.46.0>>
--
Ploum - Lionel Dricot
Blog: https://www.ploum.net
Livres: https://ploum.net/livres.html
[PATCH v2] Added support for client-certificates to netcache and the certs command to manage them.
This is part one; it doesn't define the flags for netcache.py yet, so the
command won't work.
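Once part two adds the flag, the intended usage is roughly this (hypothetical
host):

    $ python3 netcache.py --ids gemini://example.org
    ['myid']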
Signed-off-by: Bert Livens <bert@bertlivens.be>
---
 CHANGELOG   |   2 +
 netcache.py | 140 ++++++++++++++++++++++++++++++++++++++++++++++------
 offpunk.py  |  37 ++++++++++++++
3 files changed, 163 insertions(+), 16 deletions(-)
mode change 100755 => 100644 netcache.py
diff --git a/CHANGELOG b/CHANGELOG
index e24ef8e..1ee9dfd 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -3,6 +3,8 @@
## 2.4 - Unreleased
- Deprecation warning if using Chafa < 1.10
- "open" now accept integer as parameters to open links (suggested by Matthieu Rakotojaona)
+- netcache: use client-certificate when going to a url like gemini://username@site.net (by Bert Livens)
+- offpunk/netcache: added the "cert" command to list and create client certificates (Bert Livens)

 ## 2.3 - June 29th 2024
- Wayland clipboard support through wl-clipboard (new suggested dependency)
diff --git a/netcache.py b/netcache.py
old mode 100755
new mode 100644
index 38f001e..7640f69
--- a/netcache.py
+++ b/netcache.py
@@ -24,6 +24,9 @@ except ModuleNotFoundError:
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
+    from cryptography.hazmat.primitives import hashes
+    from cryptography.hazmat.primitives.asymmetric import rsa
+    from cryptography.hazmat.primitives import serialization
     _HAS_CRYPTOGRAPHY = True
_BACKEND = default_backend()
except(ModuleNotFoundError,ImportError):
@@ -589,12 +592,101 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
with open(os.path.join(certcache, fingerprint+".crt"), "wb") as fp:
fp.write(cert)

+def _get_client_certkey(site_id: str, host: str):
+    # returns {cert: str, key: str}
+    certdir = os.path.join(xdg("data"), "certs", host)
+    certf = os.path.join(certdir, "%s.cert" % site_id)
+    keyf = os.path.join(certdir, "%s.key" % site_id)
+    if not os.path.exists(certf) or not os.path.exists(keyf):
+        if host != "":
+            split = host.split(".")
+            #if len(split) > 2:  # Why not allow a global identity? Maybe I want
+                                 # to login to all sites with the same
+                                 # certificate.
+            return _get_client_certkey(site_id, ".".join(split[1:]))
+        return None
+    certkey = dict(cert=certf, key=keyf)
+    return certkey
+
+def _get_site_ids(url: str):
+    newurl = normalize_url(url)
+    u = urllib.parse.urlparse(newurl)
+    if u.scheme == "gemini" and u.username == None:
+        certdir = os.path.join(xdg("data"), "certs")
+        netloc_parts = u.netloc.split(".")
+        site_ids = []
+
+        for i in range(len(netloc_parts), 0, -1):
+            lasti = ".".join(netloc_parts[-i:])
+            direc = os.path.join(certdir, lasti)
+
+            for certfile in glob.glob(os.path.join(direc, "*.cert")):
+                site_id = certfile.split('/')[-1].split(".")[-2]
+                site_ids.append(site_id)
+        return site_ids
+    else:
+        return []
+
+def create_certificate(name: str, days: int, hostname: str):
+    key = rsa.generate_private_key(
+            public_exponent = 65537,
+            key_size = 2048)
+    sitecertdir = os.path.join(xdg("data"), "certs", hostname)
+    keyfile = os.path.join(sitecertdir, name+".key")
+    # create the directory if it doesn't exist
+    os.makedirs(sitecertdir, exist_ok=True)
+    with open(keyfile, "wb") as f:
+        f.write(key.private_bytes(
+            encoding=serialization.Encoding.PEM,
+            format=serialization.PrivateFormat.TraditionalOpenSSL,
+            encryption_algorithm=serialization.NoEncryption()
+        ))
+    xname = x509.Name([
+        x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name),
+    ])
+    # generate the cert, valid a week ago (timekeeping is hard, let's give it a
+    # little margin). issuer and subject are your name
+    cert = (x509.CertificateBuilder()
+            .subject_name(xname)
+            .issuer_name(xname)
+            .public_key(key.public_key())
+            .serial_number(x509.random_serial_number())
+            .not_valid_before(datetime.datetime.utcnow() -
+                              datetime.timedelta(days=7))
+            .not_valid_after(datetime.datetime.utcnow() +
+                             datetime.timedelta(days=days))
+            .sign(key, hashes.SHA256())
+            )
+    certfile = os.path.join(sitecertdir, name + ".cert")
+    with open(certfile, "wb") as f:
+        f.write(cert.public_bytes(serialization.Encoding.PEM))
+
+def get_certs(url: str):
+    u = urllib.parse.urlparse(normalize_url(url))
+    if u.scheme == "gemini":
+        certdir = os.path.join(xdg("data"), "certs")
+        netloc_parts = u.netloc.split(".")
+        site_ids = []
+        if '@' in netloc_parts[0]:
+            netloc_parts[0] = netloc_parts[0].split('@')[1]
+
+        for i in range(len(netloc_parts), 0, -1):
+            lasti = ".".join(netloc_parts[-i:])
+            direc = os.path.join(certdir, lasti)
+            for certfile in glob.glob(os.path.join(direc, "*.cert")):
+                site_id = certfile.split('/')[-1].split(".")[-2]
+                site_ids.append(site_id)
+        return site_ids
+    else:
+        return []
+
 def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\
**kwargs):
cache = None
newurl = url
url_parts = urllib.parse.urlparse(url)
host = url_parts.hostname
+    site_id = url_parts.username
     port = url_parts.port or standard_ports["gemini"]
path = url_parts.path or "/"
query = url_parts.query
@@ -622,8 +714,16 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
# Prepare TLS context
protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2
context = ssl.SSLContext(protocol)
-    context.check_hostname=False
+    context.check_hostname = False
     context.verify_mode = ssl.CERT_NONE
+
+    # When using an identity, use the certificate and key
+    if site_id:
+        certkey = _get_client_certkey(site_id, host)
+        if certkey:
+            context.load_cert_chain(certkey["cert"], certkey["key"])
+        else:
+            print("This identity doesn't exist for this site (or is disabled).")
     # Impose minimum TLS version
## In 3.7 and above, this is easy...
if sys.version_info.minor >= 7:
@@ -665,15 +765,21 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
_validate_cert(address[4][0], host, cert,automatic_choice="y")
# Send request and wrap response in a file descriptor
url = urllib.parse.urlparse(url)
-    new_netloc = host
+    new_host = host
     #Handle IPV6 hostname
-    if ":" in new_netloc:
-        new_netloc = "[" + new_netloc + "]"
+    if ":" in new_host:
+        new_host = "[" + new_host + "]"
     if port != standard_ports["gemini"]:
-        new_netloc += ":" + str(port)
-    url = urllib.parse.urlunparse(url._replace(netloc=new_netloc))
-    s.sendall((url + CRLF).encode("UTF-8"))
-    f= s.makefile(mode = "rb")
+        new_host += ":" + str(port)
+    url_no_username = urllib.parse.urlunparse(url._replace(netloc=new_host))
+
+    if site_id:
+        url = urllib.parse.urlunparse(url._replace(netloc=site_id+"@"+new_host))
+    else:
+        url = url_no_username
+
+    s.sendall((url_no_username + CRLF).encode("UTF-8"))
+    f = s.makefile(mode = "rb")
     ## end of send_request in AV98
# Spec dictates <META> should not exceed 1024 bytes,
# so maximum valid header length is 1027 bytes.
@@ -744,8 +850,7 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce
raise RuntimeError(meta)
# Client cert
elif status.startswith("6"):
- error = "Handling certificates for status 6X are not supported by offpunk\n"- error += "See bug #31 for discussion about the problem"+ error = "You need to provide a client-certificate to access this page." raise RuntimeError(error)
# Invalid status
elif not status.startswith("2"):
@@ -785,7 +890,7 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
newurl = url
path=None
print_error = "print_error" in kwargs.keys() and kwargs["print_error"]
-    #Firt, we look if we have a valid cache, even if offline
+    #First, we look if we have a valid cache, even if offline
     #If we are offline, any cache is better than nothing
if is_cache_valid(url,validity=validity) or (offline and is_cache_valid(url,validity=0)):
path = get_cache_path(url)
@@ -803,13 +908,13 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
path = None
elif scheme in ("http","https"):
if _DO_HTTP:
-                path=_fetch_http(url,**kwargs)
+                path=_fetch_http(newurl,**kwargs)
             else:
print("HTTP requires python-requests")
elif scheme == "gopher":
-            path=_fetch_gopher(url,**kwargs)
+            path=_fetch_gopher(newurl,**kwargs)
         elif scheme == "finger":
-            path=_fetch_finger(url,**kwargs)
+            path=_fetch_finger(newurl,**kwargs)
         elif scheme == "gemini":
path,newurl=_fetch_gemini(url,**kwargs)
elif scheme == "spartan":
@@ -819,7 +924,7 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val
except UserAbortException:
return None, newurl
except Exception as err:
-        cache = set_error(url, err)
+        cache = set_error(newurl, err)
         # Print an error message
# we fail silently when sync_only
if isinstance(err, socket.gaierror):
@@ -902,17 +1007,20 @@ def main():
# --validity : returns the date of the cached version, Null if no version
# --force-download : download and replace cache, even if valid
args = parser.parse_args()
- param = {}
for u in args.url:
if args.offline:
path = get_cache_path(u)
+        elif args.ids:
+            ids = _get_site_ids(u)
         else:
path,url = fetch(u,max_size=args.max_size,timeout=args.timeout,\
validity=args.cache_validity)
if args.path:
print(path)
+        elif args.ids:
+            print(ids)
         else:
with open(path,"r") as f:
print(f.read())
diff --git a/offpunk.py b/offpunk.py
index 8235566..9f9af1f 100755
--- a/offpunk.py
+++ b/offpunk.py
@@ -89,6 +89,7 @@ _ABBREVS = {
"bb": "blackbox",
"bm": "bookmarks",
"book": "bookmarks",
+ "cert": "certs", "cp": "copy",
"f": "forward",
"g": "go",
@@ -815,6 +816,42 @@ Current tour can be listed with `tour ls` and scrubbed with `tour clear`."""
except IndexError:
print("Invalid index %d, skipping." % n)

+    @needs_gi
+    def do_certs(self, line) -> None:
+        """Manage your client certificates (identities) for a site.
+        `certs` will display all valid certificates for the current site
+        `certs new <name> <days-valid> <url[optional]>` will create a new certificate, if no url is specified, the current open site will be used.
+        """
+        line = line.strip()
+        if not line:
+            certs = netcache.get_certs(self.current_url)
+            if len(certs) == 0:
+                print("There are no certificates available for this site.")
+            else:
+                if len(certs) == 1:
+                    print("The one available certificate for this site is:")
+                else:
+                    print("The", len(certs) ,"available certificates for this site are:")
+
+                print(*certs)
+                print("Use the 'id@site.net' notation to activate a certificate.")
+        else:
+            lineparts = line.split(' ')
+            if lineparts[0] == 'new':
+                if len(lineparts) == 4:
+                    name = lineparts[1]
+                    days = lineparts[2]
+                    site = lineparts[3]
+                    netcache.create_certificate(name, int(days), site)
+                elif len(lineparts) == 3:
+                    name = lineparts[1]
+                    days = lineparts[2]
+                    site = urllib.parse.urlparse(self.current_url)
+                    netcache.create_certificate(name, int(days), site.hostname)
+
+            else:
+                print("usage")
+
     @needs_gi
def do_mark(self, line):
"""Mark the current item with a single letter. This letter can then
--
2.46.0
[PATCH] Part two of the previous commit, adds the --ids option to netcache.
So, this is the part that doesn't want to apply. In the hope that it is the problem, I replaced a directed quote (’) with a straight one (').
If that doesn't help, it is probably easiest to just copy and paste the code.
(The symbol does appear a few other times in the codebase but maybe those never passed through git send-email.)
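For the curious, the difference is easy to see from a Python prompt: the directed quote is multi-byte in UTF-8, so any lossy re-encoding somewhere between my mail client and yours could mangle it, while plain ASCII survives:

    >>> "id’s".encode("utf-8")
    b'id\xe2\x80\x99s'
    >>> "id's".encode("utf-8")
    b"id's"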
Signed-off-by: Bert Livens <bert@bertlivens.be>
---
netcache.py | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/netcache.py b/netcache.py
index 7640f69..2d0743a 100644
--- a/netcache.py
+++ b/netcache.py
@@ -986,12 +986,14 @@ def main():
descri="Netcache is a command-line tool to retrieve, cache and access networked content.\n\
By default, netcache will returns a cached version of a given URL, downloading it \
-    only if not existing. A validity duration, in seconds, can also be given so that \
-    netcache downloads the content only if the existing cache is older than the validity."
+    only if a cache version doesn't exist. A validity duration, in seconds, can also \
+    be given so netcache downloads the content only if the existing cache is older than the validity."
     # Parse arguments
parser = argparse.ArgumentParser(prog="netcache",description=descri)
parser.add_argument("--path", action="store_true",
help="return path to the cache instead of the content of the cache")
+ parser.add_argument("--ids", action="store_true",+ help="return a list of id's for the gemini-site instead of the content of the cache") parser.add_argument("--offline", action="store_true",
help="Do not attempt to download, return cached version or error")
parser.add_argument("--max-size", type=int,
@@ -1008,7 +1010,7 @@ def main():
# --force-download : download and replace cache, even if valid
args = parser.parse_args()
param = {}
-
+
     for u in args.url:
if args.offline:
path = get_cache_path(u)
--
2.46.0
Re: [PATCH] Part two of the previous commit, adds the --ids option to netcache.
Thanks Bert,
It looks like the fault was on my side. The patches fail when applied
straight out of my inbox, but downloading them from sourcehut and then
applying them works.
The only difference seems to be that the downloaded file has CRLF line
terminators, which is not the case in my inbox.
I don’t know:
- why there’s such a difference
- if this is what was causing the failure (and if so, why it failed only on
  one section)
Anyway, this means that I’ve managed to apply your patches!
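For future reference, here is a quick way to check a saved patch for CRLF
endings (a rough sketch; the filename is hypothetical):

    with open("patch.eml", "rb") as f:  # the patch as saved from the mail
        data = f.read()
    # count how many line endings are CRLF vs. all newlines
    print(data.count(b"\r\n"), "of", data.count(b"\n"), "line endings are CRLF")

If I understand correctly, git am also has --keep-cr / --no-keep-cr options
that change how carriage returns from the mbox are handled.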
On 24 Aug 08 01:24, Bert Livens wrote:
>So, this is the part that doesn't want to apply. In the hope it is the problem, I replaced a directed ' with a straight '.>If that doesn't help, it is probably easiest to just copy and paste the code.>(The symbol does appear a few other times in the codebase but maybe those never passed through git send-email.)>>Signed-off-by: Bert Livens <bert@bertlivens.be>>---> netcache.py | 8 +++++---> 1 file changed, 5 insertions(+), 3 deletions(-)>>diff --git a/netcache.py b/netcache.py>index 7640f69..2d0743a 100644>--- a/netcache.py>+++ b/netcache.py>@@ -986,12 +986,14 @@ def main():>> descri="Netcache is a command-line tool to retrieve, cache and access networked content.\n\> By default, netcache will returns a cached version of a given URL, downloading it \>- only if not existing. A validity duration, in seconds, can also be given so that \>- netcache downloads the content only if the existing cache is older than the validity.">+ only if a cache version doesn't exist. A validity duration, in seconds, can also \>+ be given so netcache downloads the content only if the existing cache is older than the validity."> # Parse arguments> parser = argparse.ArgumentParser(prog="netcache",description=descri)> parser.add_argument("--path", action="store_true",> help="return path to the cache instead of the content of the cache")>+ parser.add_argument("--ids", action="store_true",>+ help="return a list of id's for the gemini-site instead of the content of the cache")> parser.add_argument("--offline", action="store_true",> help="Do not attempt to download, return cached version or error")> parser.add_argument("--max-size", type=int,>@@ -1008,7 +1010,7 @@ def main():> # --force-download : download and replace cache, even if valid> args = parser.parse_args()> param = {}>->+> for u in args.url:> if args.offline:> path = get_cache_path(u)>-->2.46.0>>
--
Ploum - Lionel Dricot
Blog: https://www.ploum.net
Livres: https://ploum.net/livres.html