~witcher/public-inbox

Implement async fetching v1 APPLIED

Hugo Osvaldo Barrera: 1
 Implement async fetching

 4 files changed, 47 insertions(+), 31 deletions(-)
Export patchset (mbox)
How do I use this?

Copy & paste the following snippet into your terminal to import this patchset into git:

curl -s https://lists.sr.ht/~witcher/public-inbox/patches/36826/mbox | git am -3
Learn more about email & git

[PATCH] Implement async fetching Export this patch

This changeset implements fetching of RSS feeds asynchronously. It's
still no perfect though: data from feeds is inserted into the database
synchronously, and requests are temporarily paused while this happens
(since we're doing blocking IO to the database).

A potential solution to this is to have one thread that does the
networking, and another which inserts into the DB, with something like
an mpsc::channel to pass messages between them. This requires further
refactor and can be done as a followup. It would also be great to re-use
a single reqwest::Client, but that also requires further refactors.

Despite imperfections, this patch still provides a noticeable speed
bump; networking IO for one feed does not block others.

References: https://todo.sr.ht/~witcher/rss-email/3
---
 Cargo.lock  | 51 +++++++++++++++++++++++++++------------------------
 Cargo.toml  |  3 ++-
 src/main.rs | 18 ++++++++++++++----
 src/rss.rs  |  6 ++++--
 4 files changed, 47 insertions(+), 31 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 5e5ded6..0636839 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -350,50 +350,41 @@ dependencies = [

[[package]]
name = "futures-channel"
version = "0.3.23"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bfc52cbddcfd745bf1740338492bb0bd83d76c67b445f91c5fb29fae29ecaa1"
checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed"
dependencies = [
 "futures-core",
]

[[package]]
name = "futures-core"
version = "0.3.23"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2acedae88d38235936c3922476b10fced7b2b68136f5e3c03c2d5be348a1115"

[[package]]
name = "futures-io"
version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93a66fc6d035a26a3ae255a6d2bca35eda63ae4c5512bef54449113f7a1228e5"
checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac"

[[package]]
name = "futures-sink"
version = "0.3.23"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca0bae1fe9752cf7fd9b0064c674ae63f97b37bc714d745cbde0afb7ec4e6765"
checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9"

[[package]]
name = "futures-task"
version = "0.3.23"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "842fc63b931f4056a24d59de13fb1272134ce261816e063e634ad0c15cdc5306"
checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea"

[[package]]
name = "futures-util"
version = "0.3.23"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0828a5471e340229c11c77ca80017937ce3c58cb788a17e5f1c2d5c485a9577"
checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6"
dependencies = [
 "futures-core",
 "futures-io",
 "futures-task",
 "memchr",
 "pin-project-lite",
 "pin-utils",
 "slab",
]

[[package]]
@@ -741,9 +732,9 @@ dependencies = [

[[package]]
name = "num_cpus"
version = "1.13.1"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
checksum = "f6058e64324c71e02bc2b150e4f3bc8286db6c83092132ffa3f6b1eab0f9def5"
dependencies = [
 "hermit-abi",
 "libc",
@@ -962,6 +953,7 @@ dependencies = [
 "reqwest",
 "rss",
 "serde",
 "tokio",
 "toml",
]

@@ -1150,9 +1142,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"

[[package]]
name = "tokio"
version = "1.20.1"
version = "1.21.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a8325f63a7d4774dd041e363b2409ed1c5cbbd0f867795e661df066b2b0a581"
checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099"
dependencies = [
 "autocfg",
 "bytes",
@@ -1160,12 +1152,23 @@ dependencies = [
 "memchr",
 "mio",
 "num_cpus",
 "once_cell",
 "pin-project-lite",
 "socket2",
 "tokio-macros",
 "winapi",
]

[[package]]
name = "tokio-macros"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484"
dependencies = [
 "proc-macro2",
 "quote",
 "syn",
]

[[package]]
name = "tokio-rustls"
version = "0.23.4"
diff --git a/Cargo.toml b/Cargo.toml
index 93f7b32..dacdd82 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,7 +11,7 @@ strip = "symbols"
diesel = { version = "1.4", features = ["sqlite"] }
rss = "2.0"
anyhow = "1.0"
reqwest = {version = "0.11", default-features = false, features = ["blocking", "rustls-tls"]}
reqwest = {version = "0.11", default-features = false, features = ["rustls-tls"]}
clap = { version = "3", features = ["derive"] }
chrono = "0.4"
toml = "0.5.8"
@@ -21,3 +21,4 @@ diesel_migrations = "1.4.0"
directories = "4.0.1"
log = "0.4.17"
env_logger = "0.9.0"
tokio = { version = "1.21.2", default-features = false, features = ["rt-multi-thread", "macros"] }
diff --git a/src/main.rs b/src/main.rs
index 156cc10..b181138 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -24,8 +24,10 @@ use std::{
    fs::File,
    io::{BufRead, BufReader},
};
use tokio::task::JoinSet;

fn main() -> anyhow::Result<()> {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    diesel_migrations::embed_migrations!("migrations/");
    env_logger::init();

@@ -49,10 +51,18 @@ fn main() -> anyhow::Result<()> {
    let conn = SqliteConnection::establish(args.database_path.unwrap().to_str().unwrap())?;
    embedded_migrations::run(&conn)?;

    let mut set = JoinSet::new();
    for u in urls {
        debug!("Fetching feed from {u:?}");
        let new = rss::fetch_new(u)?;
        for i in new.items() {
        set.spawn(async move { rss::fetch_new(u).await });
    }

    while let Some(new) = set.join_next().await {
        let new = new??;
        let items = new.items();

        debug!("Found {} new items", items.len());

        for i in items {
            let _ = db::insert_item(&conn, i)?;
        }
    }
diff --git a/src/rss.rs b/src/rss.rs
index 5290366..af377d1 100644
--- a/src/rss.rs
+++ b/src/rss.rs
@@ -1,8 +1,10 @@
use reqwest;
use rss;

pub fn fetch_new<S: AsRef<str>>(url: S) -> anyhow::Result<rss::Channel> {
    let content = reqwest::blocking::get(url.as_ref())?.bytes()?;
pub async fn fetch_new<S: AsRef<str>>(url: S) -> anyhow::Result<rss::Channel> {
    debug!("Fetching feed for {}", url.as_ref());

    let content = reqwest::get(url.as_ref()).await?.bytes().await?;
    let channel = rss::Channel::read_from(&content[..])?;

    Ok(channel)
-- 
2.38.1
Looks good, applied it. Thank you so much!