async hell

This commit is contained in:
theBreadCompany 2025-02-03 19:36:29 +01:00
parent e7cfc8217a
commit 16f369d73a
12 changed files with 54 additions and 41 deletions

View file

@ -6,7 +6,8 @@ use sqlx::postgres::PgPoolOptions;
use dotenvy::dotenv; use dotenvy::dotenv;
use std::env; use std::env;
use std::io::Write; use std::io::Write;
use tokio::time::{self, Duration}; use tokio::task;
use tokio::time::{self, sleep, Duration};
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
@ -27,52 +28,64 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut count = 0; let mut count = 0;
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let mut interval = time::interval(Duration::from_secs(1800)); //let mut interval = time::interval(Duration::from_secs(1800));
loop { loop {
print!("\nWaiting for next interval tick... "); let mut handles = vec![];
std::io::stdout().flush().unwrap(); std::io::stdout().flush().unwrap();
interval.tick().await; //interval.tick().await;
let date = Local::now(); let date = Local::now();
println!("Ticked on {}", &date.format("%Y.%m.%d %H:%M")); //println!("Ticked on {}", &date.format("%Y.%m.%d %H:%M"));
for station in &stations { for station in &stations {
count += 1; let station = station.clone();
let req = req let req = req.clone();
.replace("TEMPLATE_ID", &station) let client = client.clone();
.replace("TEMPLATE_DATE", &date.format("%Y%m%d").to_string()) let mut conn = conn.clone();
.replace("TEMPLATE_TIME", &date.format("%H%M").to_string()); handles.push(task::spawn(async move {
print!("\x1B[2K\rFetching {}/{} ({}) ", count, total, station); count += 1;
std::io::stdout().flush().unwrap(); let req = req
if let Ok(doc) = client .replace("TEMPLATE_ID", &station)
.get(req.replace("TEMPLATE_DEPARR", "arr")) .replace("TEMPLATE_DATE", &date.format("%Y%m%d").to_string())
.header("Authorization", &token) .replace("TEMPLATE_TIME", &date.format("%H%M").to_string());
.send() print!("\x1B[2K\rFetching {}/{} ({}) ", count, total, station);
.await? std::io::stdout().flush().unwrap();
.json::<Root>() if let Ok(doc) = client
.await .get(req.replace("TEMPLATE_DEPARR", "arr"))
{ //.header("Authorization", &token)
match insert_document(doc, &mut conn).await { .send()
Ok(_) => {} .await
Err(e) => println!("\x1B[2K\rError inserting document: {}", e), .unwrap()
.json::<Root>()
.await
{
match insert_document(doc, &mut conn).await {
Ok(_) => {}
Err(e) => println!("\x1B[2K\rError inserting document: {}", e),
}
} else {
println!("\x1B[2K\rFailed to fetch arrivals for station {}", station);
} }
} else { if let Ok(doc) = client
println!("\x1B[2K\rFailed to fetch arrivals for station {}", station); .get(req.replace("TEMPLATE_DEPARR", "dep"))
} //.header("Authorization", &token)
if let Ok(doc) = client .send()
.get(req.replace("TEMPLATE_DEPARR", "dep")) .await
.header("Authorization", &token) .unwrap()
.send() .json::<Root>()
.await? .await
.json::<Root>() {
.await match insert_document(doc, &mut conn).await {
{ Ok(_) => {}
match insert_document(doc, &mut conn).await { Err(e) => println!("\x1B[2K\rError inserting document: {}", e),
Ok(_) => {} }
Err(e) => println!("\x1B[2K\rError inserting document: {}", e), } else {
println!("\x1B[2K\rFailed to fetch departures for station {}", station);
} }
} else { }));
println!("\x1B[2K\rFailed to fetch departures for station {}", station);
} }
for handle in handles {
handle.await?;
sleep(Duration::from_millis(100)).await;
} }
count = 0; count = 0;
} }

View file

@ -19,7 +19,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.connect(&url) .connect(&url)
.await?; .await?;
let src = "input"; let src = "input/xslt_dm_request";
let total = ReadDirStream::new(fs::read_dir(src).await?).count().await; let total = ReadDirStream::new(fs::read_dir(src).await?).count().await;
let mut stream = ReadDirStream::new(fs::read_dir(src).await?); let mut stream = ReadDirStream::new(fs::read_dir(src).await?);
let mut count = 0; let mut count = 0;