From 16f369d73a6db2732b0f76daff6f6f934d255b8a Mon Sep 17 00:00:00 2001 From: theBreadCompany Date: Mon, 3 Feb 2025 19:36:29 +0100 Subject: [PATCH] async hell --- datafetcher/src/main.rs | 93 +++++++++++-------- .../input/{ => xslt_dm_request}/10024717.json | 0 .../input/{ => xslt_dm_request}/24016923.json | 0 .../input/{ => xslt_dm_request}/25000001.json | 0 .../input/{ => xslt_dm_request}/25205074.json | 0 .../input/{ => xslt_dm_request}/31201864.json | 0 .../input/{ => xslt_dm_request}/6002417.json | 0 .../input/{ => xslt_dm_request}/71007418.json | 0 .../input/{ => xslt_dm_request}/80021000.json | 0 .../input/{ => xslt_dm_request}/80040013.json | 0 .../input/{ => xslt_dm_request}/94300928.json | 0 dataworker/src/main.rs | 2 +- 12 files changed, 54 insertions(+), 41 deletions(-) rename dataworker/input/{ => xslt_dm_request}/10024717.json (100%) rename dataworker/input/{ => xslt_dm_request}/24016923.json (100%) rename dataworker/input/{ => xslt_dm_request}/25000001.json (100%) rename dataworker/input/{ => xslt_dm_request}/25205074.json (100%) rename dataworker/input/{ => xslt_dm_request}/31201864.json (100%) rename dataworker/input/{ => xslt_dm_request}/6002417.json (100%) rename dataworker/input/{ => xslt_dm_request}/71007418.json (100%) rename dataworker/input/{ => xslt_dm_request}/80021000.json (100%) rename dataworker/input/{ => xslt_dm_request}/80040013.json (100%) rename dataworker/input/{ => xslt_dm_request}/94300928.json (100%) diff --git a/datafetcher/src/main.rs b/datafetcher/src/main.rs index a0fff6f..3e4cec2 100644 --- a/datafetcher/src/main.rs +++ b/datafetcher/src/main.rs @@ -6,7 +6,8 @@ use sqlx::postgres::PgPoolOptions; use dotenvy::dotenv; use std::env; use std::io::Write; -use tokio::time::{self, Duration}; +use tokio::task; +use tokio::time::{self, sleep, Duration}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -27,52 +28,64 @@ async fn main() -> Result<(), Box> { let mut count = 0; let client = reqwest::Client::new(); - let mut interval = time::interval(Duration::from_secs(1800)); + //let mut interval = time::interval(Duration::from_secs(1800)); loop { - print!("\nWaiting for next interval tick... "); + let mut handles = vec![]; std::io::stdout().flush().unwrap(); - interval.tick().await; + //interval.tick().await; let date = Local::now(); - println!("Ticked on {}", &date.format("%Y.%m.%d %H:%M")); - for station in &stations { - count += 1; - let req = req - .replace("TEMPLATE_ID", &station) - .replace("TEMPLATE_DATE", &date.format("%Y%m%d").to_string()) - .replace("TEMPLATE_TIME", &date.format("%H%M").to_string()); - print!("\x1B[2K\rFetching {}/{} ({}) ", count, total, station); - std::io::stdout().flush().unwrap(); - if let Ok(doc) = client - .get(req.replace("TEMPLATE_DEPARR", "arr")) - .header("Authorization", &token) - .send() - .await? - .json::() - .await - { - match insert_document(doc, &mut conn).await { - Ok(_) => {} - Err(e) => println!("\x1B[2K\rError inserting document: {}", e), + //println!("Ticked on {}", &date.format("%Y.%m.%d %H:%M")); + for station in &stations { + let station = station.clone(); + let req = req.clone(); + let client = client.clone(); + let mut conn = conn.clone(); + handles.push(task::spawn(async move { + count += 1; + let req = req + .replace("TEMPLATE_ID", &station) + .replace("TEMPLATE_DATE", &date.format("%Y%m%d").to_string()) + .replace("TEMPLATE_TIME", &date.format("%H%M").to_string()); + print!("\x1B[2K\rFetching {}/{} ({}) ", count, total, station); + std::io::stdout().flush().unwrap(); + if let Ok(doc) = client + .get(req.replace("TEMPLATE_DEPARR", "arr")) + //.header("Authorization", &token) + .send() + .await + .unwrap() + .json::() + .await + { + match insert_document(doc, &mut conn).await { + Ok(_) => {} + Err(e) => println!("\x1B[2K\rError inserting document: {}", e), + } + } else { + println!("\x1B[2K\rFailed to fetch arrivals for station {}", station); } - } else { - println!("\x1B[2K\rFailed to fetch arrivals for station {}", station); - } - if let Ok(doc) = client - .get(req.replace("TEMPLATE_DEPARR", "dep")) - .header("Authorization", &token) - .send() - .await? - .json::() - .await - { - match insert_document(doc, &mut conn).await { - Ok(_) => {} - Err(e) => println!("\x1B[2K\rError inserting document: {}", e), + if let Ok(doc) = client + .get(req.replace("TEMPLATE_DEPARR", "dep")) + //.header("Authorization", &token) + .send() + .await + .unwrap() + .json::() + .await + { + match insert_document(doc, &mut conn).await { + Ok(_) => {} + Err(e) => println!("\x1B[2K\rError inserting document: {}", e), + } + } else { + println!("\x1B[2K\rFailed to fetch departures for station {}", station); } - } else { - println!("\x1B[2K\rFailed to fetch departures for station {}", station); + })); } + for handle in handles { + handle.await?; + sleep(Duration::from_millis(100)).await; } count = 0; } diff --git a/dataworker/input/10024717.json b/dataworker/input/xslt_dm_request/10024717.json similarity index 100% rename from dataworker/input/10024717.json rename to dataworker/input/xslt_dm_request/10024717.json diff --git a/dataworker/input/24016923.json b/dataworker/input/xslt_dm_request/24016923.json similarity index 100% rename from dataworker/input/24016923.json rename to dataworker/input/xslt_dm_request/24016923.json diff --git a/dataworker/input/25000001.json b/dataworker/input/xslt_dm_request/25000001.json similarity index 100% rename from dataworker/input/25000001.json rename to dataworker/input/xslt_dm_request/25000001.json diff --git a/dataworker/input/25205074.json b/dataworker/input/xslt_dm_request/25205074.json similarity index 100% rename from dataworker/input/25205074.json rename to dataworker/input/xslt_dm_request/25205074.json diff --git a/dataworker/input/31201864.json b/dataworker/input/xslt_dm_request/31201864.json similarity index 100% rename from dataworker/input/31201864.json rename to dataworker/input/xslt_dm_request/31201864.json diff --git a/dataworker/input/6002417.json b/dataworker/input/xslt_dm_request/6002417.json similarity index 100% rename from dataworker/input/6002417.json rename to dataworker/input/xslt_dm_request/6002417.json diff --git a/dataworker/input/71007418.json b/dataworker/input/xslt_dm_request/71007418.json similarity index 100% rename from dataworker/input/71007418.json rename to dataworker/input/xslt_dm_request/71007418.json diff --git a/dataworker/input/80021000.json b/dataworker/input/xslt_dm_request/80021000.json similarity index 100% rename from dataworker/input/80021000.json rename to dataworker/input/xslt_dm_request/80021000.json diff --git a/dataworker/input/80040013.json b/dataworker/input/xslt_dm_request/80040013.json similarity index 100% rename from dataworker/input/80040013.json rename to dataworker/input/xslt_dm_request/80040013.json diff --git a/dataworker/input/94300928.json b/dataworker/input/xslt_dm_request/94300928.json similarity index 100% rename from dataworker/input/94300928.json rename to dataworker/input/xslt_dm_request/94300928.json diff --git a/dataworker/src/main.rs b/dataworker/src/main.rs index e36a4da..6df1c25 100644 --- a/dataworker/src/main.rs +++ b/dataworker/src/main.rs @@ -19,7 +19,7 @@ async fn main() -> Result<(), Box> { .connect(&url) .await?; - let src = "input"; + let src = "input/xslt_dm_request"; let total = ReadDirStream::new(fs::read_dir(src).await?).count().await; let mut stream = ReadDirStream::new(fs::read_dir(src).await?); let mut count = 0;