`ruff` is now the de facto standard for code formatting in the Python world. Sending atomic patches to offpunk is hard for me, because I automatically apply `ruff` formatting to every file I touch and that creates unrelated diffs. This patch applies `ruff` formatting to all files and should introduce no regression. At some point we may want to enforce `ruff` formatting to avoid differences between contributors, but that is out of scope for this patch. I will base my future work (especially the unmerdify integration) on this patch.

Vincent Jousse (1):
  chore: apply ruff formatting

 ansicat.py                 | 877 +++++++++++++++++++++----------
 cert_migration.py          |   5 +-
 netcache.py                | 646 ++++++++++++++++-----------
 netcache_migration.py      |   7 +-
 offpunk.py                 |  18 +-
 offutils.py                | 174 ++++----
 opnk.py                    | 184 +++++---
 pyproject.toml             |   2 +-
 tests/geminiclient_test.py |   2 +-
 tutorial/make_website.py   |   2 +-
 10 files changed, 1120 insertions(+), 797 deletions(-)

--
2.48.1
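Enforcing the formatting later could be a single CI step. A minimal sketch, assuming a ruff release recent enough to ship the `format` subcommand (0.1 and later):

  # reformat the whole tree in place
  ruff format .

  # verify formatting without modifying files; exits non-zero when
  # files would be reformatted, so it can gate CI or a pre-commit hook
  ruff format --check .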
Copy & paste the following snippet into your terminal to import this patchset into git:
curl -s https://lists.sr.ht/~lioploum/offpunk-devel/patches/57218/mbox | git am -3
--- ansicat.py | 877 +++++++++++++++++++++---------------- cert_migration.py | 5 +- netcache.py | 646 ++++++++++++++++----------- netcache_migration.py | 7 +- offpunk.py | 18 +- offutils.py | 174 ++++---- opnk.py | 184 +++++--- pyproject.toml | 2 +- tests/geminiclient_test.py | 2 +- tutorial/make_website.py | 2 +- 10 files changed, 1120 insertions(+), 797 deletions(-) diff --git a/ansicat.py b/ansicat.py index 7982547..dfbd30a 100755 --- a/ansicat.py +++ b/ansicat.py @@ -1,31 +1,33 @@ #!/usr/bin/env python3 +import argparse +import base64 +import fnmatch +import html +import mimetypes import os -import sys import shutil import subprocess +import sys import textwrap import time -import html import urllib -import argparse -import mimetypes -import fnmatch + import netcache import offthemes -from offutils import run,term_width,is_local,looks_like_base64, looks_like_url -import base64 -from offutils import xdg +from offutils import is_local, looks_like_base64, looks_like_url, run, term_width, xdg + try: from readability import Document + _HAS_READABILITY = True except ModuleNotFoundError: _HAS_READABILITY = False try: - from bs4 import BeautifulSoup - from bs4 import Comment - #if bs4 version >= 4.11, we need to silent some xml warnings + # if bs4 version >= 4.11, we need to silent some xml warnings import bs4 + from bs4 import BeautifulSoup, Comment + version = bs4.__version__.split(".") recent = False if int(version[0]) > 4: @@ -36,28 +38,31 @@ try: # As this is only for silencing some warnings, we fail # silently. We don’t really care try: - from bs4 import XMLParsedAsHTMLWarning import warnings + + from bs4 import XMLParsedAsHTMLWarning + warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning) - except: + except Exception: pass _HAS_SOUP = True except ModuleNotFoundError: _HAS_SOUP = False -_DO_HTML = _HAS_SOUP #and _HAS_READABILITY +_DO_HTML = _HAS_SOUP # and _HAS_READABILITY if _DO_HTML and not _HAS_READABILITY: print("To improve your web experience (less cruft in webpages),") print("please install python3-readability or readability-lxml") try: import feedparser + _DO_FEED = True except ModuleNotFoundError: _DO_FEED = False -_HAS_TIMG = shutil.which('timg') -_HAS_CHAFA = shutil.which('chafa') +_HAS_TIMG = shutil.which("timg") +_HAS_CHAFA = shutil.which("chafa") _NEW_CHAFA = False _NEW_TIMG = False _RENDER_IMAGE = False @@ -74,11 +79,11 @@ if _HAS_CHAFA: chafa_major, chafa_minor, _ = output.split("\n")[0].split(" ")[-1].split(".") if int(chafa_major) >= 1 and int(chafa_minor) >= 10: _NEW_CHAFA = True - except: + except Exception: pass -if _NEW_CHAFA : +if _NEW_CHAFA: _RENDER_IMAGE = True -if _HAS_TIMG : +if _HAS_TIMG: try: output = run("timg --version") except subprocess.CalledProcessError: @@ -90,6 +95,7 @@ if _HAS_TIMG : elif _HAS_CHAFA and not _NEW_CHAFA: try: from PIL import Image + _HAS_PIL = True _RENDER_IMAGE = True except ModuleNotFoundError: @@ -100,47 +106,49 @@ if not _RENDER_IMAGE: print("Before Chafa 1.10, you also need python-pil") - -#return ANSI text that can be show by less -def inline_image(img_file,width): - #We don’t even try displaying pictures that are not there +# return ANSI text that can be show by less +def inline_image(img_file, width): + # We don’t even try displaying pictures that are not there if not os.path.exists(img_file): return "" - #Chafa is faster than timg inline. Let use that one by default - #But we keep a list of "inlines" in case chafa fails + # Chafa is faster than timg inline. 
Let use that one by default + # But we keep a list of "inlines" in case chafa fails inlines = [] ansi_img = "" - #We avoid errors by not trying to render non-image files + # We avoid errors by not trying to render non-image files if shutil.which("file"): mime = run("file -b --mime-type %s", parameter=img_file).strip() - if not "image" in mime: + if "image" not in mime: return ansi_img if _HAS_CHAFA: if _HAS_PIL and not _NEW_CHAFA: # this code is a hack to remove frames from animated gif print("WARNING: support for chafa < 1.10 will soon be removed") - print("If you can’t upgrade chafa or timg, please contact offpunk developers") + print( + "If you can’t upgrade chafa or timg, please contact offpunk developers" + ) img_obj = Image.open(img_file) - if hasattr(img_obj,"n_frames") and img_obj.n_frames > 1: + if hasattr(img_obj, "n_frames") and img_obj.n_frames > 1: # we remove all frames but the first one - img_obj.save(img_file,format="gif",save_all=False) + img_obj.save(img_file, format="gif", save_all=False) inlines.append("chafa --bg white -s %s -f symbols") elif _NEW_CHAFA: inlines.append("chafa --bg white -t 1 -s %s -f symbols --animate=off") if _NEW_TIMG: inlines.append("timg --frames=1 -p q -g %sx1000") image_success = False - while not image_success and len(inlines)>0: - cmd = inlines.pop(0)%width + " %s" + while not image_success and len(inlines) > 0: + cmd = inlines.pop(0) % width + " %s" try: ansi_img = run(cmd, parameter=img_file) image_success = True except Exception as err: - ansi_img = "***IMAGE ERROR***\n%s…\n…%s" %(str(err)[:50],str(err)[-50:]) + ansi_img = "***IMAGE ERROR***\n%s…\n…%s" % (str(err)[:50], str(err)[-50:]) return ansi_img + def terminal_image(img_file): - #Render by timg is better than old chafa. + # Render by timg is better than old chafa. 
# it is also centered cmds = [] if _NEW_CHAFA: @@ -161,11 +169,11 @@ def terminal_image(img_file): # First, we define the different content->text renderers, outside of the rest # (They could later be factorized in other files or replaced) -class AbstractRenderer(): - def __init__(self,content,url,center=True): +class AbstractRenderer: + def __init__(self, content, url, center=True): self.url = url self.body = str(content) - #there’s one rendered text and one links table per mode + # there’s one rendered text and one links table per mode self.rendered_text = {} self.links = {} self.images = {} @@ -176,7 +184,7 @@ class AbstractRenderer(): self.last_mode = "readable" self.theme = offthemes.default - def display(self,mode=None,directdisplay=False): + def display(self, mode=None, directdisplay=False): wtitle = self.get_formatted_title() if mode == "source": body = self.body @@ -191,17 +199,17 @@ class AbstractRenderer(): def has_direct_display(self): return False - def set_theme(self,theme): + def set_theme(self, theme): if theme: self.theme.update(theme) def get_theme(self): return self.theme - #This class hold an internal representation of the HTML text + # This class hold an internal representation of the HTML text class representation: - def __init__(self,width,title=None,center=True,theme={}): - self.title=title + def __init__(self, width, title=None, center=True, theme={}): + self.title = title self.center = center self.final_text = "" self.opened = [] @@ -219,63 +227,64 @@ class AbstractRenderer(): self.theme = theme self.colors = offthemes.colors - def _insert(self,color,open=True): - if open: o = 0 - else: o = 1 + def _insert(self, color, open=True): + if open: + o = 0 + else: + o = 1 pos = len(self.last_line) - #we remember the position where to insert color codes - if not pos in self.last_line_colors: + # we remember the position where to insert color codes + if pos not in self.last_line_colors: self.last_line_colors[pos] = [] - #Two inverse code cancel each other - if [color,int(not o)] in self.last_line_colors[pos]: - self.last_line_colors[pos].remove([color,int(not o)]) + # Two inverse code cancel each other + if [color, int(not o)] in self.last_line_colors[pos]: + self.last_line_colors[pos].remove([color, int(not o)]) else: - self.last_line_colors[pos].append([color,o])#+color+str(o)) + self.last_line_colors[pos].append([color, o]) # +color+str(o)) # Take self.last line and add ANSI codes to it before adding it to # self.final_text. def _endline(self): if len(self.last_line.strip()) > 0: for c in self.opened: - self._insert(c,open=False) + self._insert(c, open=False) nextline = "" added_char = 0 - #we insert the color code at the saved positions - while len (self.last_line_colors) > 0: - pos,colors = self.last_line_colors.popitem() - #popitem itterates LIFO. - #So we go, backward, to the pos (starting at the end of last_line) + # we insert the color code at the saved positions + while len(self.last_line_colors) > 0: + pos, colors = self.last_line_colors.popitem() + # popitem itterates LIFO. 
+ # So we go, backward, to the pos (starting at the end of last_line) nextline = self.last_line[pos:] + nextline ansicol = "\x1b[" - for c,o in colors: + for c, o in colors: ansicol += self.colors[c][o] + ";" - ansicol = ansicol[:-1]+"m" + ansicol = ansicol[:-1] + "m" nextline = ansicol + nextline added_char += len(ansicol) self.last_line = self.last_line[:pos] nextline = self.last_line + nextline if self.last_line_center: - #we have to care about the ansi char while centering + # we have to care about the ansi char while centering width = term_width() + added_char nextline = nextline.strip().center(width) self.last_line_center = False else: - #should we lstrip the nextline in the addition ? + # should we lstrip the nextline in the addition ? nextline = self.current_indent + nextline.lstrip() + self.r_indent self.current_indent = self.s_indent self.final_text += nextline self.last_line = "" self.final_text += "\n" for c in self.opened: - self._insert(c,open=True) + self._insert(c, open=True) else: self.last_line = "" - def center_line(self): self.last_line_center = True - def open_theme(self,element): + def open_theme(self, element): if element in self.theme: colors = self.theme[element] for c in colors: @@ -283,25 +292,29 @@ class AbstractRenderer(): return True else: return False - def close_theme(self,element): + + def close_theme(self, element): if element in self.theme: colors = self.theme[element] for c in colors: self.close_color(c) - def open_color(self,color): + + def open_color(self, color): if color in self.colors and color not in self.opened: - self._insert(color,open=True) + self._insert(color, open=True) self.opened.append(color) - def close_color(self,color): + + def close_color(self, color): if color in self.colors and color in self.opened: - self._insert(color,open=False) + self._insert(color, open=False) self.opened.remove(color) + def close_all(self): if len(self.colors) > 0: self.last_line += "\x1b[0m" self.opened.clear() - def startindent(self,indent,sub=None,reverse=None): + def startindent(self, indent, sub=None, reverse=None): self._endline() self.i_indent = indent self.current_indent = indent @@ -314,7 +327,6 @@ class AbstractRenderer(): else: self.r_indent = "" - def endindent(self): self._endline() self.i_indent = "" @@ -341,11 +353,11 @@ class AbstractRenderer(): def newline(self): self._endline() - #A new paragraph implies 2 newlines (1 blank line between paragraphs) - #But it is only used if didn’t already started one to avoid plenty - #of blank lines. force=True allows to bypass that limit. - #new_paragraph becomes false as soon as text is entered into it - def newparagraph(self,force=False): + # A new paragraph implies 2 newlines (1 blank line between paragraphs) + # But it is only used if didn’t already started one to avoid plenty + # of blank lines. force=True allows to bypass that limit. + # new_paragraph becomes false as soon as text is entered into it + def newparagraph(self, force=False): if force or not self.new_paragraph: self._endline() self.final_text += "\n" @@ -355,7 +367,7 @@ class AbstractRenderer(): if len(self.last_line) > 0 and self.last_line[-1] != " ": self.last_line += " " - def _title_first(self,intext=None): + def _title_first(self, intext=None): if self.title: if not self.title == intext: self._disable_indents() @@ -368,14 +380,14 @@ class AbstractRenderer(): # Beware, blocks are not wrapped nor indented and left untouched! # They are mostly useful for pictures and preformatted text. 
- def add_block(self,intext,theme=None): + def add_block(self, intext, theme=None): # If necessary, we add the title before a block self._title_first() # we don’t want to indent blocks self._endline() self._disable_indents() - #we have to apply the theme for every line in the intext - #applying theme to preformatted is controversial as it could change it + # we have to apply the theme for every line in the intext + # applying theme to preformatted is controversial as it could change it if theme: block = "" lines = intext.split("\n") @@ -385,17 +397,17 @@ class AbstractRenderer(): self.close_theme(theme) self._endline() self.last_line += "\n" - #one thing is sure : we need to keep unthemed blocks for images! + # one thing is sure : we need to keep unthemed blocks for images! else: self.final_text += self.current_indent + intext self.new_paragraph = False self._endline() self._enable_indents() - def add_text(self,intext): + def add_text(self, intext): self._title_first(intext=intext) lines = [] - last = (self.last_line + intext) + last = self.last_line + intext self.last_line = "" # With the following, we basically cancel adding only spaces # on an empty line @@ -407,52 +419,56 @@ class AbstractRenderer(): width = self.width - len(self.current_indent) - len(self.r_indent) spaces_left = len(last) - len(last.lstrip()) spaces_right = len(last) - len(last.rstrip()) - lines = textwrap.wrap(last,width,drop_whitespace=True) - self.last_line += spaces_left*" " + lines = textwrap.wrap(last, width, drop_whitespace=True) + self.last_line += spaces_left * " " while len(lines) > 1: l = lines.pop(0) self.last_line += l self._endline() if len(lines) == 1: li = lines[0] - self.last_line += li + spaces_right*" " + self.last_line += li + spaces_right * " " else: self.last_line = last def get_final(self): self.close_all() self._endline() - #if no content, we still add the title + # if no content, we still add the title self._title_first() lines = self.final_text.splitlines() lines2 = [] termspace = shutil.get_terminal_size()[0] - #Following code instert blanck spaces to center the content + # Following code instert blanck spaces to center the content if self.center and termspace > term_width(): - margin = int((termspace - term_width())//2) + margin = int((termspace - term_width()) // 2) else: margin = 0 - for l in lines : - lines2.append(margin*" "+l) + for l in lines: + lines2.append(margin * " " + l) return "\n".join(lines2) def get_subscribe_links(self): - return [[self.url,self.get_mime(),self.get_title()]] + return [[self.url, self.get_mime(), self.get_title()]] + def is_valid(self): return self.validity - def set_mode(self,mode): + + def set_mode(self, mode): self.last_mode = mode + def get_mode(self): return self.last_mode - def get_link(self,nb): + + def get_link(self, nb): links = self.get_links() if len(links) < nb: - print("Index too high! No link %s for %s" %(nb,self.url)) + print("Index too high! 
No link %s for %s" % (nb, self.url)) return None else: - return links[nb-1] + return links[nb - 1] - #get_title is about the "content title", so the title in the page itself + # get_title is about the "content title", so the title in the page itself def get_title(self): return "Abstract title" @@ -461,27 +477,29 @@ class AbstractRenderer(): if not title or len(title) == 0: title = self.get_url_title() else: - title += " (%s)" %self.get_url_title() + title += " (%s)" % self.get_url_title() return title - + def get_formatted_title(self): title = self.get_url_title() nbr = len(self.get_links()) if is_local(self.url): - title += " (%s items)"%nbr + title += " (%s items)" % nbr str_last = "local file" else: - str_last = "last accessed on %s"\ - %time.ctime(netcache.cache_last_modified(self.url)) - title += " (%s links)"%nbr - return self._window_title(title,info=str_last) + str_last = "last accessed on %s" % time.ctime( + netcache.cache_last_modified(self.url) + ) + title += " (%s links)" % nbr + return self._window_title(title, info=str_last) - #this function is about creating a title derived from the URL + # this function is about creating a title derived from the URL def get_url_title(self): - #small intelligence to try to find a good name for a capsule - #we try to find eithe ~username or /users/username - #else we fallback to hostname - if not self.url: return "" + # small intelligence to try to find a good name for a capsule + # we try to find eithe ~username or /users/username + # else we fallback to hostname + if not self.url: + return "" if is_local(self.url): splitpath = self.url.split("/") filename = splitpath[-1] @@ -492,9 +510,9 @@ class AbstractRenderer(): if "user" in path: i = 0 splitted = path.split("/") - while i < (len(splitted)-1): + while i < (len(splitted) - 1): if splitted[i].startswith("user"): - red_title = splitted[i+1] + red_title = splitted[i + 1] i += 1 if "~" in path: for pp in path.split("/"): @@ -504,9 +522,10 @@ class AbstractRenderer(): # This function return a list of URL which should be downloaded # before displaying the page (images in HTML pages, typically) - def get_images(self,mode=None): - if not mode: mode = self.last_mode - if not mode in self.images: + def get_images(self, mode=None): + if not mode: + mode = self.last_mode + if mode not in self.images: self.get_body(mode=mode) # we also invalidate the body that was done without images self.rendered_text.pop(mode) @@ -514,56 +533,63 @@ class AbstractRenderer(): return self.images[mode] else: return [] - #This function will give gemtext to the gemtext renderer - def prepare(self,body,mode=None): - return [[body,None]] - def _build_body_and_links(self,mode,width=None): + # This function will give gemtext to the gemtext renderer + def prepare(self, body, mode=None): + return [[body, None]] + + def _build_body_and_links(self, mode, width=None): if not width: width = term_width() - prepared_bodies = self.prepare(self.body,mode=mode) + prepared_bodies = self.prepare(self.body, mode=mode) self.rendered_text[mode] = "" self.links[mode] = [] for b in prepared_bodies: results = None size = len(self.links[mode]) if b[1] in _FORMAT_RENDERERS: - r = _FORMAT_RENDERERS[b[1]](b[0],self.url,center=self.center) - results = r.render(b[0],width=width,mode=mode,startlinks=size) + r = _FORMAT_RENDERERS[b[1]](b[0], self.url, center=self.center) + results = r.render(b[0], width=width, mode=mode, startlinks=size) else: - results = self.render(b[0],width=width,mode=mode,startlinks=size) + results = self.render(b[0], 
width=width, mode=mode, startlinks=size) if results: self.rendered_text[mode] += results[0] + "\n" - #we should absolutize all URLs here + # we should absolutize all URLs here for l in results[1]: ll = l.split()[0] try: - abs_l = urllib.parse.urljoin(self.url,ll) - self.links[mode].append(abs_l) - except Exception as err: - print("Urljoin Error: Could not make an URL out of %s and %s"%(self.url,ll)) + abs_l = urllib.parse.urljoin(self.url, ll) + self.links[mode].append(abs_l) + except Exception: + print( + "Urljoin Error: Could not make an URL out of %s and %s" + % (self.url, ll) + ) for l in self.get_subscribe_links()[1:]: self.links[mode].append(l[0]) - def get_body(self,width=None,mode=None): - if not mode: mode = self.last_mode + def get_body(self, width=None, mode=None): + if not mode: + mode = self.last_mode if mode not in self.rendered_text: - self._build_body_and_links(mode,width) + self._build_body_and_links(mode, width) return self.rendered_text[mode] - def get_links(self,mode=None): - if not mode: mode = self.last_mode - if mode not in self.links : + + def get_links(self, mode=None): + if not mode: + mode = self.last_mode + if mode not in self.links: self._build_body_and_links(mode) return self.links[mode] - def _window_title(self,title,info=None): - title_r = self.representation(term_width(),theme=self.theme) + def _window_title(self, title, info=None): + title_r = self.representation(term_width(), theme=self.theme) title_r.open_theme("window_title") title_r.add_text(title) title_r.close_theme("window_title") if info: title_r.open_theme("window_subtitle") - title_r.add_text(" (%s)"%info) + title_r.add_text(" (%s)" % info) title_r.close_theme("window_subtitle") return title_r.get_final() @@ -575,9 +601,11 @@ class AbstractRenderer(): # The prepare() function output a list of tuple. Each tuple is [output text, format] where # format should be in _FORMAT_RENDERERS. 
If None, current renderer is used + class PlaintextRenderer(AbstractRenderer): def get_mime(self): return "text/plain" + def get_title(self): if self.title: return self.title @@ -596,13 +624,16 @@ class PlaintextRenderer(AbstractRenderer): return self.title else: return "(unknown)" - def render(self,gemtext, width=None,mode=None,startlinks=0): + + def render(self, gemtext, width=None, mode=None, startlinks=0): return gemtext, [] + # Gemtext Rendering Engine class GemtextRenderer(AbstractRenderer): def get_mime(self): return "text/gemini" + def get_title(self): if self.title: return self.title @@ -626,18 +657,19 @@ class GemtextRenderer(AbstractRenderer): else: return "(unknown)" - #render_gemtext - def render(self,gemtext, width=None,mode=None,startlinks=0): + # render_gemtext + def render(self, gemtext, width=None, mode=None, startlinks=0): if not width: width = term_width() - r = self.representation(width,theme=self.theme) + r = self.representation(width, theme=self.theme) links = [] hidden_links = [] preformatted = False - def format_link(url,index,name=None): + + def format_link(url, index, name=None): if "://" in url: - protocol,adress = url.split("://",maxsplit=1) - protocol = " %s" %protocol + protocol, adress = url.split("://", maxsplit=1) + protocol = " %s" % protocol else: adress = url protocol = "" @@ -647,6 +679,7 @@ class GemtextRenderer(AbstractRenderer): name = adress line = "[%d%s] %s" % (index, protocol, name) return line + for line in gemtext.splitlines(): r.newline() if line.startswith("```"): @@ -657,7 +690,7 @@ class GemtextRenderer(AbstractRenderer): r.close_theme("preformatted") elif preformatted: # infinite line to not wrap preformated - r.add_block(line+"\n",theme="preformatted") + r.add_block(line + "\n", theme="preformatted") elif len(line.strip()) == 0: r.newparagraph(force=True) elif line.startswith("=>"): @@ -669,13 +702,17 @@ class GemtextRenderer(AbstractRenderer): name = None if len(splitted) > 1: name = splitted[1] - link = format_link(url,len(links)+startlinks,name=name) + link = format_link(url, len(links) + startlinks, name=name) # If the link point to a page that has been cached less than # 600 seconds after this page, we consider it as a new_link current_modif = netcache.cache_last_modified(self.url) link_modif = netcache.cache_last_modified(url) - if current_modif and link_modif and current_modif - link_modif < 600 and\ - r.open_theme("new_link"): + if ( + current_modif + and link_modif + and current_modif - link_modif < 600 + and r.open_theme("new_link") + ): theme = "new_link" elif r.open_theme("oneline_link"): theme = "oneline_link" @@ -683,13 +720,13 @@ class GemtextRenderer(AbstractRenderer): theme = "link" r.open_theme("link") startpos = link.find("] ") + 2 - r.startindent("",sub=startpos*" ") + r.startindent("", sub=startpos * " ") r.add_text(link) r.close_theme(theme) r.endindent() elif line.startswith("* "): line = line[1:].lstrip("\t ") - r.startindent("• ",sub=" ") + r.startindent("• ", sub=" ") r.add_text(line) r.endindent() elif line.startswith(">"): @@ -730,16 +767,20 @@ class GemtextRenderer(AbstractRenderer): links += hidden_links return r.get_final(), links + class EmptyRenderer(GemtextRenderer): def get_mime(self): return "text/empty" - def prepare(self,body,mode=None): - text= "(empty file)" + + def prepare(self, body, mode=None): + text = "(empty file)" return [[text, "GemtextRenderer"]] + class GopherRenderer(AbstractRenderer): def get_mime(self): return "text/gopher" + def get_title(self): if not self.title: self.title = "" @@ 
-751,26 +792,28 @@ class GopherRenderer(AbstractRenderer): self.title = firstline return self.title - #menu_or_text - def render(self,body,width=None,mode=None,startlinks=0): + # menu_or_text + def render(self, body, width=None, mode=None, startlinks=0): if not width: width = term_width() try: - render,links = self._render_goph(body,width=width,mode=mode,startlinks=startlinks) + render, links = self._render_goph( + body, width=width, mode=mode, startlinks=startlinks + ) except Exception as err: - print("Error rendering Gopher ",err) - r = self.representation(width,theme=self.theme) + print("Error rendering Gopher ", err) + r = self.representation(width, theme=self.theme) r.add_block(body) render = r.get_final() links = [] - return render,links + return render, links - def _render_goph(self,body,width=None,mode=None,startlinks=0): + def _render_goph(self, body, width=None, mode=None, startlinks=0): if not width: width = term_width() # This was copied straight from Agena (then later adapted) links = [] - r = self.representation(width,theme=self.theme) + r = self.representation(width, theme=self.theme) for line in self.body.split("\n"): r.newline() if line.startswith("i"): @@ -779,49 +822,50 @@ class GopherRenderer(AbstractRenderer): r.add_text(towrap) else: r.newparagraph() - elif not line.strip() in [".",""]: + elif line.strip() not in [".", ""]: parts = line.split("\t") parts[-1] = parts[-1].strip() if parts[-1] == "+": parts = parts[:-1] if len(parts) == 4: - name,path,host,port = parts - #If line starts with TAB, there’s no name. - #We thus hide this line + name, path, host, port = parts + # If line starts with TAB, there’s no name. + # We thus hide this line if name: itemtype = name[0] name = name[1:] if port == "70": port = "" else: - port = ":%s"%port + port = ":%s" % port if itemtype == "h" and path.startswith("URL:"): url = path[4:] else: - url = "gopher://%s%s/%s%s" %(host,port,itemtype,path) - url = url.replace(" ","%20") + url = "gopher://%s%s/%s%s" % (host, port, itemtype, path) + url = url.replace(" ", "%20") linkline = url + " " + name links.append(linkline) number = len(links) + startlinks - towrap = "[%s] "%str(number)+ name + towrap = "[%s] " % str(number) + name r.add_text(towrap) else: r.add_text(line) - return r.get_final(),links + return r.get_final(), links class FolderRenderer(GemtextRenderer): - #it was initialized with: - #self.renderer = FolderRenderer("",self.get_cache_path(),datadir=xdg("data")) - def __init__(self,content,url,center=True,datadir=None): - GemtextRenderer.__init__(self,content,url,center) + # it was initialized with: + # self.renderer = FolderRenderer("",self.get_cache_path(),datadir=xdg("data")) + def __init__(self, content, url, center=True, datadir=None): + GemtextRenderer.__init__(self, content, url, center) self.datadir = datadir def get_mime(self): return "Directory" - def prepare(self,body,mode=None): + + def prepare(self, body, mode=None): def get_first_line(l): - path = os.path.join(listdir,l+".gmi") + path = os.path.join(listdir, l + ".gmi") with open(path) as f: first_line = f.readline().strip() f.close() @@ -829,22 +873,24 @@ class FolderRenderer(GemtextRenderer): return first_line else: return None + def write_list(l): body = "" for li in l: - path = "list:///%s"%li + path = "list:///%s" % li r = renderer_from_file(netcache.get_cache_path(path)) size = len(r.get_links()) - body += "=> %s %s (%s items)\n" %(str(path),li,size) + body += "=> %s %s (%s items)\n" % (str(path), li, size) return body - listdir = 
os.path.join(self.datadir,"lists") + + listdir = os.path.join(self.datadir, "lists") self.title = "My lists" lists = [] if os.path.exists(listdir): listfiles = os.listdir(listdir) if len(listfiles) > 0: for l in listfiles: - #removing the .gmi at the end of the name + # removing the .gmi at the end of the name lists.append(l[:-4]) if len(lists) > 0: body = "" @@ -854,7 +900,7 @@ class FolderRenderer(GemtextRenderer): frozen = [] lists.sort() for l in lists: - if l in ["history","to_fetch","archives","tour"]: + if l in ["history", "to_fetch", "archives", "tour"]: system_lists.append(l) else: first_line = get_first_line(l) @@ -865,27 +911,29 @@ class FolderRenderer(GemtextRenderer): else: my_lists.append(l) if len(my_lists) > 0: - body+= "\n## Bookmarks Lists (updated during sync)\n" + body += "\n## Bookmarks Lists (updated during sync)\n" body += write_list(my_lists) if len(subscriptions) > 0: - body +="\n## Subscriptions (new links in those are added to tour)\n" + body += "\n## Subscriptions (new links in those are added to tour)\n" body += write_list(subscriptions) if len(frozen) > 0: - body +="\n## Frozen (fetched but never updated)\n" + body += "\n## Frozen (fetched but never updated)\n" body += write_list(frozen) if len(system_lists) > 0: - body +="\n## System Lists\n" + body += "\n## System Lists\n" body += write_list(system_lists) - return [[body,None]] + return [[body, None]] + class FeedRenderer(GemtextRenderer): def get_mime(self): return "application/rss+xml" + def is_valid(self): if _DO_FEED: try: parsed = feedparser.parse(self.body) - except: + except Exception: parsed = False else: return False @@ -894,7 +942,7 @@ class FeedRenderer(GemtextRenderer): elif parsed.bozo: return False else: - #If no content, then fallback to HTML + # If no content, then fallback to HTML return len(parsed.entries) > 0 def get_title(self): @@ -902,8 +950,9 @@ class FeedRenderer(GemtextRenderer): self.get_body() return self.title - def prepare(self,content,mode=None,width=None): - if not mode: mode = self.last_mode + def prepare(self, content, mode=None, width=None): + if not mode: + mode = self.last_mode if not width: width = term_width() self.title = "RSS/Atom feed" @@ -924,92 +973,97 @@ class FeedRenderer(GemtextRenderer): t = parsed.feed.title else: t = "Unknown" - self.title = "%s (XML feed)" %t - title = "# %s"%self.title + self.title = "%s (XML feed)" % t + title = "# %s" % self.title page += title + "\n" if "updated" in parsed.feed: - page += "Last updated on %s\n\n" %parsed.feed.updated + page += "Last updated on %s\n\n" % parsed.feed.updated if "subtitle" in parsed.feed: page += parsed.feed.subtitle + "\n" if "link" in parsed.feed: - page += "=> %s\n" %parsed.feed.link + page += "=> %s\n" % parsed.feed.link page += "\n## Entries\n" - toreturn.append([page,None]) + toreturn.append([page, None]) if len(parsed.entries) < 1: self.validity = False postslist = "" for i in parsed.entries: if "link" in i: - line = "=> %s " %i.link + line = "=> %s " % i.link elif "links" in i and len(i.links) > 0: link = None j = 0 while not link and j < len(i.links): link = i.links[j].href if link: - line = "=> %s "%link + line = "=> %s " % link else: line = "* " else: line = "* " if "published" in i: - #sometimes fails so protect it + # sometimes fails so protect it try: - pub_date = time.strftime("%Y-%m-%d",i.published_parsed) + pub_date = time.strftime("%Y-%m-%d", i.published_parsed) line += pub_date + " : " - except: + except Exception: pass if "title" in i: - line += "%s" %(i.title) + line += "%s" % (i.title) 
if "author" in i: - line += " (by %s)"%i.author + line += " (by %s)" % i.author if mode == "full": - toreturn.append([line,None]) + toreturn.append([line, None]) if "summary" in i: - toreturn.append([i.summary,"text/html"]) - toreturn.append(["------------",None]) + toreturn.append([i.summary, "text/html"]) + toreturn.append(["------------", None]) else: postslist += line + "\n" - #If each posts is append to toreturn, a \n is inserted - #between each item of the list. I don’t like it. Hence this hack + # If each posts is append to toreturn, a \n is inserted + # between each item of the list. I don’t like it. Hence this hack if mode != "full": - toreturn.append([postslist,None]) + toreturn.append([postslist, None]) return toreturn + class ImageRenderer(AbstractRenderer): def get_mime(self): return "image/*" + def is_valid(self): if _RENDER_IMAGE: return True else: return False - def get_links(self,mode=None): + + def get_links(self, mode=None): return [] + def get_title(self): return "Picture file" - def render(self,img,width=None,mode=None,startlinks=0): - #with inline, we use symbols to be rendered with less. - #else we use the best possible renderer. - if mode in ["full_links_only","links_only"]: + + def render(self, img, width=None, mode=None, startlinks=0): + # with inline, we use symbols to be rendered with less. + # else we use the best possible renderer. + if mode in ["full_links_only", "links_only"]: return "", [] if not width: width = term_width() spaces = 0 else: - spaces = int((term_width() - width)//2) - ansi_img = inline_image(img,width) - #Now centering the image + spaces = int((term_width() - width) // 2) + ansi_img = inline_image(img, width) + # Now centering the image lines = ansi_img.splitlines() new_img = "" for l in lines: - new_img += spaces*" " + l + "\n" + new_img += spaces * " " + l + "\n" return new_img, [] def has_direct_display(self): return _RENDER_IMAGE - def display(self,mode=None,directdisplay=False): + def display(self, mode=None, directdisplay=False): wtitle = self.get_formatted_title() if not directdisplay: body = wtitle + "\n" + self.get_body(mode=mode) @@ -1019,24 +1073,29 @@ class ImageRenderer(AbstractRenderer): terminal_image(self.body) return True + class HtmlRenderer(AbstractRenderer): def get_mime(self): return "text/html" + def is_valid(self): if not _DO_HTML: - print("HTML document detected. Please install python-bs4 and python-readability.") + print( + "HTML document detected. Please install python-bs4 and python-readability." 
+ ) return _DO_HTML and self.validity + def get_subscribe_links(self): - subs = [[self.url,self.get_mime(),self.get_title()]] - soup = BeautifulSoup(self.body, 'html.parser') - links = soup.find_all("link",rel="alternate",recursive=True) + subs = [[self.url, self.get_mime(), self.get_title()]] + soup = BeautifulSoup(self.body, "html.parser") + links = soup.find_all("link", rel="alternate", recursive=True) for l in links: ty = l.get("type") - if ty : + if ty: if "rss" in ty or "atom" in ty or "feed" in ty: # some rss links are relatives: we absolutise_url sublink = urllib.parse.urljoin(self.url, l.get("href")) - subs.append([sublink,ty,l.get("title")]) + subs.append([sublink, ty, l.get("title")]) return subs def get_title(self): @@ -1048,9 +1107,9 @@ class HtmlRenderer(AbstractRenderer): readable = Document(self.body) self.title = readable.short_title() return self.title - except Exception as err: + except Exception: pass - soup = BeautifulSoup(self.body,"html.parser") + soup = BeautifulSoup(self.body, "html.parser") if soup.title: self.title = str(soup.title.string) else: @@ -1062,173 +1121,194 @@ class HtmlRenderer(AbstractRenderer): # Our own HTML engine (crazy, isn’t it?) # Return [rendered_body, list_of_links] # mode is either links_only, readable or full - def render(self,body,mode=None,width=None,add_title=True,startlinks=0): - if not mode: mode = self.last_mode + def render(self, body, mode=None, width=None, add_title=True, startlinks=0): + if not mode: + mode = self.last_mode if not width: width = term_width() if not _DO_HTML: - print("HTML document detected. Please install python-bs4 and python-readability.") + print( + "HTML document detected. Please install python-bs4 and python-readability." + ) return # This method recursively parse the HTML - r = self.representation(width,title=self.get_title(),center=self.center,theme=self.theme) + r = self.representation( + width, title=self.get_title(), center=self.center, theme=self.theme + ) links = [] # You know how bad html is when you realize that space sometimes meaningful, somtimes not. # CR are not meaniningful. Except that, somethimes, they should be interpreted as spaces. # HTML is real crap. At least the one people are generating. 
- def render_image(src,width=40,mode=None): + def render_image(src, width=40, mode=None): ansi_img = "" - imgurl,imgdata = looks_like_base64(src,self.url) - if _RENDER_IMAGE and mode not in ["full_links_only","links_only"] and imgurl: + imgurl, imgdata = looks_like_base64(src, self.url) + if ( + _RENDER_IMAGE + and mode not in ["full_links_only", "links_only"] + and imgurl + ): try: - #4 followings line are there to translate the URL into cache path + # 4 followings line are there to translate the URL into cache path img = netcache.get_cache_path(imgurl) if imgdata: os.makedirs(os.path.dirname(img), exist_ok=True) - with open(img,"wb") as cached: + with open(img, "wb") as cached: cached.write(base64.b64decode(imgdata)) cached.close() if netcache.is_cache_valid(img): - renderer = ImageRenderer(img,imgurl) + renderer = ImageRenderer(img, imgurl) # Image are 40px wide except if terminal is smaller if width > 40: size = 40 else: size = width - ansi_img = "\n" + renderer.get_body(width=size,mode="inline") + ansi_img = "\n" + renderer.get_body(width=size, mode="inline") except Exception as err: - #we sometimes encounter really bad formatted files or URL - ansi_img = textwrap.fill("[BAD IMG] %s - %s"%(err,src),width) + "\n" + # we sometimes encounter really bad formatted files or URL + ansi_img = ( + textwrap.fill("[BAD IMG] %s - %s" % (err, src), width) + "\n" + ) return ansi_img + def sanitize_string(string): - #never start with a "\n" - #string = string.lstrip("\n") - string = string.replace("\r","").replace("\n", " ").replace("\t"," ") + # never start with a "\n" + # string = string.lstrip("\n") + string = string.replace("\r", "").replace("\n", " ").replace("\t", " ") endspace = string.endswith(" ") or string.endswith("\xa0") startspace = string.startswith(" ") or string.startswith("\xa0") - toreturn = string.replace("\n", " ").replace("\t"," ").strip() + toreturn = string.replace("\n", " ").replace("\t", " ").strip() while " " in toreturn: - toreturn = toreturn.replace(" "," ") + toreturn = toreturn.replace(" ", " ") toreturn = html.unescape(toreturn) - if endspace and not toreturn.endswith(" ") and not toreturn.endswith("\xa0"): + if ( + endspace + and not toreturn.endswith(" ") + and not toreturn.endswith("\xa0") + ): toreturn += " " - if startspace and not toreturn.startswith(" ") and not toreturn.startswith("\xa0"): + if ( + startspace + and not toreturn.startswith(" ") + and not toreturn.startswith("\xa0") + ): toreturn = " " + toreturn return toreturn - def recursive_render(element,indent="",preformatted=False): + + def recursive_render(element, indent="", preformatted=False): if element.name in ["blockquote", "dd"]: r.newparagraph() - r.startindent(" ",reverse=" ") + r.startindent(" ", reverse=" ") for child in element.children: r.open_theme("blockquote") - recursive_render(child,indent="\t") + recursive_render(child, indent="\t") r.close_theme("blockquote") r.endindent() - elif element.name in ["div","p","dt"]: + elif element.name in ["div", "p", "dt"]: r.newparagraph() for child in element.children: - recursive_render(child,indent=indent) + recursive_render(child, indent=indent) r.newparagraph() elif element.name in ["span"]: r.add_space() for child in element.children: - recursive_render(child,indent=indent) + recursive_render(child, indent=indent) r.add_space() - elif element.name in ["h1","h2","h3","h4","h5","h6"]: + elif element.name in ["h1", "h2", "h3", "h4", "h5", "h6"]: if element.name in ["h1"]: r.open_theme("title") - elif element.name in ["h2","h3"]: + elif element.name 
in ["h2", "h3"]: r.open_theme("subtitle") - elif element.name in ["h4","h5","h6"]: + elif element.name in ["h4", "h5", "h6"]: if not r.open_theme("subsubtitle"): r.open_theme("subtitle") r.newparagraph() for child in element.children: recursive_render(child) - #r.close_all() + # r.close_all() r.close_all() r.newparagraph() - elif element.name in ["code","tt"]: + elif element.name in ["code", "tt"]: for child in element.children: - recursive_render(child,indent=indent,preformatted=True) + recursive_render(child, indent=indent, preformatted=True) elif element.name in ["pre"]: r.newparagraph() - r.add_block(element.text,theme="preformatted") + r.add_block(element.text, theme="preformatted") r.newparagraph(force=True) elif element.name in ["li"]: - r.startindent(" • ",sub=" ") + r.startindent(" • ", sub=" ") for child in element.children: - recursive_render(child,indent=indent) + recursive_render(child, indent=indent) r.endindent() elif element.name in ["tr"]: - r.startindent("|",reverse="|") + r.startindent("|", reverse="|") for child in element.children: - recursive_render(child,indent=indent) + recursive_render(child, indent=indent) r.endindent() - elif element.name in ["td","th"]: + elif element.name in ["td", "th"]: r.add_text("| ") for child in element.children: recursive_render(child) r.add_text(" |") # italics - elif element.name in ["em","i"]: + elif element.name in ["em", "i"]: r.open_color("italic") for child in element.children: - recursive_render(child,indent=indent,preformatted=preformatted) + recursive_render(child, indent=indent, preformatted=preformatted) r.close_color("italic") - #bold - elif element.name in ["b","strong"]: + # bold + elif element.name in ["b", "strong"]: r.open_color("bold") for child in element.children: - recursive_render(child,indent=indent,preformatted=preformatted) + recursive_render(child, indent=indent, preformatted=preformatted) r.close_color("bold") elif element.name == "a": - link = element.get('href') + link = element.get("href") # support for images nested in links if link: text = "" imgtext = "" - #we display images first in a link + # we display images first in a link for child in element.children: if child.name == "img": recursive_render(child) imgtext = "[IMG LINK %s]" - links.append(link+" "+text) - link_id = str(len(links)+startlinks) + links.append(link + " " + text) + link_id = str(len(links) + startlinks) r.open_theme("link") for child in element.children: if child.name != "img": - recursive_render(child,preformatted=preformatted) + recursive_render(child, preformatted=preformatted) if imgtext != "": r.center_line() - r.add_text(imgtext%link_id) + r.add_text(imgtext % link_id) else: - r.add_text(" [%s]"%link_id) + r.add_text(" [%s]" % link_id) r.close_theme("link") else: - #No real link found + # No real link found for child in element.children: - recursive_render(child,preformatted=preformatted) + recursive_render(child, preformatted=preformatted) elif element.name == "img": src = element.get("src") text = "" - ansi_img = render_image(src,width=width,mode=mode) + ansi_img = render_image(src, width=width, mode=mode) alt = element.get("alt") if alt: alt = sanitize_string(alt) - text += "[IMG] %s"%alt + text += "[IMG] %s" % alt else: text += "[IMG]" if src: - if not mode in self.images: + if mode not in self.images: self.images[mode] = [] - abs_url,data = looks_like_base64(src,self.url) - #if abs_url is None, it means we don’t support - #the image (such as svg+xml). So we hide it. 
+ abs_url, data = looks_like_base64(src, self.url) + # if abs_url is None, it means we don’t support + # the image (such as svg+xml). So we hide it. if abs_url: - links.append(abs_url+" "+text) + links.append(abs_url + " " + text) self.images[mode].append(abs_url) - link_id = " [%s]"%(len(links)+startlinks) + link_id = " [%s]" % (len(links) + startlinks) r.add_block(ansi_img) r.open_theme("image_link") r.center_line() @@ -1245,36 +1325,36 @@ class HtmlRenderer(AbstractRenderer): src = child.get("src") text = "" if poster: - ansi_img = render_image(poster,width=width,mode=mode) + ansi_img = render_image(poster, width=width, mode=mode) alt = element.get("alt") if alt: alt = sanitize_string(alt) - text += "[VIDEO] %s"%alt + text += "[VIDEO] %s" % alt else: text += "[VIDEO]" if poster: - if not mode in self.images: + if mode not in self.images: self.images[mode] = [] - poster_url,d = looks_like_base64(poster,self.url) + poster_url, d = looks_like_base64(poster, self.url) if poster_url: - vid_url,d2 = looks_like_base64(src,self.url) + vid_url, d2 = looks_like_base64(src, self.url) self.images[mode].append(poster_url) r.add_block(ansi_img) r.open_theme("image_link") r.center_line() if vid_url and src: - links.append(vid_url+" "+text) - link_id = " [%s]"%(len(links)+startlinks) + links.append(vid_url + " " + text) + link_id = " [%s]" % (len(links) + startlinks) r.add_text(text + link_id) else: r.add_text(text) r.close_theme("image_link") r.newline() elif src: - vid_url,d = looks_like_base64(src,self.url) - links.append(vid_url+" "+text) - link_id = " [%s]"%(len(links)+startlinks) + vid_url, d = looks_like_base64(src, self.url) + links.append(vid_url + " " + text) + link_id = " [%s]" % (len(links) + startlinks) r.open_theme("image_link") r.center_line() r.add_text(text + link_id) @@ -1283,9 +1363,12 @@ class HtmlRenderer(AbstractRenderer): elif element.name == "br": r.newline() - elif element.name not in ["script","style","template"] and type(element) != Comment: + elif ( + element.name not in ["script", "style", "template"] + and type(element) is not Comment + ): if element.string: - if preformatted : + if preformatted: r.open_theme("preformatted") r.add_text(element.string) r.close_theme("preformatted") @@ -1295,54 +1378,57 @@ class HtmlRenderer(AbstractRenderer): r.add_text(s) else: for child in element.children: - recursive_render(child,indent=indent) + recursive_render(child, indent=indent) + # the real render_html hearth - if mode in ["full","full_links_only"]: + if mode in ["full", "full_links_only"]: summary = body elif _HAS_READABILITY: try: readable = Document(body) summary = readable.summary() - except Exception as err: + except Exception: summary = body else: summary = body - soup = BeautifulSoup(summary, 'html.parser') - #soup = BeautifulSoup(summary, 'html5lib') - if soup : - if soup.body : + soup = BeautifulSoup(summary, "html.parser") + # soup = BeautifulSoup(summary, 'html5lib') + if soup: + if soup.body: recursive_render(soup.body) else: recursive_render(soup) - return r.get_final(),links + return r.get_final(), links # Mapping mimetypes with renderers # (any content with a mimetype text/* not listed here will be rendered with as GemText) _FORMAT_RENDERERS = { - "text/gemini": GemtextRenderer, - "text/html" : HtmlRenderer, - "text/xml" : FeedRenderer, - "text/plain" : PlaintextRenderer, - "application/xml" : FeedRenderer, - "application/rss+xml" : FeedRenderer, - "application/atom+xml" : FeedRenderer, + "text/gemini": GemtextRenderer, + "text/html": HtmlRenderer, + "text/xml": 
FeedRenderer, + "text/plain": PlaintextRenderer, + "application/xml": FeedRenderer, + "application/rss+xml": FeedRenderer, + "application/atom+xml": FeedRenderer, "text/gopher": GopherRenderer, "image/*": ImageRenderer, "application/javascript": HtmlRenderer, "application/json": HtmlRenderer, "text/empty": EmptyRenderer, } -def get_mime(path,url=None): - #Beware, this one is really a shaddy ad-hoc function + + +def get_mime(path, url=None): + # Beware, this one is really a shaddy ad-hoc function if not path: return None - #If the file is empty, simply returns it + # If the file is empty, simply returns it elif os.path.exists(path) and os.stat(path).st_size == 0: return "text/empty" elif url and url.startswith("gopher://"): - #special case for gopher - #code copy/pasted from netcache + # special case for gopher + # code copy/pasted from netcache parsed = urllib.parse.urlparse(url) if len(parsed.path) >= 2: itemtype = parsed.path[1] @@ -1356,7 +1442,7 @@ def get_mime(path,url=None): mime = "text/gopher" elif itemtype == "h": mime = "text/html" - elif itemtype in ("9","g","I","s",";"): + elif itemtype in ("9", "g", "I", "s", ";"): mime = "binary" else: mime = "text/gopher" @@ -1368,11 +1454,11 @@ def get_mime(path,url=None): mime = "text/gemini" elif path.endswith("gophermap"): mime = "text/gopher" - elif shutil.which("file") : + elif shutil.which("file"): mime = run("file -b --mime-type %s", parameter=path).strip() - mime2,encoding = mimetypes.guess_type(path,strict=False) - #If we hesitate between html and xml, takes the xml one - #because the FeedRendered fallback to HtmlRenderer + mime2, encoding = mimetypes.guess_type(path, strict=False) + # If we hesitate between html and xml, takes the xml one + # because the FeedRendered fallback to HtmlRenderer if mime2 and mime != mime2 and "html" in mime and "xml" in mime2: mime = "text/xml" # If it’s a xml file, consider it as such, regardless of what file thinks @@ -1381,48 +1467,50 @@ def get_mime(path,url=None): # If it doesn’t end with .svg, it is probably an xml, not a SVG file elif "svg" in mime and not path.endswith(".svg"): mime = "text/xml" - #Some xml/html document are considered as octet-stream + # Some xml/html document are considered as octet-stream if mime == "application/octet-stream": mime = "text/xml" else: - mime,encoding = mimetypes.guess_type(path,strict=False) - #gmi Mimetype is not recognized yet - if not mime and not shutil.which("file") : - print("Cannot guess the mime type of the file. Please install \"file\".") + mime, encoding = mimetypes.guess_type(path, strict=False) + # gmi Mimetype is not recognized yet + if not mime and not shutil.which("file"): + print('Cannot guess the mime type of the file. Please install "file".') if mime.startswith("text") and mime not in _FORMAT_RENDERERS: if mime2 and mime2 in _FORMAT_RENDERERS: mime = mime2 else: - #by default, we consider it’s gemini except for html + # by default, we consider it’s gemini except for html mime = "text/gemini" - #file doesn’t recognise gemtext. It should be the default renderer. - #the only case were it doesn’t make sense is if the file is .txt + # file doesn’t recognise gemtext. It should be the default renderer. 
+ # the only case were it doesn’t make sense is if the file is .txt if mime == "text/plain" and not path.endswith(".txt"): mime = "text/gemini" return mime -def renderer_from_file(path,url=None,theme=None): + +def renderer_from_file(path, url=None, theme=None): if not path: return None - mime = get_mime(path,url=url) + mime = get_mime(path, url=url) if not url: url = path if os.path.exists(path): if mime.startswith("text/") or mime in _FORMAT_RENDERERS: - with open(path,errors="ignore") as f: + with open(path, errors="ignore") as f: content = f.read() f.close() else: content = path - toreturn = set_renderer(content,url,mime,theme=theme) + toreturn = set_renderer(content, url, mime, theme=theme) else: toreturn = None return toreturn -def set_renderer(content,url,mime,theme=None): + +def set_renderer(content, url, mime, theme=None): renderer = None if mime == "Local Folder": - renderer = FolderRenderer("",url,datadir=xdg("data")) + renderer = FolderRenderer("", url, datadir=xdg("data")) if theme: renderer.set_theme(theme) return renderer @@ -1434,93 +1522,136 @@ def set_renderer(content,url,mime,theme=None): current_mime = mime_to_use[0] func = _FORMAT_RENDERERS[current_mime] if current_mime.startswith("text"): - renderer = func(content,url) + renderer = func(content, url) # We double check if the renderer is correct. # If not, we fallback to html # (this is currently only for XHTML, often being # mislabelled as xml thus RSS feeds) if not renderer.is_valid(): func = _FORMAT_RENDERERS["text/html"] - #print("Set (fallback)RENDERER to html instead of %s"%mime) - renderer = func(content,url) + # print("Set (fallback)RENDERER to html instead of %s"%mime) + renderer = func(content, url) else: - #TODO: check this code and then remove one if. - #we don’t parse text, we give the file to the renderer - renderer = func(content,url) + # TODO: check this code and then remove one if. 
+ # we don’t parse text, we give the file to the renderer + renderer = func(content, url) if not renderer.is_valid(): renderer = None if renderer and theme: renderer.set_theme(theme) return renderer -def render(input,path=None,format="auto",mime=None,url=None,mode=None): - if not url: url = "" - else: url=url[0] + +def render(input, path=None, format="auto", mime=None, url=None, mode=None): + if not url: + url = "" + else: + url = url[0] if format == "gemtext": - r = GemtextRenderer(input,url) + r = GemtextRenderer(input, url) elif format == "html": - r = HtmlRenderer(input,url) + r = HtmlRenderer(input, url) elif format == "feed": - r = FeedRenderer(input,url) + r = FeedRenderer(input, url) elif format == "gopher": - r = GopherRenderer(input,url) + r = GopherRenderer(input, url) elif format == "image": - r = ImageRenderer(input,url) + r = ImageRenderer(input, url) elif format == "folder": - r = FolderRenderer(input,url) - elif format in ["plaintext","text"]: - r = PlaintextRenderer(input,url) + r = FolderRenderer(input, url) + elif format in ["plaintext", "text"]: + r = PlaintextRenderer(input, url) else: if not mime and path: - r= renderer_from_file(path,url) + r = renderer_from_file(path, url) else: - r = set_renderer(input,url,mime) + r = set_renderer(input, url, mime) if r: - r.display(directdisplay=True,mode=mode) + r.display(directdisplay=True, mode=mode) else: - print("Could not render %s"%input) + print("Could not render %s" % input) def main(): - descri = "ansicat is a terminal rendering tool that will render multiple formats (HTML, \ + descri = ( + "ansicat is a terminal rendering tool that will render multiple formats (HTML, \ Gemtext, RSS, Gophermap, Image) into ANSI text and colors.\n\ When used on a file, ansicat will try to autodetect the format. When used with \ standard input, the format must be manually specified.\n\ If the content contains links, the original URL of the content can be specified \ in order to correctly modify relatives links." - parser = argparse.ArgumentParser(prog="ansicat",description=descri) - parser.add_argument("--format", choices=["auto","gemtext","html","feed","gopher","image","folder","text","plaintext"], - help="Renderer to use. Available: auto, gemtext, html, feed, gopher, image, folder, plaintext") + ) + parser = argparse.ArgumentParser(prog="ansicat", description=descri) + parser.add_argument( + "--format", + choices=[ + "auto", + "gemtext", + "html", + "feed", + "gopher", + "image", + "folder", + "text", + "plaintext", + ], + help="Renderer to use. Available: auto, gemtext, html, feed, gopher, image, folder, plaintext", + ) parser.add_argument("--mime", help="Mime of the content to parse") ## The argument needs to be a path to a file. 
If none, then stdin is used which allows ## to pipe text directly into ansirenderer - parser.add_argument("--url",metavar="URL", nargs="*", - help="Original URL of the content") - parser.add_argument("--mode", metavar="MODE", - help="Which mode should be used to render: normal (default), full or source.\ - With HTML, the normal mode try to extract the article.") - parser.add_argument("content",metavar="INPUT", nargs="*", type=argparse.FileType("r"), - default=sys.stdin, help="Path to the text to render (default to stdin)") + parser.add_argument( + "--url", metavar="URL", nargs="*", help="Original URL of the content" + ) + parser.add_argument( + "--mode", + metavar="MODE", + help="Which mode should be used to render: normal (default), full or source.\ + With HTML, the normal mode try to extract the article.", + ) + parser.add_argument( + "content", + metavar="INPUT", + nargs="*", + type=argparse.FileType("r"), + default=sys.stdin, + help="Path to the text to render (default to stdin)", + ) args = parser.parse_args() # Detect if we are running interactively or in a pipe if sys.stdin.isatty(): - #we are interactive, not in stdin, we can have multiple files as input - if isinstance(args.content,list): + # we are interactive, not in stdin, we can have multiple files as input + if isinstance(args.content, list): for f in args.content: path = os.path.abspath(f.name) try: content = f.read() except UnicodeDecodeError: content = f - render(content,path=path,format=args.format,url=args.url,mime=args.mime,mode=args.mode) + render( + content, + path=path, + format=args.format, + url=args.url, + mime=args.mime, + mode=args.mode, + ) else: print("Ansicat needs at least one file as an argument") else: - #we are in stdin + # we are in stdin if not args.format and not args.mime: print("Format or mime should be specified when running with stdin") else: - render(args.content.read(),path=None,format=args.format,url=args.url,mime=args.mime,mode=args.mode) + render( + args.content.read(), + path=None, + format=args.format, + url=args.url, + mime=args.mime, + mode=args.mode, + ) + -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/cert_migration.py b/cert_migration.py index a0318b6..27992d9 100644 --- a/cert_migration.py +++ b/cert_migration.py @@ -11,9 +11,10 @@ immediately previous format. 
""" -import sqlite3 -import os import datetime +import os +import sqlite3 + def upgrade_to_1(data_dir: str, config_dir: str) -> None: print("moving from tofu.db to certificates as files") diff --git a/netcache.py b/netcache.py index 4c953ea..da66e7a 100755 --- a/netcache.py +++ b/netcache.py @@ -1,22 +1,25 @@ #!/usr/bin/env python3 -import os -import sys -import urllib.parse import argparse import codecs +import datetime import getpass -import socket -import ssl import glob -import datetime import hashlib +import os +import socket +import ssl +import sys +import time +import urllib.parse from ssl import CertificateError + import ansicat import offutils from offutils import xdg -import time + try: import chardet + _HAS_CHARDET = True except ModuleNotFoundError: _HAS_CHARDET = False @@ -24,31 +27,32 @@ except ModuleNotFoundError: try: from cryptography import x509 from cryptography.hazmat.backends import default_backend - from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa - from cryptography.hazmat.primitives import serialization + _HAS_CRYPTOGRAPHY = True _BACKEND = default_backend() -except(ModuleNotFoundError,ImportError): +except (ModuleNotFoundError, ImportError): _HAS_CRYPTOGRAPHY = False try: import requests + _DO_HTTP = True -except (ModuleNotFoundError,ImportError): +except (ModuleNotFoundError, ImportError): _DO_HTTP = False # This list is also used as a list of supported protocols standard_ports = { - "gemini" : 1965, - "gopher" : 70, - "finger" : 79, - "http" : 80, - "https" : 443, - "spartan": 300, + "gemini": 1965, + "gopher": 70, + "finger": 79, + "http": 80, + "https": 443, + "spartan": 300, } default_protocol = "gemini" -CRLF = '\r\n' +CRLF = "\r\n" DEFAULT_TIMEOUT = 10 _MAX_REDIRECTS = 5 @@ -68,16 +72,17 @@ def parse_mime(mime): options = {} if mime: if ";" in mime: - splited = mime.split(";",maxsplit=1) + splited = mime.split(";", maxsplit=1) mime = splited[0] if len(splited) >= 1: options_list = splited[1].split() for o in options_list: - spl = o.split("=",maxsplit=1) + spl = o.split("=", maxsplit=1) if len(spl) > 0: options[spl[0]] = spl[1] return mime, options + def normalize_url(url): if "://" not in url and ("./" not in url and url[0] != "/"): if not url.startswith("mailto:"): @@ -94,7 +99,8 @@ def cache_last_modified(url): else: return None -def is_cache_valid(url,validity=0): + +def is_cache_valid(url, validity=0): # Validity is the acceptable time for # a cache to be valid (in seconds) # If 0, then any cache is considered as valid @@ -102,14 +108,14 @@ def is_cache_valid(url,validity=0): if offutils.is_local(url): return True cache = get_cache_path(url) - if cache : + if cache: # If path is too long, we always return True to avoid # fetching it. if len(cache) > 259: print("We return False because path is too long") return False if os.path.exists(cache) and not os.path.isdir(cache): - if validity > 0 : + if validity > 0: last_modification = cache_last_modified(url) now = time.time() age = now - last_modification @@ -117,17 +123,18 @@ def is_cache_valid(url,validity=0): else: return True else: - #Cache has not been build + # Cache has not been build return False else: - #There’s not even a cache! + # There’s not even a cache! return False -def get_cache_path(url,add_index=True): + +def get_cache_path(url, add_index=True): # Sometimes, cache_path became a folder! 
(which happens for index.html/index.gmi) # In that case, we need to reconstruct it # if add_index=False, we don’t add that "index.gmi" at the ends of the cache_path - #First, we parse the URL + # First, we parse the URL if not url: return None parsed = urllib.parse.urlparse(url) @@ -137,7 +144,7 @@ def get_cache_path(url,add_index=True): scheme = parsed.scheme else: scheme = default_protocol - if scheme in ["file","mailto","list"]: + if scheme in ["file", "mailto", "list"]: local = True host = "" port = None @@ -147,24 +154,24 @@ def get_cache_path(url,add_index=True): elif scheme == "mailto": path = parsed.path elif url.startswith("list://"): - listdir = os.path.join(xdg("data"),"lists") + listdir = os.path.join(xdg("data"), "lists") listname = url[7:].lstrip("/") if listname in [""]: name = "My Lists" path = listdir else: name = listname - path = os.path.join(listdir, "%s.gmi"%listname) + path = os.path.join(listdir, "%s.gmi" % listname) else: path = url else: local = False # Convert unicode hostname to punycode using idna RFC3490 - host = parsed.netloc #.encode("idna").decode() + host = parsed.netloc # .encode("idna").decode() try: port = parsed.port or standard_ports.get(scheme, 0) except ValueError: - port = standard_ports.get(scheme,0) + port = standard_ports.get(scheme, 0) # special gopher selector case if scheme == "gopher": if len(parsed.path) >= 2: @@ -179,7 +186,7 @@ def get_cache_path(url,add_index=True): mime = "text/gopher" elif itemtype == "h": mime = "text/html" - elif itemtype in ("9","g","I","s",";"): + elif itemtype in ("9", "g", "I", "s", ";"): mime = "binary" else: mime = "text/gopher" @@ -189,7 +196,7 @@ def get_cache_path(url,add_index=True): # we don’t add the query if path is too long because path above 260 char # are not supported and crash python. # Also, very long query are usually useless stuff - if len(path+parsed.query) < 258: + if len(path + parsed.query) < 258: path += "/" + parsed.query # Now, we have a partial path. Let’s make it full path. @@ -197,8 +204,8 @@ def get_cache_path(url,add_index=True): cache_path = path elif scheme and host: cache_path = os.path.expanduser(xdg("cache") + scheme + "/" + host + path) - #There’s an OS limitation of 260 characters per path. - #We will thus cut the path enough to add the index afterward + # There’s an OS limitation of 260 characters per path. + # We will thus cut the path enough to add the index afterward cache_path = cache_path[:249] # this is a gross hack to give a name to # index files. This will break if the index is not @@ -220,14 +227,14 @@ def get_cache_path(url,add_index=True): url += "/" if add_index and cache_path.endswith("/"): cache_path += index - #sometimes, the index itself is a dir - #like when folder/index.gmi?param has been created - #and we try to access folder + # sometimes, the index itself is a dir + # like when folder/index.gmi?param has been created + # and we try to access folder if add_index and os.path.isdir(cache_path): cache_path += "/" + index else: - #URL is missing either a supported scheme or a valid host - #print("Error: %s is not a supported url"%url) + # URL is missing either a supported scheme or a valid host + # print("Error: %s is not a supported url"%url) return None if len(cache_path) > 259: print("Path is too long. This is an OS limitation.\n\n") @@ -235,10 +242,11 @@ def get_cache_path(url,add_index=True): return None return cache_path -def write_body(url,body,mime=None): - ## body is a copy of the raw gemtext - ## Write_body() also create the cache ! 
-    # DEFAULT GEMINI MIME
+
+def write_body(url, body, mime=None):
+    # body is a copy of the raw gemtext
+    # write_body() also creates the cache!
+    # DEFAULT GEMINI MIME
     mime, options = parse_mime(mime)
     cache_path = get_cache_path(url)
     if cache_path:
@@ -259,17 +267,17 @@ def write_body(url,body,mime=None):
             root_dir = os.path.dirname(root_dir)
         if os.path.isfile(root_dir):
             os.remove(root_dir)
-        os.makedirs(cache_dir,exist_ok=True)
+        os.makedirs(cache_dir, exist_ok=True)
         with open(cache_path, mode=mode) as f:
             f.write(body)
             f.close()
         return cache_path
-def set_error(url,err):
-# If we get an error, we want to keep an existing cache
-# but we need to touch it or to create an empty one
-# to avoid hitting the error at each refresh
+def set_error(url, err):
+    # If we get an error, we want to keep an existing cache
+    # but we need to touch it or to create an empty one
+    # to avoid hitting the error at each refresh
     cache = get_cache_path(url)
     if is_cache_valid(url):
         os.utime(cache)
@@ -280,76 +288,94 @@ def set_error(url,err):
             root_dir = os.path.dirname(root_dir)
         if os.path.isfile(root_dir):
             os.remove(root_dir)
-        os.makedirs(cache_dir,exist_ok=True)
+        os.makedirs(cache_dir, exist_ok=True)
         if os.path.isdir(cache_dir):
             with open(cache, "w") as c:
-                c.write(str(datetime.datetime.now())+"\n")
-                c.write("ERROR while caching %s\n\n" %url)
+                c.write(str(datetime.datetime.now()) + "\n")
+                c.write("ERROR while caching %s\n\n" % url)
                 c.write("*****\n\n")
                 c.write(str(type(err)) + " = " + str(err))
-                #cache.write("\n" + str(err.with_traceback(None)))
+                # cache.write("\n" + str(err.with_traceback(None)))
                 c.write("\n*****\n\n")
-                c.write("If you believe this error was temporary, type ""reload"".\n")
+                c.write("If you believe this error was temporary, type " "reload" ".\n")
                 c.write("The resource will be tentatively fetched during next sync.\n")
                 c.close()
     return cache
-def _fetch_http(url,max_size=None,timeout=DEFAULT_TIMEOUT,accept_bad_ssl_certificates=False,**kwargs):
-    if not _DO_HTTP: return None
-    def too_large_error(url,length,max_size):
-        err = "Size of %s is %s Mo\n"%(url,length)
-        err += "Offpunk only download automatically content under %s Mo\n" %(max_size/1000000)
+
+def _fetch_http(
+    url,
+    max_size=None,
+    timeout=DEFAULT_TIMEOUT,
+    accept_bad_ssl_certificates=False,
+    **kwargs,
+):
+    if not _DO_HTTP:
+        return None
+
+    def too_large_error(url, length, max_size):
+        err = "Size of %s is %s Mo\n" % (url, length)
+        err += "Offpunk only automatically downloads content under %s Mo\n" % (
+            max_size / 1000000
+        )
        err += "To retrieve this content anyway, type 'reload'."
- return set_error(url,err) + return set_error(url, err) + if accept_bad_ssl_certificates: - requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=1' + requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = "ALL:@SECLEVEL=1" requests.packages.urllib3.disable_warnings() - verify=False + verify = False else: - requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = 'ALL:@SECLEVEL=2' - verify=True + requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS = "ALL:@SECLEVEL=2" + verify = True header = {} header["User-Agent"] = "Netcache" - with requests.get(url,verify=verify,headers=header, stream=True,timeout=DEFAULT_TIMEOUT) as response: + with requests.get( + url, verify=verify, headers=header, stream=True, timeout=DEFAULT_TIMEOUT + ) as response: if "content-type" in response.headers: - mime = response.headers['content-type'] + mime = response.headers["content-type"] else: mime = None if "content-length" in response.headers: - length = int(response.headers['content-length']) + length = int(response.headers["content-length"]) else: length = 0 if max_size and length > max_size: response.close() - return too_large_error(url,str(length/100),max_size) + return too_large_error(url, str(length / 100), max_size) elif max_size and length == 0: - body = b'' + body = b"" downloaded = 0 for r in response.iter_content(): body += r - #We divide max_size for streamed content - #in order to catch them faster + # We divide max_size for streamed content + # in order to catch them faster size = sys.getsizeof(body) - max = max_size/2 - current = round(size*100/max,1) + max = max_size / 2 + current = round(size * 100 / max, 1) if current > downloaded: downloaded = current - print(" -> Receiving stream: %s%% of allowed data"%downloaded,end='\r') - #print("size: %s (%s\% of maxlenght)"%(size,size/max_size)) - if size > max_size/2: + print( + " -> Receiving stream: %s%% of allowed data" % downloaded, + end="\r", + ) + # print("size: %s (%s\% of maxlenght)"%(size,size/max_size)) + if size > max_size / 2: response.close() - return too_large_error(url,"streaming",max_size) + return too_large_error(url, "streaming", max_size) response.close() else: body = response.content response.close() if mime and "text/" in mime: - body = body.decode("UTF-8","replace") - cache = write_body(url,body,mime) + body = body.decode("UTF-8", "replace") + cache = write_body(url, body, mime) return cache -def _fetch_gopher(url,timeout=DEFAULT_TIMEOUT,**kwargs): - parsed =urllib.parse.urlparse(url) + +def _fetch_gopher(url, timeout=DEFAULT_TIMEOUT, **kwargs): + parsed = urllib.parse.urlparse(url) host = parsed.hostname port = parsed.port or 70 if len(parsed.path) >= 2: @@ -358,8 +384,8 @@ def _fetch_gopher(url,timeout=DEFAULT_TIMEOUT,**kwargs): else: itemtype = "1" selector = "" - addresses = socket.getaddrinfo(host, port, family=0,type=socket.SOCK_STREAM) - s = socket.create_connection((host,port)) + addresses = socket.getaddrinfo(host, port, family=0, type=socket.SOCK_STREAM) + s = socket.create_connection((host, port)) for address in addresses: s = socket.socket(address[0], address[1]) s.settimeout(timeout) @@ -377,8 +403,8 @@ def _fetch_gopher(url,timeout=DEFAULT_TIMEOUT,**kwargs): response1 = s.makefile("rb") response = response1.read() # Transcode response into UTF-8 - #if itemtype in ("0","1","h"): - if not itemtype in ("9","g","I","s",";"): + # if itemtype in ("0","1","h"): + if itemtype not in ("9", "g", "I", "s", ";"): # Try most common encodings for encoding in ("UTF-8", "ISO-8859-1"): try: @@ -399,28 +425,30 @@ def 
_fetch_gopher(url,timeout=DEFAULT_TIMEOUT,**kwargs): mime = "text/gopher" elif itemtype == "h": mime = "text/html" - elif itemtype in ("9","g","I","s",";"): + elif itemtype in ("9", "g", "I", "s", ";"): mime = None else: # by default, we should consider Gopher mime = "text/gopher" - cache = write_body(url,response,mime) + cache = write_body(url, response, mime) return cache -def _fetch_finger(url,timeout=DEFAULT_TIMEOUT,**kwargs): + +def _fetch_finger(url, timeout=DEFAULT_TIMEOUT, **kwargs): parsed = urllib.parse.urlparse(url) host = parsed.hostname port = parsed.port or standard_ports["finger"] query = parsed.path.lstrip("/") + "\r\n" - with socket.create_connection((host,port)) as sock: + with socket.create_connection((host, port)) as sock: sock.settimeout(timeout) sock.send(query.encode()) response = sock.makefile("rb").read().decode("UTF-8") - cache = write_body(response,"text/plain") + cache = write_body(response, "text/plain") return cache + # Originally copied from reference spartan client by Michael Lazar -def _fetch_spartan(url,**kwargs): +def _fetch_spartan(url, **kwargs): cache = None url_parts = urllib.parse.urlparse(url) host = url_parts.hostname @@ -428,7 +456,7 @@ def _fetch_spartan(url,**kwargs): path = url_parts.path or "/" query = url_parts.query redirect_url = None - with socket.create_connection((host,port)) as sock: + with socket.create_connection((host, port)) as sock: if query: data = urllib.parse.unquote_to_bytes(query) else: @@ -436,25 +464,26 @@ def _fetch_spartan(url,**kwargs): encoded_host = host.encode("idna") ascii_path = urllib.parse.unquote_to_bytes(path) encoded_path = urllib.parse.quote_from_bytes(ascii_path).encode("ascii") - sock.send(b"%s %s %d\r\n" % (encoded_host,encoded_path,len(data))) + sock.send(b"%s %s %d\r\n" % (encoded_host, encoded_path, len(data))) fp = sock.makefile("rb") response = fp.readline(4096).decode("ascii").strip("\r\n") - parts = response.split(" ",maxsplit=1) - code,meta = int(parts[0]),parts[1] + parts = response.split(" ", maxsplit=1) + code, meta = int(parts[0]), parts[1] if code == 2: body = fp.read() if meta.startswith("text"): body = body.decode("UTF-8") - cache = write_body(url,body,meta) + cache = write_body(url, body, meta) elif code == 3: redirect_url = url_parts._replace(path=meta).geturl() else: - return set_error(url,"Spartan code %s: Error %s"%(code,meta)) + return set_error(url, "Spartan code %s: Error %s" % (code, meta)) if redirect_url: cache = _fetch_spartan(redirect_url) return cache -def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=None): + +def _validate_cert(address, host, cert, accept_bad_ssl=False, automatic_choice=None): """ Validate a TLS certificate in TOFU mode. 
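The hunks that follow reformat the heart of that TOFU ("trust on first use") validation. For readers skimming the reformatted code: the whole scheme reduces to hashing the DER-encoded certificate with SHA-256 and comparing the resulting fingerprint against the ones already cached for this host and address. A minimal sketch of that decision, with a hypothetical known_fingerprints set standing in for the per-site fingerprint files the real function reads and updates:

    import hashlib

    def tofu_check(cert_der, known_fingerprints):
        # Classify a presented DER certificate against fingerprints seen before.
        fingerprint = hashlib.sha256(cert_der).hexdigest()
        if fingerprint in known_fingerprints:
            return "match"      # seen before: accept and bump its counter
        if not known_fingerprints:
            return "first-use"  # nothing cached yet: store and trust
        return "mismatch"       # a different cert was cached: warn, possible MITM

On "first-use" the real code persists the fingerprint and the certificate; on "mismatch" it prints the security warning visible further down and, unless automatic_choice is set, asks before accepting.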
@@ -477,9 +506,13 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
     # Check certificate validity dates
     if accept_bad_ssl:
         if c.not_valid_before >= now:
-            raise CertificateError("Certificate not valid until: {}!".format(c.not_valid_before))
+            raise CertificateError(
+                "Certificate not valid until: {}!".format(c.not_valid_before)
+            )
         elif c.not_valid_after <= now:
-            raise CertificateError("Certificate expired as of: {})!".format(c.not_valid_after))
+            raise CertificateError(
+                "Certificate expired as of: {})!".format(c.not_valid_after)
+            )
     # Check certificate hostnames
     names = []
@@ -487,7 +520,14 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
     if common_name:
         names.append(common_name[0].value)
     try:
-        names.extend([alt.value for alt in c.extensions.get_extension_for_oid(x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value])
+        names.extend(
+            [
+                alt.value
+                for alt in c.extensions.get_extension_for_oid(
+                    x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME
+                ).value
+            ]
+        )
     except x509.ExtensionNotFound:
         pass
     names = set(names)
@@ -499,7 +539,9 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
             continue
     else:
         # If we didn't break out, none of the names were valid
-        raise CertificateError("Hostname does not match certificate common name or any alternative names.")
+        raise CertificateError(
+            "Hostname does not match certificate common name or any alternative names."
+        )
     sha = hashlib.sha256()
     sha.update(cert)
@@ -510,12 +552,12 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
     certdir = os.path.join(xdg("data"), "certs")
     hostdir = os.path.join(certdir, host)
     sitedir = os.path.join(hostdir, address)
-    #1. We check through cached certificates do extract the
-    #most_frequent_cert and to see if one is matching the current one.
-    #2. If we have no match but one valid most_frequent_cert, we do the
-    #"throws warning" code.
-    #3. If no certificate directory or no valid cached certificates, we do
-    #the "First-Use" routine.
+    # 1. We check through cached certificates to extract the
+    # most_frequent_cert and to see if one matches the current one.
+    # 2. If we have no match but one valid most_frequent_cert, we do the
+    # "throws warning" code.
+    # 3. If no certificate directory or no valid cached certificates, we do
+    # the "First-Use" routine.
     most_frequent_cert = None
     matching_fingerprint = False
     # 1. Have we been here before? (the directory exists)
@@ -527,8 +569,8 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
         for cached_fingerprint in files:
             filepath = os.path.join(sitedir, cached_fingerprint)
-            certpath = os.path.join(certcache,cached_fingerprint+".crt")
-            with open(filepath, 'r') as f:
+            certpath = os.path.join(certcache, cached_fingerprint + ".crt")
+            with open(filepath, "r") as f:
                 count = int(f.read())
             if os.path.exists(certpath):
                 if count > max_count:
@@ -538,13 +580,13 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non
                 # Matched!
                 # Increase the counter for this certificate (this also updates
                 # the modification time of the file)
-                with open(filepath, 'w') as f:
-                    f.write(str(count+1))
+                with open(filepath, "w") as f:
+                    f.write(str(count + 1))
                 matching_fingerprint = True
                 break
-    #2. Do we have some certificates but none of them is matching the current one?
+    # 2. Do we have some certificates but none of them matches the current one?
if most_frequent_cert and not matching_fingerprint: - with open(os.path.join(certcache, most_frequent_cert+".crt"), "rb") as fp: + with open(os.path.join(certcache, most_frequent_cert + ".crt"), "rb") as fp: previous_cert = fp.read() if _HAS_CRYPTOGRAPHY: # Load the most frequently seen certificate to see if it has @@ -555,9 +597,17 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non print("****************************************") print("[SECURITY WARNING] Unrecognised certificate!") - print("The certificate presented for {} ({}) has never been seen before.".format(host, address)) + print( + "The certificate presented for {} ({}) has never been seen before.".format( + host, address + ) + ) print("This MIGHT be a Man-in-the-Middle attack.") - print("A different certificate has previously been seen {} times.".format(max_count)) + print( + "A different certificate has previously been seen {} times.".format( + max_count + ) + ) if _HAS_CRYPTOGRAPHY: if previous_ttl < datetime.timedelta(): print("That certificate has expired, which reduces suspicion somewhat.") @@ -573,18 +623,20 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non if choice in ("y", "yes"): with open(os.path.join(sitedir, fingerprint), "w") as fp: fp.write("1") - with open(os.path.join(certcache, fingerprint+".crt"), "wb") as fp: + with open(os.path.join(certcache, fingerprint + ".crt"), "wb") as fp: fp.write(cert) else: raise Exception("TOFU Failure!") - #3. If no directory or no cert found in it, we cache it + # 3. If no directory or no cert found in it, we cache it if not most_frequent_cert: - if not os.path.exists(certdir): # XDG_DATA/offpunk/certs + if not os.path.exists(certdir): # XDG_DATA/offpunk/certs os.makedirs(certdir) - if not os.path.exists(hostdir): # XDG_DATA/offpunk/certs/site.net + if not os.path.exists(hostdir): # XDG_DATA/offpunk/certs/site.net os.makedirs(hostdir) - if not os.path.exists(sitedir): # XDG_DATA/offpunk/certs/site.net/123.123.123.123 + if not os.path.exists( + sitedir + ): # XDG_DATA/offpunk/certs/site.net/123.123.123.123 os.makedirs(sitedir) with open(os.path.join(sitedir, fingerprint), "w") as fp: @@ -592,9 +644,10 @@ def _validate_cert(address, host, cert,accept_bad_ssl=False,automatic_choice=Non certcache = os.path.join(xdg("config"), "cert_cache") if not os.path.exists(certcache): os.makedirs(certcache) - with open(os.path.join(certcache, fingerprint+".crt"), "wb") as fp: + with open(os.path.join(certcache, fingerprint + ".crt"), "wb") as fp: fp.write(cert) + def _get_client_certkey(site_id: str, host: str): # returns {cert: str, key: str} certdir = os.path.join(xdg("data"), "certs", host) @@ -603,18 +656,19 @@ def _get_client_certkey(site_id: str, host: str): if not os.path.exists(certf) or not os.path.exists(keyf): if host != "": split = host.split(".") - #if len(split) > 2: # Why not allow a global identity? Maybe I want - # to login to all sites with the same - # certificate. + # if len(split) > 2: # Why not allow a global identity? Maybe I want + # to login to all sites with the same + # certificate. 
return _get_client_certkey(site_id, ".".join(split[1:])) return None certkey = dict(cert=certf, key=keyf) return certkey + def _get_site_ids(url: str): newurl = normalize_url(url) u = urllib.parse.urlparse(newurl) - if u.scheme == "gemini" and u.username == None: + if u.scheme == "gemini" and u.username is None: certdir = os.path.join(xdg("data"), "certs") netloc_parts = u.netloc.split(".") site_ids = [] @@ -624,67 +678,76 @@ def _get_site_ids(url: str): direc = os.path.join(certdir, lasti) for certfile in glob.glob(os.path.join(direc, "*.cert")): - site_id = certfile.split('/')[-1].split(".")[-2] + site_id = certfile.split("/")[-1].split(".")[-2] site_ids.append(site_id) return site_ids else: return [] + def create_certificate(name: str, days: int, hostname: str): - key = rsa.generate_private_key( - public_exponent = 65537, - key_size = 2048) + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) sitecertdir = os.path.join(xdg("data"), "certs", hostname) - keyfile = os.path.join(sitecertdir, name+".key") + keyfile = os.path.join(sitecertdir, name + ".key") # create the directory of it doesn't exist os.makedirs(sitecertdir, exist_ok=True) with open(keyfile, "wb") as f: - f.write(key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption() - )) - xname = x509.Name([ - x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name), - ]) + f.write( + key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + ) + xname = x509.Name( + [ + x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name), + ] + ) # generate the cert, valid a week ago (timekeeping is hard, let's give it a # little margin). 
issuer and subject are your name - cert = (x509.CertificateBuilder() - .subject_name(xname) - .issuer_name(xname) - .public_key(key.public_key()) - .serial_number(x509.random_serial_number()) - .not_valid_before(datetime.datetime.utcnow() - - datetime.timedelta(days=7)) - .not_valid_after(datetime.datetime.utcnow() + - datetime.timedelta(days=days)) - .sign(key, hashes.SHA256()) - ) + cert = ( + x509.CertificateBuilder() + .subject_name(xname) + .issuer_name(xname) + .public_key(key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.datetime.utcnow() - datetime.timedelta(days=7)) + .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=days)) + .sign(key, hashes.SHA256()) + ) certfile = os.path.join(sitecertdir, name + ".cert") with open(certfile, "wb") as f: f.write(cert.public_bytes(serialization.Encoding.PEM)) + def get_certs(url: str): u = urllib.parse.urlparse(normalize_url(url)) if u.scheme == "gemini": certdir = os.path.join(xdg("data"), "certs") netloc_parts = u.netloc.split(".") site_ids = [] - if '@' in netloc_parts[0]: - netloc_parts[0] = netloc_parts[0].split('@')[1] + if "@" in netloc_parts[0]: + netloc_parts[0] = netloc_parts[0].split("@")[1] for i in range(len(netloc_parts), 0, -1): lasti = ".".join(netloc_parts[-i:]) direc = os.path.join(certdir, lasti) for certfile in glob.glob(os.path.join(direc, "*.cert")): - site_id = certfile.split('/')[-1].split(".")[-2] + site_id = certfile.split("/")[-1].split(".")[-2] site_ids.append(site_id) return site_ids else: return [] -def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_certificates=False,\ - **kwargs): + +def _fetch_gemini( + url, + timeout=DEFAULT_TIMEOUT, + interactive=True, + accept_bad_ssl_certificates=False, + **kwargs, +): cache = None newurl = url url_parts = urllib.parse.urlparse(url) @@ -694,8 +757,8 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce path = url_parts.path or "/" query = url_parts.query # In AV-98, this was the _send_request method - #Send a selector to a given host and port. - #Returns the resolved address and binary file with the reply.""" + # Send a selector to a given host and port. + # Returns the resolved address and binary file with the reply.""" host = host.encode("idna").decode() # Do DNS resolution # DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled @@ -709,13 +772,16 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce else: # IPv4 only family_mask = socket.AF_INET - addresses = socket.getaddrinfo(host, port, family=family_mask, - type=socket.SOCK_STREAM) + addresses = socket.getaddrinfo( + host, port, family=family_mask, type=socket.SOCK_STREAM + ) # Sort addresses so IPv6 ones come first addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True) - ## Continuation of send_request + # Continuation of send_request # Prepare TLS context - protocol = ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >=6 else ssl.PROTOCOL_TLSv1_2 + protocol = ( + ssl.PROTOCOL_TLS_CLIENT if sys.version_info.minor >= 6 else ssl.PROTOCOL_TLSv1_2 + ) context = ssl.SSLContext(protocol) context.check_hostname = False context.verify_mode = ssl.CERT_NONE @@ -728,19 +794,21 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce else: print("This identity doesn't exist for this site (or is disabled).") # Impose minimum TLS version - ## In 3.7 and above, this is easy... + # In 3.7 and above, this is easy... 
if sys.version_info.minor >= 7: context.minimum_version = ssl.TLSVersion.TLSv1_2 - ## Otherwise, it seems very hard... - ## The below is less strict than it ought to be, but trying to disable - ## TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures - ## with recent versions of OpenSSL. What a mess... + # Otherwise, it seems very hard... + # The below is less strict than it ought to be, but trying to disable + # TLS v1.1 here using ssl.OP_NO_TLSv1_1 produces unexpected failures + # with recent versions of OpenSSL. What a mess... else: context.options |= ssl.OP_NO_SSLv3 context.options |= ssl.OP_NO_SSLv2 # Try to enforce sensible ciphers try: - context.set_ciphers("AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH") + context.set_ciphers( + "AESGCM+ECDHE:AESGCM+DHE:CHACHA20+ECDHE:CHACHA20+DHE:!DSS:!SHA1:!MD5:@STRENGTH" + ) except ssl.SSLError: # Rely on the server to only support sensible things, I guess... pass @@ -750,7 +818,7 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce try: s = socket.socket(address[0], address[1]) s.settimeout(timeout) - s = context.wrap_socket(s, server_hostname = host) + s = context.wrap_socket(s, server_hostname=host) s.connect(address[4]) break except OSError as e: @@ -764,12 +832,12 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce # Do TOFU cert = s.getpeercert(binary_form=True) # Remember that we showed the current cert to this domain... - #TODO : accept badssl and automatic choice - _validate_cert(address[4][0], host, cert,automatic_choice="y") + # TODO : accept badssl and automatic choice + _validate_cert(address[4][0], host, cert, automatic_choice="y") # Send request and wrap response in a file descriptor url = urllib.parse.urlparse(url) new_host = host - #Handle IPV6 hostname + # Handle IPV6 hostname if ":" in new_host: new_host = "[" + new_host + "]" if port != standard_ports["gemini"]: @@ -777,18 +845,18 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce url_no_username = urllib.parse.urlunparse(url._replace(netloc=new_host)) if site_id: - url = urllib.parse.urlunparse(url._replace(netloc=site_id+"@"+new_host)) + url = urllib.parse.urlunparse(url._replace(netloc=site_id + "@" + new_host)) else: url = url_no_username s.sendall((url_no_username + CRLF).encode("UTF-8")) - f = s.makefile(mode = "rb") + f = s.makefile(mode="rb") ## end of send_request in AV98 # Spec dictates <META> should not exceed 1024 bytes, # so maximum valid header length is 1027 bytes. 
header = f.readline(1027) header = urllib.parse.unquote(header.decode("UTF-8")) - if not header or header[-1] != '\n': + if not header or header[-1] != "\n": raise RuntimeError("Received invalid header from server!") header = header.strip() # Validate header @@ -799,9 +867,9 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce # Update redirect loop/maze escaping state if not status.startswith("3"): previous_redirectors = set() - #TODO FIXME + # TODO FIXME else: - #we set a previous_redirectors anyway because refactoring in progress + # we set a previous_redirectors anyway because refactoring in progress previous_redirectors = set() # Handle non-SUCCESS headers, which don't have a response body # Inputs @@ -813,40 +881,43 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce else: user_input = input("> ") newurl = url.split("?")[0] - return _fetch_gemini(newurl+"?"+user_input) + return _fetch_gemini(newurl + "?" + user_input) else: - return None,None + return None, None # Redirects elif status.startswith("3"): - newurl = urllib.parse.urljoin(url,meta) + newurl = urllib.parse.urljoin(url, meta) if newurl == url: raise RuntimeError("URL redirects to itself!") elif newurl in previous_redirectors: raise RuntimeError("Caught in redirect loop!") elif len(previous_redirectors) == _MAX_REDIRECTS: - raise RuntimeError("Refusing to follow more than %d consecutive redirects!" % _MAX_REDIRECTS) -# TODO: redirections handling should be refactored -# elif "interactive" in options and not options["interactive"]: -# follow = self.automatic_choice -# # Never follow cross-domain redirects without asking -# elif new_gi.host.encode("idna") != gi.host.encode("idna"): -# follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url) -# # Never follow cross-protocol redirects without asking -# elif new_gi.scheme != gi.scheme: -# follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url) -# # Don't follow *any* redirect without asking if auto-follow is off -# elif not self.options["auto_follow_redirects"]: -# follow = input("Follow redirect to %s? (y/n) " % new_gi.url) -# # Otherwise, follow away + raise RuntimeError( + "Refusing to follow more than %d consecutive redirects!" + % _MAX_REDIRECTS + ) + # TODO: redirections handling should be refactored + # elif "interactive" in options and not options["interactive"]: + # follow = self.automatic_choice + # # Never follow cross-domain redirects without asking + # elif new_gi.host.encode("idna") != gi.host.encode("idna"): + # follow = input("Follow cross-domain redirect to %s? (y/n) " % new_gi.url) + # # Never follow cross-protocol redirects without asking + # elif new_gi.scheme != gi.scheme: + # follow = input("Follow cross-protocol redirect to %s? (y/n) " % new_gi.url) + # # Don't follow *any* redirect without asking if auto-follow is off + # elif not self.options["auto_follow_redirects"]: + # follow = input("Follow redirect to %s? 
(y/n) " % new_gi.url) + # # Otherwise, follow away else: follow = "yes" if follow.strip().lower() not in ("y", "yes"): raise UserAbortException() previous_redirectors.add(url) -# if status == "31": -# # Permanent redirect -# self.permanent_redirects[gi.url] = new_gi.url - return _fetch_gemini(newurl,interactive=interactive) + # if status == "31": + # # Permanent redirect + # self.permanent_redirects[gi.url] = new_gi.url + return _fetch_gemini(newurl, interactive=interactive) # Errors elif status.startswith("4") or status.startswith("5"): raise RuntimeError(meta) @@ -862,7 +933,7 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce mime = meta # Read the response body over the network fbody = f.read() - # DEFAULT GEMINI MIME + # DEFAULT GEMINI MIME if mime == "": mime = "text/gemini; charset=utf-8" shortmime, mime_options = parse_mime(mime) @@ -870,59 +941,73 @@ def _fetch_gemini(url,timeout=DEFAULT_TIMEOUT,interactive=True,accept_bad_ssl_ce try: codecs.lookup(mime_options["charset"]) except LookupError: - #raise RuntimeError("Header declared unknown encoding %s" % mime_options) - #If the encoding is wrong, there’s a high probably it’s UTF-8 with a bad header + # raise RuntimeError("Header declared unknown encoding %s" % mime_options) + # If the encoding is wrong, there’s a high probably it’s UTF-8 with a bad header mime_options["charset"] = "UTF-8" if shortmime.startswith("text/"): - #Get the charset and default to UTF-8 in none + # Get the charset and default to UTF-8 in none encoding = mime_options.get("charset", "UTF-8") try: body = fbody.decode(encoding) except UnicodeError: - raise RuntimeError("Could not decode response body using %s\ - encoding declared in header!" % encoding) + raise RuntimeError( + "Could not decode response body using %s\ + encoding declared in header!" 
+ % encoding + ) else: body = fbody - cache = write_body(url,body,mime) - return cache,url + cache = write_body(url, body, mime) + return cache, url -def fetch(url,offline=False,download_image_first=True,images_mode="readable",validity=0,**kwargs): +def fetch( + url, + offline=False, + download_image_first=True, + images_mode="readable", + validity=0, + **kwargs, +): url = normalize_url(url) newurl = url - path=None + path = None print_error = "print_error" in kwargs.keys() and kwargs["print_error"] - #First, we look if we have a valid cache, even if offline - #If we are offline, any cache is better than nothing - if is_cache_valid(url,validity=validity) or (offline and is_cache_valid(url,validity=0)): + # First, we look if we have a valid cache, even if offline + # If we are offline, any cache is better than nothing + if is_cache_valid(url, validity=validity) or ( + offline and is_cache_valid(url, validity=0) + ): path = get_cache_path(url) - #if the cache is a folder, we should add a "/" at the end of the URL - if not url.endswith("/") and os.path.isdir(get_cache_path(url,add_index=False)) : - newurl = url+"/" - elif offline and is_cache_valid(url,validity=0): + # if the cache is a folder, we should add a "/" at the end of the URL + if not url.endswith("/") and os.path.isdir( + get_cache_path(url, add_index=False) + ): + newurl = url + "/" + elif offline and is_cache_valid(url, validity=0): path = get_cache_path(url) elif "://" in url and not offline: try: scheme = url.split("://")[0] if scheme not in standard_ports: if print_error: - print("%s is not a supported protocol"%scheme) + print("%s is not a supported protocol" % scheme) path = None - elif scheme in ("http","https"): + elif scheme in ("http", "https"): if _DO_HTTP: - path=_fetch_http(newurl,**kwargs) + path = _fetch_http(newurl, **kwargs) else: print("HTTP requires python-requests") elif scheme == "gopher": - path=_fetch_gopher(newurl,**kwargs) + path = _fetch_gopher(newurl, **kwargs) elif scheme == "finger": - path=_fetch_finger(newurl,**kwargs) + path = _fetch_finger(newurl, **kwargs) elif scheme == "gemini": - path,newurl=_fetch_gemini(url,**kwargs) + path, newurl = _fetch_gemini(url, **kwargs) elif scheme == "spartan": - path,newurl=_fetch_spartan(url,**kwargs) + path, newurl = _fetch_spartan(url, **kwargs) else: - print("scheme %s not implemented yet"%scheme) + print("scheme %s not implemented yet" % scheme) except UserAbortException: return None, newurl except Exception as err: @@ -947,67 +1032,102 @@ def fetch(url,offline=False,download_image_first=True,images_mode="readable",val print("""ERROR5: Trying to create a directory which already exists in the cache : """) print(err) - elif _DO_HTTP and isinstance(err,requests.exceptions.SSLError): + elif _DO_HTTP and isinstance(err, requests.exceptions.SSLError): if print_error: print("""ERROR6: Bad SSL certificate:\n""") print(err) - print("""\n If you know what you are doing, you can try to accept bad certificates with the following command:\n""") + print( + """\n If you know what you are doing, you can try to accept bad certificates with the following command:\n""" + ) print("""set accept_bad_ssl_certificates True""") - elif _DO_HTTP and isinstance(err,requests.exceptions.ConnectionError): + elif _DO_HTTP and isinstance(err, requests.exceptions.ConnectionError): if print_error: print("""ERROR7: Cannot connect to URL:\n""") print(str(err)) else: if print_error: import traceback + print("ERROR4: " + str(type(err)) + " : " + str(err)) - #print("\n" + 
str(err.with_traceback(None))) + # print("\n" + str(err.with_traceback(None))) print(traceback.format_exc()) return cache, newurl # We download images contained in the document (from full mode) if not offline and download_image_first and images_mode: - renderer = ansicat.renderer_from_file(path,newurl) + renderer = ansicat.renderer_from_file(path, newurl) if renderer: for image in renderer.get_images(mode=images_mode): - #Image should exist, should be an url (not a data image) - #and should not be already cached - if image and not image.startswith("data:image/") and not is_cache_valid(image): + # Image should exist, should be an url (not a data image) + # and should not be already cached + if ( + image + and not image.startswith("data:image/") + and not is_cache_valid(image) + ): width = offutils.term_width() - 1 - toprint = "Downloading %s" %image + toprint = "Downloading %s" % image toprint = toprint[:width] - toprint += " "*(width-len(toprint)) - print(toprint,end="\r") - #d_i_f and images_mode are False/None to avoid recursive downloading - #if that ever happen - fetch(image,offline=offline,download_image_first=False,\ - images_mode=None,validity=0,**kwargs) + toprint += " " * (width - len(toprint)) + print(toprint, end="\r") + # d_i_f and images_mode are False/None to avoid recursive downloading + # if that ever happen + fetch( + image, + offline=offline, + download_image_first=False, + images_mode=None, + validity=0, + **kwargs, + ) return path, newurl def main(): - - descri="Netcache is a command-line tool to retrieve, cache and access networked content.\n\ + descri = "Netcache is a command-line tool to retrieve, cache and access networked content.\n\ By default, netcache will returns a cached version of a given URL, downloading it \ only if a cache version doesn't exist. A validity duration, in seconds, can also \ be given so netcache downloads the content only if the existing cache is older than the validity." 
# Parse arguments
-    parser = argparse.ArgumentParser(prog="netcache",description=descri)
-    parser.add_argument("--path", action="store_true",
-            help="return path to the cache instead of the content of the cache")
-    parser.add_argument("--ids", action="store_true",
-            help="return a list of id's for the gemini-site instead of the content of the cache")
-    parser.add_argument("--offline", action="store_true",
-            help="Do not attempt to download, return cached version or error")
-    parser.add_argument("--max-size", type=int,
-            help="Cancel download of items above that size (value in Mb).")
-    parser.add_argument("--timeout", type=int,
-            help="Time to wait before cancelling connection (in second).")
-    parser.add_argument("--cache-validity",type=int, default=0,
-            help="maximum age, in second, of the cached version before \
-            redownloading a new version")
+    parser = argparse.ArgumentParser(prog="netcache", description=descri)
+    parser.add_argument(
+        "--path",
+        action="store_true",
+        help="return path to the cache instead of the content of the cache",
+    )
+    parser.add_argument(
+        "--ids",
+        action="store_true",
+        help="return a list of id's for the gemini-site instead of the content of the cache",
+    )
+    parser.add_argument(
+        "--offline",
+        action="store_true",
+        help="Do not attempt to download, return cached version or error",
+    )
+    parser.add_argument(
+        "--max-size",
+        type=int,
+        help="Cancel download of items above that size (value in Mb).",
+    )
+    parser.add_argument(
+        "--timeout",
+        type=int,
+        help="Time to wait before cancelling connection (in seconds).",
+    )
+    parser.add_argument(
+        "--cache-validity",
+        type=int,
+        default=0,
+        help="maximum age, in seconds, of the cached version before \
+        redownloading a new version",
+    )
     # No argument: write help
-    parser.add_argument('url', metavar='URL', nargs='*',
-            help='download URL and returns the content or the path to a cached version')
+    parser.add_argument(
+        "url",
+        metavar="URL",
+        nargs="*",
+        help="download URL and return the content or the path to a cached version",
+    )
     # --validity : returns the date of the cached version, Null if no version
     # --force-download : download and replace cache, even if valid
     args = parser.parse_args()
@@ -1019,17 +1139,21 @@ def main():
         elif args.ids:
             ids = _get_site_ids(u)
         else:
-            path,url = fetch(u,max_size=args.max_size,timeout=args.timeout,\
-                            validity=args.cache_validity)
+            path, url = fetch(
+                u,
+                max_size=args.max_size,
+                timeout=args.timeout,
+                validity=args.cache_validity,
+            )
         if args.path:
             print(path)
         elif args.ids:
             print(ids)
         else:
-            with open(path,"r") as f:
+            with open(path, "r") as f:
                 print(f.read())
                 f.close()
-
-if __name__== '__main__':
+
+if __name__ == "__main__":
     main()
diff --git a/netcache_migration.py b/netcache_migration.py
index ce76716..e18db83 100644
--- a/netcache_migration.py
+++ b/netcache_migration.py
@@ -13,7 +13,6 @@ from the immediately previous cache format.
 All migration functions must be called at the end
 of this script from oldest to newest.
 """
-import argparse
 import os
 import os.path
@@ -23,9 +22,9 @@ def upgrade_to_1(cache_dir: str) -> None:
     Rename index.txt to gophermap in the Gopher protocol cache.
""" print("Upgrading cache to version 1: migrating index.txt to gophermap") - for root, _, files in os.walk(os.path.join(cache_dir, 'gopher')): + for root, _, files in os.walk(os.path.join(cache_dir, "gopher")): for f in files: - if f == 'index.txt': + if f == "index.txt": src = os.path.join(root, f) - dst = os.path.join(root, 'gophermap') + dst = os.path.join(root, "gophermap") os.rename(src, dst) diff --git a/offpunk.py b/offpunk.py index 54dbdf1..c91c72b 100755 --- a/offpunk.py +++ b/offpunk.py @@ -15,13 +15,21 @@ import shutil import sys import time import urllib.parse -import netcache -import opnk + import ansicat -import offthemes -from offutils import run, term_width, is_local, mode_url, unmode_url, looks_like_url -from offutils import xdg +import netcache import offblocklist +import offthemes +import opnk +from offutils import ( + is_local, + looks_like_url, + mode_url, + run, + term_width, + unmode_url, + xdg, +) try: import setproctitle diff --git a/offutils.py b/offutils.py index 120ee63..3d67e4a 100644 --- a/offutils.py +++ b/offutils.py @@ -1,38 +1,45 @@ #!/bin/python -#This file contains some utilities common to offpunk, ansicat and netcache. -#Currently, there are the following utilities: +# This file contains some utilities common to offpunk, ansicat and netcache. +# Currently, there are the following utilities: # # run : run a shell command and get the results with some security # term_width : get or set the width to display on the terminal -import os import io -import subprocess -import shutil +import os import shlex +import shutil +import subprocess import urllib.parse -import urllib.parse -import netcache_migration -import netcache + import cert_migration +import netcache +import netcache_migration CACHE_VERSION = 1 CERT_VERSION = 1 -#let’s find if grep supports --color=auto +# let’s find if grep supports --color=auto try: - test=subprocess.run(["grep","--color=auto","x"],input=b"x",check=True,\ - stdout=subprocess.PIPE,stderr=subprocess.STDOUT) + test = subprocess.run( + ["grep", "--color=auto", "x"], + input=b"x", + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) GREPCMD = "grep --color=auto" -except Exception as err: +except Exception: GREPCMD = "grep" # We upgrade the cache only once at startup, hence the CACHE_UPGRADED variable # This is only to avoid unnecessary checks each time the cache is accessed -CACHE_UPGRADED=False +CACHE_UPGRADED = False + + def upgrade_cache(cache_folder): - #Let’s read current version of the cache + # Let’s read current version of the cache version_path = cache_folder + ".version" current_version = 0 if os.path.exists(version_path): @@ -42,25 +49,27 @@ def upgrade_cache(cache_folder): f.close() try: current_version = int(current_str) - except: + except Exception: current_version = 0 - #Now, let’s upgrade the cache if needed + # Now, let’s upgrade the cache if needed while current_version < CACHE_VERSION: current_version += 1 - upgrade_func = getattr(netcache_migration,"upgrade_to_"+str(current_version)) + upgrade_func = getattr(netcache_migration, "upgrade_to_" + str(current_version)) upgrade_func(cache_folder) - with open(version_path,"w") as f: + with open(version_path, "w") as f: f.write(str(current_version)) f.close() - CACHE_UPGRADED=True + CACHE_UPGRADED = True + + +CERT_UPGRADED = False -CERT_UPGRADED=False def upgrade_cert(config_folder: str, data_folder: str) -> None: # read the current version - certdata = os.path.join(data_folder, 'certs') + certdata = os.path.join(data_folder, "certs") if not 
os.path.exists(certdata): - os.makedirs(certdata,exist_ok=True) + os.makedirs(certdata, exist_ok=True) version_path = os.path.join(certdata, ".version") current_version = 0 if os.path.exists(version_path): @@ -70,51 +79,49 @@ def upgrade_cert(config_folder: str, data_folder: str) -> None: f.close() try: current_version = int(current_str) - except: + except Exception: current_version = 0 else: current_version = 0 - #Now, let’s upgrade the certificate storage if needed + # Now, let’s upgrade the certificate storage if needed while current_version < CERT_VERSION: current_version += 1 - upgrade_func = getattr(cert_migration,"upgrade_to_"+str(current_version)) + upgrade_func = getattr(cert_migration, "upgrade_to_" + str(current_version)) upgrade_func(data_folder, config_folder) - with open(version_path,"w") as f: + with open(version_path, "w") as f: f.write(str(current_version)) f.close() - CERT_UPGRADED=True - - + CERT_UPGRADED = True -#get xdg folder. Folder should be "cache", "data" or "config" +# get xdg folder. Folder should be "cache", "data" or "config" def xdg(folder="cache"): - ## Config directories - ## We implement our own python-xdg to avoid conflict with existing libraries. - _home = os.path.expanduser('~') - data_home = os.environ.get('XDG_DATA_HOME') or \ - os.path.join(_home,'.local','share') - config_home = os.environ.get('XDG_CONFIG_HOME') or \ - os.path.join(_home,'.config') - _CONFIG_DIR = os.path.join(os.path.expanduser(config_home),"offpunk/") - _DATA_DIR = os.path.join(os.path.expanduser(data_home),"offpunk/") + # Config directories + # We implement our own python-xdg to avoid conflict with existing libraries. + _home = os.path.expanduser("~") + data_home = os.environ.get("XDG_DATA_HOME") or os.path.join( + _home, ".local", "share" + ) + config_home = os.environ.get("XDG_CONFIG_HOME") or os.path.join(_home, ".config") + _CONFIG_DIR = os.path.join(os.path.expanduser(config_home), "offpunk/") + _DATA_DIR = os.path.join(os.path.expanduser(data_home), "offpunk/") _old_config = os.path.expanduser("~/.offpunk/") - ## Look for pre-existing config directory, if any + # Look for pre-existing config directory, if any if os.path.exists(_old_config): _CONFIG_DIR = _old_config - #if no XDG .local/share and not XDG .config, we use the old config + # if no XDG .local/share and not XDG .config, we use the old config if not os.path.exists(data_home) and os.path.exists(_old_config): _DATA_DIR = _CONFIG_DIR - ## get _CACHE_PATH from OFFPUNK_CACHE_PATH environment variable + # get _CACHE_PATH from OFFPUNK_CACHE_PATH environment variable # if OFFPUNK_CACHE_PATH empty, set default to ~/.cache/offpunk - cache_home = os.environ.get('XDG_CACHE_HOME') or\ - os.path.join(_home,'.cache') - _CACHE_PATH = os.environ.get('OFFPUNK_CACHE_PATH', \ - os.path.join(os.path.expanduser(cache_home),"offpunk/")) - #Check that the cache path ends with "/" + cache_home = os.environ.get("XDG_CACHE_HOME") or os.path.join(_home, ".cache") + _CACHE_PATH = os.environ.get( + "OFFPUNK_CACHE_PATH", os.path.join(os.path.expanduser(cache_home), "offpunk/") + ) + # Check that the cache path ends with "/" if not _CACHE_PATH.endswith("/"): _CACHE_PATH += "/" - os.makedirs(_CACHE_PATH,exist_ok=True) + os.makedirs(_CACHE_PATH, exist_ok=True) if folder == "cache" and not CACHE_UPGRADED: upgrade_cache(_CACHE_PATH) if folder == "cache": @@ -126,22 +133,21 @@ def xdg(folder="cache"): upgrade_cert(_CONFIG_DIR, _DATA_DIR) return _DATA_DIR else: - print("No XDG folder for %s. Check your code."%folder) + print("No XDG folder for %s. 
Check your code." % folder)
         return None
-
-#An IPV6 URL should be put between []
-#We try to detect them has location with more than 2 ":"
+# An IPV6 URL should be put between []
+# We try to detect them as locations with more than 2 ":"
 def fix_ipv6_url(url):
     if not url or url.startswith("mailto"):
         return url
     if "://" in url:
-        schema, schemaless = url.split("://",maxsplit=1)
+        schema, schemaless = url.split("://", maxsplit=1)
     else:
         schema, schemaless = None, url
     if "/" in schemaless:
-        netloc, rest = schemaless.split("/",1)
+        netloc, rest = schemaless.split("/", 1)
         if netloc.count(":") > 2 and "[" not in netloc and "]" not in netloc:
             schemaless = "[" + netloc + "]" + "/" + rest
     elif schemaless.count(":") > 2 and "[" not in schemaless and "]" not in schemaless:
@@ -150,6 +156,7 @@ def fix_ipv6_url(url):
         return schema + "://" + schemaless
     return schemaless
+
 # Cheap and cheerful URL detector
 def looks_like_url(word):
     try:
@@ -157,44 +164,49 @@ def looks_like_url(word):
             return False
         url = fix_ipv6_url(word).strip()
         parsed = urllib.parse.urlparse(url)
-        #sometimes, urllib crashed only when requesting the port
+        # sometimes, urllib crashes only when requesting the port
         port = parsed.port
         scheme = word.split("://")[0]
         mailto = word.startswith("mailto:")
         start = scheme in netcache.standard_ports
-        local = scheme in ["file","list"]
+        local = scheme in ["file", "list"]
         if mailto:
             return "@" in word
         elif not local:
             if start:
-                #IPv4
+                # IPv4
                 if "." in word or "localhost" in word:
                     return True
-                #IPv6
+                # IPv6
                 elif "[" in word and ":" in word and "]" in word:
                     return True
-                else: return False
-            else: return False
+                else:
+                    return False
+            else:
+                return False
             return start and ("." in word or "localhost" in word or ":" in word)
         else:
             return "/" in word
     except ValueError:
         return False
-## Those two functions add/remove the mode to the
+
+# Those two functions add/remove the mode to the
 # URLs. This is a gross hack to remember the mode
-def mode_url(url,mode):
-    if mode and mode!= "readable" and "##offpunk=" not in url:
+def mode_url(url, mode):
+    if mode and mode != "readable" and "##offpunk=" not in url:
         url += "##offpunk_mode=" + mode
     return url
+
 def unmode_url(url):
     mode = None
     splitted = url.split("##offpunk_mode=")
     if len(splitted) > 1:
         url = splitted[0]
         mode = splitted[1]
-    return [url,mode]
+    return [url, mode]
+
 # In terms of arguments, this can take an input file/string to be passed to
 # stdin, a parameter to do (well-escaped) "%" replacement on the command, a
@@ -214,9 +226,16 @@ def run(cmd, *, input=None, parameter=None, direct_output=False, env={}):
         stdin = None
     if not direct_output:
         # subprocess.check_output() wouldn't allow us to pass stdin.
- result = subprocess.run(cmd, check=True, env=e, input=input, - shell=True, stdin=stdin, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) + result = subprocess.run( + cmd, + check=True, + env=e, + input=input, + shell=True, + stdin=stdin, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) return result.stdout.decode() else: subprocess.run(cmd, env=e, input=input, shell=True, stdin=stdin) @@ -225,8 +244,9 @@ def run(cmd, *, input=None, parameter=None, direct_output=False, env={}): global TERM_WIDTH TERM_WIDTH = 72 -#if absolute, returns the real terminal width, not the text width -def term_width(new_width=None,absolute=False): + +# if absolute, returns the real terminal width, not the text width +def term_width(new_width=None, absolute=False): if new_width: global TERM_WIDTH TERM_WIDTH = new_width @@ -238,24 +258,26 @@ def term_width(new_width=None,absolute=False): width = cur return width + def is_local(url): - if not url: return True + if not url: + return True elif "://" in url: - scheme,path = url.split("://",maxsplit=1) - return scheme in ["file","mail","list","mailto"] + scheme, path = url.split("://", maxsplit=1) + return scheme in ["file", "mail", "list", "mailto"] else: return True # This method return the image URL or invent it if it’s a base64 inline image # It returns [url,image_data] where image_data is None for normal image -def looks_like_base64(src,baseurl): +def looks_like_base64(src, baseurl): imgdata = None imgname = src if src and src.startswith("data:image/"): if ";base64," in src: splitted = src.split(";base64,") - #splitted[0] is something like data:image/jpg + # splitted[0] is something like data:image/jpg if "/" in splitted[0]: extension = splitted[0].split("/")[1] else: @@ -264,8 +286,8 @@ def looks_like_base64(src,baseurl): imgname = imgdata[:20] + "." + extension imgurl = urllib.parse.urljoin(baseurl, imgname) else: - #We can’t handle other data:image such as svg for now + # We can’t handle other data:image such as svg for now imgurl = None else: imgurl = urllib.parse.urljoin(baseurl, imgname) - return imgurl,imgdata + return imgurl, imgdata diff --git a/opnk.py b/opnk.py index 4e80f0e..c614607 100755 --- a/opnk.py +++ b/opnk.py @@ -1,27 +1,30 @@ #!/usr/bin/env python3 -#opnk stand for "Open like a PuNK". -#It will open any file or URL and display it nicely in less. -#If not possible, it will fallback to xdg-open -#URL are retrieved through netcache +# opnk stand for "Open like a PuNK". +# It will open any file or URL and display it nicely in less. 
+# If not possible, it will fall back to xdg-open
+# URLs are retrieved through netcache
+import argparse
+import fnmatch
 import os
+import shutil
 import sys
 import tempfile
-import argparse
-import netcache
+import time
+
 import ansicat
+import netcache
 import offutils
-import shutil
-import time
-import fnmatch
-from offutils import run,term_width,mode_url,unmode_url,is_local,GREPCMD
+from offutils import GREPCMD, is_local, mode_url, run, term_width, unmode_url
-_HAS_XDGOPEN = shutil.which('xdg-open')
+_HAS_XDGOPEN = shutil.which("xdg-open")
 less_version = 0
 if not shutil.which("less"):
-    print("Please install the pager \"less\" to run Offpunk.")
+    print('Please install the pager "less" to run Offpunk.')
     print("If you wish to use another pager, send me an email !")
-    print("(I’m really curious to hear about people not having \"less\" on their system.)")
+    print(
+        '(I’m really curious to hear about people not having "less" on their system.)'
+    )
     sys.exit()
 output = run("less --version")
 # We get less Version (which is the only integer on the first line)
@@ -36,7 +39,9 @@ if less_version >= 572:
     _LESS_RESTORE_POSITION = True
 else:
     _LESS_RESTORE_POSITION = False
-#_DEFAULT_LESS = "less -EXFRfM -PMurl\ lines\ \%lt-\%lb/\%L\ \%Pb\%$ %s"
+
+
+# _DEFAULT_LESS = "less -EXFRfM -PMurl\ lines\ \%lt-\%lb/\%L\ \%Pb\%$ %s"
 # -E : quit when reaching end of file (to behave like "cat")
 # -F : quit if content fits the screen (behave like "cat")
 # -X : does not clear the screen
@@ -47,11 +52,11 @@ else:
 # -i : ignore case in search
 # -S : do not wrap long lines. Wrapping is done by offpunk, longlines
 # are there on purpose (surch in asciiart)
-#--incsearch : incremental search starting rev581
-def less_cmd(file, histfile=None,cat=False,grep=None):
+# --incsearch : incremental search starting rev581
+def less_cmd(file, histfile=None, cat=False, grep=None):
     less_prompt = "page %%d/%%D- lines %%lb/%%L - %%Pb\\%%"
     if less_version >= 581:
-        less_base = "less --incsearch --save-marks -~ -XRfWiS -P \"%s\""%less_prompt
+        less_base = 'less --incsearch --save-marks -~ -XRfWiS -P "%s"' % less_prompt
     elif less_version >= 572:
         less_base = "less --save-marks -XRfMWiS"
     else:
@@ -66,21 +71,22 @@ def less_cmd(file, histfile=None,cat=False,grep=None):
         cmd_str = _DEFAULT_CAT
     elif grep:
         grep_cmd = GREPCMD
-        #case insensitive for lowercase search
+        # case insensitive for lowercase search
         if grep.islower():
             grep_cmd += " -i"
-        cmd_str = _DEFAULT_CAT + "|" + grep_cmd + " %s"%grep
+        cmd_str = _DEFAULT_CAT + "|" + grep_cmd + " %s" % grep
     else:
         cmd_str = _DEFAULT_LESS
     run(cmd_str, parameter=file, direct_output=True, env=env)
-class opencache():
+
+class opencache:
     def __init__(self):
         # We have a cache of the rendering of file and, for each one,
         # a less_histfile containing the current position in the file
         self.temp_files = {}
         self.less_histfile = {}
-        # This dictionary contains an url -> ansirenderer mapping. This allows
+        # to reuse a renderer when visiting several times the same URL during
         # the same session
         # We save the time at which the renderer was created in renderer_time
@@ -110,13 +116,13 @@ class opencache():
         if _HAS_XDGOPEN:
             cmd_str = "xdg-open %s"
         else:
-            cmd_str = "echo \"Can’t find how to open \"%s"
+            cmd_str = 'echo "Can’t find how to open "%s'
             print("Please install xdg-open (usually from xdg-util package)")
         return cmd_str
     # Return the handler for a specific mimetype.
# Return the whole dic if no specific mime provided - def get_handlers(self,mime=None): + def get_handlers(self, mime=None): if mime and mime in self.mime_handlers.keys(): return self.mime_handlers[mime] elif mime: @@ -124,32 +130,35 @@ class opencache(): else: return self.mime_handlers - def set_handler(self,mime,handler): + def set_handler(self, mime, handler): previous = None if mime in self.mime_handlers.keys(): previous = self.mime_handlers[mime] self.mime_handlers[mime] = handler if "%s" not in handler: - print("WARNING: this handler has no %%s, no filename will be provided to the command") + print( + "WARNING: this handler has no %%s, no filename will be provided to the command" + ) if previous: - print("Previous handler was %s"%previous) + print("Previous handler was %s" % previous) - def get_renderer(self,inpath,mode=None,theme=None): + def get_renderer(self, inpath, mode=None, theme=None): # We remove the ##offpunk_mode= from the URL # If mode is already set, we don’t use the part from the URL - inpath,newmode = unmode_url(inpath) - if not mode: mode = newmode + inpath, newmode = unmode_url(inpath) + if not mode: + mode = newmode # If we still doesn’t have a mode, we see if we used one before if not mode and inpath in self.last_mode.keys(): mode = self.last_mode[inpath] elif not mode: - #default mode is readable + # default mode is readable mode = "readable" renderer = None path = netcache.get_cache_path(inpath) if path: usecache = inpath in self.rendererdic.keys() and not is_local(inpath) - #Screen size may have changed + # Screen size may have changed width = term_width(absolute=True) if usecache and self.last_width != width: self.cleanup() @@ -166,7 +175,7 @@ class opencache(): else: usecache = False if not usecache: - renderer = ansicat.renderer_from_file(path,url=inpath,theme=theme) + renderer = ansicat.renderer_from_file(path, url=inpath, theme=theme) if renderer: self.rendererdic[inpath] = renderer self.renderer_time[inpath] = int(time.time()) @@ -174,55 +183,55 @@ class opencache(): renderer = self.rendererdic[inpath] return renderer - def get_temp_filename(self,url): + def get_temp_filename(self, url): if url in self.temp_files.keys(): return self.temp_files[url] else: return None - def opnk(self,inpath,mode=None,terminal=True,grep=None,theme=None,**kwargs): - #Return True if inpath opened in Terminal + def opnk(self, inpath, mode=None, terminal=True, grep=None, theme=None, **kwargs): + # Return True if inpath opened in Terminal # False otherwise # also returns the url in case it has been modified - #if terminal = False, we don’t try to open in the terminal, - #we immediately fallback to xdg-open. - #netcache currently provide the path if it’s a file. + # if terminal = False, we don’t try to open in the terminal, + # we immediately fallback to xdg-open. + # netcache currently provide the path if it’s a file. 
if not offutils.is_local(inpath): kwargs["images_mode"] = mode - cachepath,inpath = netcache.fetch(inpath,**kwargs) + cachepath, inpath = netcache.fetch(inpath, **kwargs) if not cachepath: return False, inpath # folowing line is for :// which are locals (file,list) elif "://" in inpath: - cachepath,inpath = netcache.fetch(inpath,**kwargs) + cachepath, inpath = netcache.fetch(inpath, **kwargs) elif inpath.startswith("mailto:"): cachepath = inpath elif os.path.exists(inpath): cachepath = inpath else: - print("%s does not exist"%inpath) + print("%s does not exist" % inpath) return False, inpath - renderer = self.get_renderer(inpath,mode=mode,theme=theme) + renderer = self.get_renderer(inpath, mode=mode, theme=theme) if renderer and mode: renderer.set_mode(mode) self.last_mode[inpath] = mode if not mode and inpath in self.last_mode.keys(): mode = self.last_mode[inpath] renderer.set_mode(mode) - #we use the full moded url as key for the dictionary - key = mode_url(inpath,mode) + # we use the full moded url as key for the dictionary + key = mode_url(inpath, mode) if terminal and renderer: - #If this is an image and we have chafa/timg, we - #don’t use less, we call it directly + # If this is an image and we have chafa/timg, we + # don’t use less, we call it directly if renderer.has_direct_display(): - renderer.display(mode=mode,directdisplay=True) + renderer.display(mode=mode, directdisplay=True) return True, inpath else: body = renderer.display(mode=mode) - #Should we use the cache ? only if it is not local and there’s a cache + # Should we use the cache ? only if it is not local and there’s a cache usecache = key in self.temp_files and not is_local(inpath) if usecache: - #and the cache is still valid! + # and the cache is still valid! last_downloaded = netcache.cache_last_modified(inpath) last_cached = os.path.getmtime(self.temp_files[key]) if last_downloaded > last_cached: @@ -231,42 +240,57 @@ class opencache(): self.less_histfile.pop(key) # We actually put the body in a tmpfile before giving it to less if not usecache: - tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False) + tmpf = tempfile.NamedTemporaryFile( + "w", encoding="UTF-8", delete=False + ) self.temp_files[key] = tmpf.name tmpf.write(body) tmpf.close() if key not in self.less_histfile: firsttime = True - tmpf = tempfile.NamedTemporaryFile("w", encoding="UTF-8", delete=False) + tmpf = tempfile.NamedTemporaryFile( + "w", encoding="UTF-8", delete=False + ) self.less_histfile[key] = tmpf.name else: - #We don’t want to restore positions in lists + # We don’t want to restore positions in lists firsttime = is_local(inpath) - less_cmd(self.temp_files[key], histfile=self.less_histfile[key],cat=firsttime,grep=grep) + less_cmd( + self.temp_files[key], + histfile=self.less_histfile[key], + cat=firsttime, + grep=grep, + ) return True, inpath - #maybe, we have no renderer. Or we want to skip it. + # maybe, we have no renderer. Or we want to skip it. else: mimetype = ansicat.get_mime(cachepath) if mimetype == "mailto": mail = inpath[7:] - resp = input("Send an email to %s Y/N? " %mail) + resp = input("Send an email to %s Y/N? 
" % mail) if resp.strip().lower() in ("y", "yes"): - if _HAS_XDGOPEN : - run("xdg-open mailto:%s", parameter=mail,direct_output=True) + if _HAS_XDGOPEN: + run("xdg-open mailto:%s", parameter=mail, direct_output=True) else: - print("Cannot find a mail client to send mail to %s" %inpath) - print("Please install xdg-open (usually from xdg-util package)") + print("Cannot find a mail client to send mail to %s" % inpath) + print("Please install xdg-open (usually from xdg-util package)") return False, inpath else: cmd_str = self._get_handler_cmd(mimetype) try: - run(cmd_str, parameter=netcache.get_cache_path(inpath), direct_output=True) + run( + cmd_str, + parameter=netcache.get_cache_path(inpath), + direct_output=True, + ) except FileNotFoundError: print("Handler program %s not found!" % shlex.split(cmd_str)[0]) - print("You can use the ! command to specify another handler program or pipeline.") + print( + "You can use the ! command to specify another handler program or pipeline." + ) return False, inpath - #We remove the renderers from the cache and we also delete temp files + # We remove the renderers from the cache and we also delete temp files def cleanup(self): while len(self.temp_files) > 0: os.remove(self.temp_files.popitem()[1]) @@ -276,25 +300,39 @@ class opencache(): self.rendererdic = {} self.renderer_time = {} self.last_mode = {} - + + def main(): descri = "opnk is an universal open command tool that will try to display any file \ in the pager less after rendering its content with ansicat. If that fails, \ opnk will fallback to opening the file with xdg-open. If given an URL as input \ instead of a path, opnk will rely on netcache to get the networked content." - parser = argparse.ArgumentParser(prog="opnk",description=descri) - parser.add_argument("--mode", metavar="MODE", - help="Which mode should be used to render: normal (default), full or source.\ - With HTML, the normal mode try to extract the article.") - parser.add_argument("content",metavar="INPUT", nargs="*", - default=sys.stdin, help="Path to the file or URL to open") - parser.add_argument("--cache-validity",type=int, default=0, - help="maximum age, in second, of the cached version before \ - redownloading a new version") + parser = argparse.ArgumentParser(prog="opnk", description=descri) + parser.add_argument( + "--mode", + metavar="MODE", + help="Which mode should be used to render: normal (default), full or source.\ + With HTML, the normal mode try to extract the article.", + ) + parser.add_argument( + "content", + metavar="INPUT", + nargs="*", + default=sys.stdin, + help="Path to the file or URL to open", + ) + parser.add_argument( + "--cache-validity", + type=int, + default=0, + help="maximum age, in second, of the cached version before \ + redownloading a new version", + ) args = parser.parse_args() cache = opencache() for f in args.content: - cache.opnk(f,mode=args.mode,validity=args.cache_validity) + cache.opnk(f, mode=args.mode, validity=args.cache_validity) + if __name__ == "__main__": main() diff --git a/pyproject.toml b/pyproject.toml index ea7c75c..2b8af81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,7 +57,7 @@ only-include = [ "offutils.py", "opnk.py", "cert_migration.py", ] -[tool.ruff] +[tool.ruff.lint] select = ["E"] # Never enforce `E501` (line length violations) diff --git a/tests/geminiclient_test.py b/tests/geminiclient_test.py index d3c0f2a..7c83150 100644 --- a/tests/geminiclient_test.py +++ b/tests/geminiclient_test.py @@ -1,5 +1,5 @@ -from offpunk import GeminiClient import offthemes 
+from offpunk import GeminiClient def test_set_prompt(): diff --git a/tutorial/make_website.py b/tutorial/make_website.py index bd32451..320bb3b 100644 --- a/tutorial/make_website.py +++ b/tutorial/make_website.py @@ -1,7 +1,7 @@ #!/bin/python +import html import os import unicodedata -import html from datetime import datetime baseurl = "offpunk.net" -- 2.48.1
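[Editor's example] The pyproject.toml hunk is the one change in this patchset that is not produced by the formatter itself: newer ruff releases expect lint-rule selection under the [tool.ruff.lint] table instead of the top-level [tool.ruff] table, which is why `select = ["E"]` moves under the new section. A minimal sketch of how a contributor could reproduce and verify this formatting locally, assuming a ruff version recent enough to know [tool.ruff.lint] (these are standard ruff CLI invocations, not commands added by this patch):

    # apply the same formatting this patch contains
    ruff format .

    # verify a tree is already formatted, without modifying any file
    ruff format --check .

    # run the lint rules selected in [tool.ruff.lint] ("E" = pycodestyle errors)
    ruff check .

`ruff format --check` exits non-zero when any file would be reformatted, so it can be used as a pre-send or CI gate to keep formatting-only noise out of future patches.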