In preparation for new timeline features, clean up api.timeline and
makes things more consistent.
The new function fetch_timeline() is now always used to do the actual
HTTP fetch. This implements the 'next_path' functionality.
Add a new StatusTimeline while is the base class for timelines that
return statuses (HomeTimeline, etc).
Clean up the 'limit' parameter; it's now always 'int | None = None'
except in fetch_timeline(), which sets the actual limit if not
specified.
Cleanup/fix a couple of Timelines to make sure they all use
fetch_timeline().
Comment out a couple of *_timline_generator() functions which weren't
used anywhere.
No functional changes intended. The external API (e.g. as used by
TimelineTab) should be unchanged.
---
tooi/api/timeline.py | 191 +++++++++++++++++++++++--------------------
1 file changed, 104 insertions(+), 87 deletions(-)
diff --git a/tooi/api/timeline.py b/tooi/api/timeline.py
index 5db03f3..42f8ee6 100644
--- a/tooi/api/timeline.py
+++ b/tooi/api/timeline.py
@@ -16,26 +16,44 @@ from tooi.data.events import Event, StatusEvent, MentionEvent, NewFollowerEvent,
from tooi.data.events import FavouriteEvent
from tooi.data.instance import InstanceInfo
from tooi.entities import Status, Notification, from_dict
-from tooi.utils.string import str_bool
Params = Optional[QueryParamTypes]
EventGenerator = AsyncGenerator[List[Event], None]
-# def anon_public_timeline_generator(instance, local: bool = False, limit: int = 40):
-# path = "/api/v1/timelines/public"
-# params = {"local": str_bool(local), "limit": limit}
-# return _anon_timeline_generator(instance, path, params)
+# Max 80, as of Mastodon 4.1.0
+DEFAULT_LIMIT = 40
-# def anon_tag_timeline_generator(instance, hashtag, local: bool = False, limit: int = 40):
-# path = f"/api/v1/timelines/tag/{quote(hashtag)}"
-# params = {"local": str_bool(local), "limit": limit}
-# return _anon_timeline_generator(instance, path, params)
+
+def _get_next_path(headers: Headers) -> str | None:
+ """Given timeline response headers, returns the path to the next batch"""
+ links = headers.get("Link", "")
+ matches = re.match(r'<([^>]+)>; rel="next"', links)
+ if matches:
+ parsed = urlparse(matches.group(1))
+ return "?".join([parsed.path, parsed.query])
+ return None
+
+
+async def fetch_timeline(
+ instance: InstanceInfo,
+ path: str,
+ params: Params | None = None,
+ limit: int | None = None):
+
+ _params = dict(params or {})
+ _params["limit"] = limit or DEFAULT_LIMIT
+
+ next_path = path
+ while next_path:
+ response = await request("GET", next_path, params=_params)
+ yield response.json()
+ next_path = _get_next_path(response.headers)
class Timeline(ABC):
"""
- Base class for a timeline.
+ Base class for a timeline. This provides some useful generators that subclasses can use.
"""
def __init__(self, name, instance: InstanceInfo):
@@ -43,21 +61,29 @@ class Timeline(ABC):
self.name = name
@abstractmethod
- def create_generator(self, limit: int = 40) -> EventGenerator:
+ def create_generator(self, limit: int | None = None) -> EventGenerator:
...
-async def _status_generator(
- instance: InstanceInfo,
- path: str, params: Params = None) -> EventGenerator:
- next_path = path
- while next_path:
- response = await request("GET", next_path, params=params)
- yield [StatusEvent(instance, from_dict(Status, s)) for s in response.json()]
- next_path = _get_next_path(response.headers)
+class StatusTimeline(Timeline):
+ """
+ StatusTimeline is the base class for timelines which only return statuses.
+ """
+ def __init__(self, name: str, instance: InstanceInfo):
+ super().__init__(name, instance)
-class HomeTimeline(Timeline):
+ async def status_generator(
+ self,
+ path: str,
+ params: Params = None,
+ limit: int | None = None) -> EventGenerator:
+
+ async for events in fetch_timeline(self.instance, path, params, limit):
+ yield [StatusEvent(self.instance, from_dict(Status, s)) for s in events]
+
+
+class HomeTimeline(StatusTimeline):
"""
HomeTimeline loads events from the user's home timeline.
This timeline only ever returns events of type StatusEvent.
@@ -66,32 +92,42 @@ class HomeTimeline(Timeline):
def __init__(self, instance: InstanceInfo):
super().__init__("Home", instance)
- def create_generator(self, limit: int = 40):
- return _status_generator(self.instance, "/api/v1/timelines/home", {"limit": limit})
+ def create_generator(self, limit: int | None = None):
+ return self.status_generator("/api/v1/timelines/home", limit=limit)
-class LocalTimeline(Timeline):
+class PublicTimeline(StatusTimeline):
+ """PublicTimeline loads events from the public timeline."""
+ def __init__(self, name: str, instance: InstanceInfo, local: bool):
+ super().__init__(name, instance)
+ self.local = local
+
+ def public_timeline_generator(self, limit: int | None = None):
+ return self.status_generator("/api/v1/timelines/public", {"local": self.local}, limit)
+
+
+class LocalTimeline(PublicTimeline):
"""
LocalTimeline loads events from the user's local instance timeline.
This timeline only ever returns events of type StatusEvent.
"""
def __init__(self, instance: InstanceInfo):
- super().__init__("Local", instance)
+ super().__init__("Local", instance, True)
- def create_generator(self, limit: int = 40):
- return public_timeline_generator(self.instance, local=True, limit=limit)
+ def create_generator(self, limit: int | None = None):
+ return self.public_timeline_generator(limit=limit)
-class FederatedTimeline(Timeline):
+class FederatedTimeline(PublicTimeline):
"""
FederatedTimeline loads events from the user's federated timeline.
This timeline only ever returns events of type StatusEvent.
"""
def __init__(self, instance: InstanceInfo):
- super().__init__("Federated", instance)
+ super().__init__("Federated", instance, False)
- def create_generator(self, limit: int = 40):
- return public_timeline_generator(self.instance, local=False, limit=limit)
+ def create_generator(self, limit: int | None = None):
+ return self.public_timeline_generator(limit=limit)
class NotificationTimeline(Timeline):
@@ -117,23 +153,24 @@ class NotificationTimeline(Timeline):
case _:
return None
- async def _notification_generator(self, params: Params = None) -> EventGenerator:
- path = "/api/v1/notifications"
- next_path = path
+ async def notification_generator(
+ self,
+ path: str,
+ params: Params = None,
+ limit: int | None = None) -> EventGenerator:
- while next_path:
- response = await request("GET", next_path, params=params)
- yield [e for e in map(self.make_notification_event, response.json()) if e is not None]
- next_path = _get_next_path(response.headers)
+ path = "/api/v1/notifications"
+ async for events in fetch_timeline(self.instance, path, params, limit):
+ yield [e for e in map(self.make_notification_event, events) if e is not None]
- def create_generator(self, limit: int = 40):
+ def create_generator(self, limit: int | None = None):
# TODO: not included: follow_request, poll, update, admin.sign_up, admin.report
types = ["mention", "status", "reblog", "favourite", "follow"]
- params = {"types[]": types, "limit": limit}
- return self._notification_generator(params)
+ params = {"types[]": types}
+ return self.notification_generator(params, limit)
-class TagTimeline(Timeline):
+class TagTimeline(StatusTimeline):
"""
TagTimeline loads events from the given hashtag.
This timeline only ever returns events of type StatusEvent.
@@ -142,7 +179,7 @@ class TagTimeline(Timeline):
self.local = local
# Normalise the hashtag to not begin with a hash
- if hashtag[0] == '#':
+ if hashtag[0:1] == '#':
hashtag = hashtag[1:]
if len(hashtag) == 0:
@@ -152,7 +189,8 @@ class TagTimeline(Timeline):
super().__init__(f"#{self.hashtag}", instance)
def create_generator(self, limit: int = 40):
- return tag_timeline_generator(self.instance, self.hashtag, self.local, limit)
+ path = f"/api/v1/timelines/tag/{quote(self.hashtag)}"
+ return self.status_generator(path, limit=limit)
class ContextTimeline(Timeline):
@@ -164,38 +202,22 @@ class ContextTimeline(Timeline):
super().__init__("Thread", instance)
self._status = status
- def create_generator(self, limit: int = 40):
- async def _context_timeline_generator(status: Status, limit: int = 40):
- response = await statuses.context(status.original.id)
- data = response.json()
- ancestors = [from_dict(Status, s) for s in data["ancestors"]]
- descendants = [from_dict(Status, s) for s in data["descendants"]]
- all_statuses = ancestors + [status] + descendants
- yield [StatusEvent(self.instance, s) for s in all_statuses]
-
- return _context_timeline_generator(self._status, limit)
-
+ async def context_timeline_generator(self, status: Status, limit: int | None = None):
+ response = await statuses.context(status.original.id)
+ data = response.json()
+ ancestors = [from_dict(Status, s) for s in data["ancestors"]]
+ descendants = [from_dict(Status, s) for s in data["descendants"]]
+ all_statuses = ancestors + [status] + descendants
+ yield [StatusEvent(self.instance, s) for s in all_statuses]
-def public_timeline_generator(instance: InstanceInfo, local: bool = False, limit: int = 40):
- path = "/api/v1/timelines/public"
- params = {"local": str_bool(local), "limit": limit}
- return _status_generator(instance, path, params)
+ def create_generator(self, limit: int | None = None):
+ return self.context_timeline_generator(self._status, limit)
-def tag_timeline_generator(
- instance: InstanceInfo,
- hashtag: str,
- local: bool = False,
- limit: int = 40):
- path = f"/api/v1/timelines/tag/{quote(hashtag)}"
- params = {"local": str_bool(local), "limit": limit}
- return _status_generator(instance, path, params)
-
-
-def bookmark_timeline_generator(instance: InstanceInfo, limit: int = 40):
- path = "/api/v1/bookmarks"
- params = {"limit": limit}
- return _status_generator(instance, path, params)
+# def bookmark_timeline_generator(instance: InstanceInfo, limit: int = 40):
+# path = "/api/v1/bookmarks"
+# params = {"limit": limit}
+# return _status_generator(instance, path, params)
# def conversation_timeline_generator(limit: int = 40):
@@ -204,13 +226,6 @@ def bookmark_timeline_generator(instance: InstanceInfo, limit: int = 40):
# return _conversation_timeline_generator(path, params)
-# def account_timeline_generator(account_name: str, replies=False, reblogs=False, limit: int = 40):
-# account = await find_account(account_name)
-# path = f"/api/v1/accounts/{account["id"]}/statuses"
-# params = {"limit": limit, "exclude_replies": not replies, "exclude_reblogs": not reblogs}
-# return _timeline_generator(path, params)
-
-
# def list_timeline_generator(list_id: str, limit: int = 20):
# path = f"/api/v1/timelines/list/{list_id}"
# return _timeline_generator(path, {"limit": limit})
@@ -236,11 +251,13 @@ def bookmark_timeline_generator(instance: InstanceInfo, limit: int = 40):
# next_path = _get_next_path(response.headers)
-def _get_next_path(headers: Headers) -> str | None:
- """Given timeline response headers, returns the path to the next batch"""
- links = headers.get("Link", "")
- matches = re.match(r'<([^>]+)>; rel="next"', links)
- if matches:
- parsed = urlparse(matches.group(1))
- return "?".join([parsed.path, parsed.query])
- return None
+# def anon_public_timeline_generator(instance, local: bool = False, limit: int = 40):
+# path = "/api/v1/timelines/public"
+# params = {"local": str_bool(local), "limit": limit}
+# return _anon_timeline_generator(instance, path, params)
+
+
+# def anon_tag_timeline_generator(instance, hashtag, local: bool = False, limit: int = 40):
+# path = f"/api/v1/timelines/tag/{quote(hashtag)}"
+# params = {"local": str_bool(local), "limit": limit}
+# return _anon_timeline_generator(instance, path, params)
--
2.43.0