diff --git a/Cargo.lock b/Cargo.lock index f1d7e13..82dbc7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,6 +38,23 @@ dependencies = [ "libc", ] +[[package]] +name = "anyhow" +version = "1.0.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" + +[[package]] +name = "async-trait" +version = "0.1.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "644dd749086bf3771a2fbc5f256fdb982d53f011c7d5d560304eafeecebce79d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atoi" version = "2.0.0" @@ -86,6 +103,12 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.8.0" @@ -239,6 +262,8 @@ dependencies = [ "dataworker", "dotenvy", "reqwest", + "reqwest-middleware", + "reqwest-retry", "sqlx", "tokio", ] @@ -253,7 +278,7 @@ dependencies = [ "serde", "serde_json", "sqlx", - "thiserror", + "thiserror 2.0.11", "tokio", "tokio-stream", "uuid", @@ -408,6 +433,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -443,7 +483,7 @@ checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" dependencies = [ "futures-core", "lock_api", - "parking_lot", + "parking_lot 0.12.3", ] [[package]] @@ -481,6 +521,7 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -509,8 +550,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -893,6 +936,18 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -1101,7 +1156,7 @@ version = "0.10.70" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61cfb4e166a8bb8c9b55c500bc2308550148ece889be90f609377e58140f42c6" dependencies = [ - "bitflags", + "bitflags 2.8.0", "cfg-if", "foreign-types", "libc", @@ -1145,6 +1200,17 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.3" @@ -1152,7 +1218,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.10", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -1163,7 +1243,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.8", "smallvec", "windows-targets 0.52.6", ] @@ -1279,13 +1359,22 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" dependencies = [ - "bitflags", + "bitflags 2.8.0", ] [[package]] @@ -1332,6 +1421,52 @@ dependencies = [ "windows-registry", ] +[[package]] +name = "reqwest-middleware" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1ccd3b55e711f91a9885a2fa6fbbb2e39db1776420b062efc058c6410f7e5e3" +dependencies = [ + "anyhow", + "async-trait", + "http", + "reqwest", + "serde", + "thiserror 1.0.69", + "tower-service", +] + +[[package]] +name = "reqwest-retry" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c73e4195a6bfbcb174b790d9b3407ab90646976c55de58a6515da25d851178" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "getrandom 0.2.15", + "http", + "hyper", + "parking_lot 0.11.2", + "reqwest", + "reqwest-middleware", + "retry-policies", + "thiserror 1.0.69", + "tokio", + "tracing", + "wasm-timer", +] + +[[package]] +name = "retry-policies" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5875471e6cab2871bc150ecb8c727db5113c9338cc3354dc5ee3425b6aa40a1c" +dependencies = [ + "rand", +] + [[package]] name = "ring" version = "0.17.8" @@ -1379,7 +1514,7 @@ version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags", + "bitflags 2.8.0", "errno", "libc", "linux-raw-sys", @@ -1458,7 +1593,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.8.0", "core-foundation", "core-foundation-sys", "libc", @@ -1659,7 +1794,7 @@ dependencies = [ "serde_json", "sha2", "smallvec", - "thiserror", + "thiserror 2.0.11", "tokio", "tokio-stream", "tracing", @@ -1714,7 +1849,7 @@ checksum = "4560278f0e00ce64938540546f59f590d60beee33fffbd3b9cd47851e5fff233" dependencies = [ "atoi", "base64", - "bitflags", + "bitflags 2.8.0", "byteorder", "bytes", "chrono", @@ -1744,7 +1879,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror", + "thiserror 2.0.11", "tracing", "uuid", "whoami", @@ -1758,7 +1893,7 @@ checksum = "c5b98a57f363ed6764d5b3a12bfedf62f07aa16e1856a7ddc2a0bb190a959613" dependencies = [ "atoi", "base64", - "bitflags", + "bitflags 2.8.0", "byteorder", "chrono", "crc", @@ -1783,7 +1918,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror", + "thiserror 2.0.11", "tracing", "uuid", "whoami", @@ -1874,7 +2009,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ - "bitflags", + "bitflags 2.8.0", "core-foundation", "system-configuration-sys", ] @@ -1903,13 +2038,33 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.11", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -1958,7 +2113,7 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot", + "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", "socket2", @@ -2270,6 +2425,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-timer" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be0ecb0db480561e9a7642b5d3e4187c128914e58aa84330b9493e3eb68c5e7f" +dependencies = [ + "futures", + "js-sys", + "parking_lot 0.11.2", + "pin-utils", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.77" @@ -2286,10 +2456,32 @@ version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" dependencies = [ - "redox_syscall", + "redox_syscall 0.5.8", "wasite", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-core" version = "0.52.0" @@ -2483,7 +2675,7 @@ version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c" dependencies = [ - "bitflags", + "bitflags 2.8.0", ] [[package]] diff --git a/datafetcher/Cargo.toml b/datafetcher/Cargo.toml index 1a70d3e..a315b63 100644 --- a/datafetcher/Cargo.toml +++ b/datafetcher/Cargo.toml @@ -10,3 +10,5 @@ tokio = { version = "1.43.0", features = ["rt-multi-thread", "macros", "time"] } dataworker = { path = "../dataworker" } dotenvy = "0.15.7" sqlx = { version = "0.8.3", features = ["postgres"] } +reqwest-middleware = "0.4.0" +reqwest-retry = "0.7.0" diff --git a/datafetcher/src/main.rs b/datafetcher/src/main.rs index 3e4cec2..747bcf6 100644 --- a/datafetcher/src/main.rs +++ b/datafetcher/src/main.rs @@ -4,10 +4,12 @@ use dataworker::insert_document; use sqlx::postgres::PgPoolOptions; use dotenvy::dotenv; +use reqwest_middleware::ClientBuilder; +use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use std::env; use std::io::Write; use tokio::task; -use tokio::time::{self, sleep, Duration}; +use tokio::time::{sleep, Duration}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -16,19 +18,23 @@ async fn main() -> Result<(), Box> { .split(',') .map(|s| s.to_string()) .collect(); - let token = env::var("VRN_TOKEN")?; + let _token = env::var("VRN_TOKEN")?; let req = env::var("REQ")?; let url = env::var("DATABASE_URL")?; - let mut conn = PgPoolOptions::new() - .max_connections(5) + let conn = PgPoolOptions::new() + .max_connections(8) + .acquire_timeout(Duration::from_secs(3600)) .connect(&url) .await?; let total = stations.len(); let mut count = 0; - let client = reqwest::Client::new(); - //let mut interval = time::interval(Duration::from_secs(1800)); + let client = ClientBuilder::new(reqwest::Client::new()) + .with(RetryTransientMiddleware::new_with_policy( + ExponentialBackoff::builder().build_with_max_retries(3), + )) + .build(); loop { let mut handles = vec![]; @@ -36,56 +42,64 @@ async fn main() -> Result<(), Box> { //interval.tick().await; let date = Local::now(); //println!("Ticked on {}", &date.format("%Y.%m.%d %H:%M")); - for station in &stations { + for station_batch in &mut stations.chunks(50000) { + for station in station_batch { let station = station.clone(); let req = req.clone(); let client = client.clone(); let mut conn = conn.clone(); + count += 1; handles.push(task::spawn(async move { - count += 1; - let req = req - .replace("TEMPLATE_ID", &station) - .replace("TEMPLATE_DATE", &date.format("%Y%m%d").to_string()) - .replace("TEMPLATE_TIME", &date.format("%H%M").to_string()); - print!("\x1B[2K\rFetching {}/{} ({}) ", count, total, station); - std::io::stdout().flush().unwrap(); - if let Ok(doc) = client - .get(req.replace("TEMPLATE_DEPARR", "arr")) - //.header("Authorization", &token) - .send() - .await - .unwrap() - .json::() - .await - { - match insert_document(doc, &mut conn).await { - Ok(_) => {} - Err(e) => println!("\x1B[2K\rError inserting document: {}", e), + let req = req + .replace("TEMPLATE_ID", &station) + .replace("TEMPLATE_DATE", &date.format("%Y%m%d").to_string()) + .replace("TEMPLATE_TIME", &date.format("%H%M").to_string()); + print!("\x1B[2K\rFetching {}/{} ({}) ", count, total, station); + std::io::stdout().flush().unwrap(); + if let Ok(doc) = client + .get(req.replace("TEMPLATE_DEPARR", "arr")) + //.header("Authorization", &token) + .send() + .await + .unwrap() + .json::() + .await + { + match insert_document(doc, &mut conn).await { + Ok(_) => {} + Err(e) => println!("\x1B[2K\rError inserting document: {}", e), + } + } else { + println!("\x1B[2K\rFailed to fetch arrivals for station {}", station); } - } else { - println!("\x1B[2K\rFailed to fetch arrivals for station {}", station); - } - if let Ok(doc) = client - .get(req.replace("TEMPLATE_DEPARR", "dep")) - //.header("Authorization", &token) - .send() - .await - .unwrap() - .json::() - .await - { - match insert_document(doc, &mut conn).await { - Ok(_) => {} - Err(e) => println!("\x1B[2K\rError inserting document: {}", e), + /* + if let Ok(doc) = client + .get(req.replace("TEMPLATE_DEPARR", "dep")) + //.header("Authorization", &token) + .send() + .await + .unwrap() + .json::() + .await + { + match insert_document(doc, &mut conn).await { + Ok(_) => {} + Err(e) => println!("\x1B[2K\rError inserting document: {}", e), + } + } else { + println!("\x1B[2K\rFailed to fetch departures for station {}", station); } - } else { - println!("\x1B[2K\rFailed to fetch departures for station {}", station); - } + + */ + //sleep(Duration::from_millis(100)).await; })); + sleep(Duration::from_millis(10)).await; } - for handle in handles { - handle.await?; - sleep(Duration::from_millis(100)).await; + for handle in handles { + handle.await?; + } + handles = vec![]; + } count = 0; } diff --git a/dataworker/src/lib.rs b/dataworker/src/lib.rs index cb3aaba..655cee4 100644 --- a/dataworker/src/lib.rs +++ b/dataworker/src/lib.rs @@ -516,34 +516,22 @@ async fn insert_arrivals( .fetch_all(conn) .await?; - let operators = sqlx::query_scalar!( - "WITH result AS( - INSERT INTO operators (code, name, pub_code) - SELECT * FROM UNNEST($1::varchar[], $2::varchar[], $3::varchar[]) - ON CONFLICT (code) DO NOTHING - RETURNING id - ) - SELECT id FROM result - UNION - SELECT id FROM operators WHERE code = ANY($1);", - &arrivals - .iter() - .map(|a| a.operator_code.clone()) - .collect::>() as &[Option<_>], - &arrivals - .iter() - .map(|a| a.operator_name.clone()) - .collect::>() as &[Option<_>], - &arrivals - .iter() - .map(|a| a.operator_public_code.clone()) - .collect::>() as &[Option<_>] - ) - .fetch_all(conn) - .await?; - sqlx::query!( - "INSERT INTO movements ( + " + WITH + operators_insert AS( + INSERT INTO operators (code, name, pub_code) + SELECT code, name, pub_code FROM UNNEST($27::varchar[], $28::varchar[], $29::varchar[]) AS t(code, name, pub_code) + WHERE code IS NOT NULL + ON CONFLICT (code) DO NOTHING + RETURNING id, code + ), + operators AS( + SELECT id, code FROM operators_insert + UNION + SELECT id, code FROM operators WHERE code = ANY($28) + ) + INSERT INTO movements ( departure_request, stopID, x, y, mapName, area, platform, platformName, countdown, time, realtime, key, code, number, symbol, motType, mtSubcode, is_realtime, direction, directionFrom, @@ -555,7 +543,11 @@ async fn insert_arrivals( $10::timestamp[], $11::timestamp[], $12::integer[], $13::varchar[], $14::varchar[], $15::varchar[], $16::integer[], $17::integer[], $18::boolean[], $19::varchar[], $20::varchar[], $21::varchar[], $22::integer[], - $23::varchar[], $24::varchar[], $25::varchar[], $26::integer[], $27::boolean[] + $23::varchar[], $24::varchar[], $25::varchar[], + ARRAY( + SELECT COALESCE(o.id, NULL) + FROM UNNEST($28::varchar[]) c LEFT JOIN operators o ON o.code = c + )::integer[], $26::boolean[] ) ON CONFLICT ON CONSTRAINT movements_uniq DO NOTHING", &arrivals .iter() @@ -615,8 +607,19 @@ async fn insert_arrivals( .iter() .map(|r| r.line_display.clone()) .collect::>(), - &operators as &[Option<_>], &arrivals.iter().map(|r| r.r#type).collect::>(), + &arrivals + .iter() + .map(|a| a.operator_code.clone()) + .collect::>() as &[Option<_>], + &arrivals + .iter() + .map(|a| a.operator_name.clone()) + .collect::>() as &[Option<_>], + &arrivals + .iter() + .map(|a| a.operator_public_code.clone()) + .collect::>() as &[Option<_>] ) .execute(conn) .await diff --git a/dataworker/src/main.rs b/dataworker/src/main.rs index 6df1c25..6a8cbd4 100644 --- a/dataworker/src/main.rs +++ b/dataworker/src/main.rs @@ -6,6 +6,7 @@ use sqlx::postgres::PgPoolOptions; use std::env; use std::error::Error; use std::io::Write; +use std::time::Duration; use tokio::fs; use tokio::fs::DirEntry; use tokio_stream::wrappers::ReadDirStream; @@ -14,15 +15,17 @@ use tokio_stream::wrappers::ReadDirStream; async fn main() -> Result<(), Box> { dotenv().ok(); let url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - let mut conn = PgPoolOptions::new() - .max_connections(5) + let conn = PgPoolOptions::new() + .max_connections(30) + .acquire_timeout(Duration::from_secs(300)) .connect(&url) .await?; - let src = "input/xslt_dm_request"; + let src = "/home/bread/Downloads/station/willdaten"; let total = ReadDirStream::new(fs::read_dir(src).await?).count().await; let mut stream = ReadDirStream::new(fs::read_dir(src).await?); let mut count = 0; + let mut handles = vec![]; while let Some(path) = stream.next().await { count += 1; @@ -34,12 +37,27 @@ async fn main() -> Result<(), Box> { path.file_name().to_str().unwrap() ); std::io::stdout().flush().unwrap(); - let content = fs::read_to_string(path.path()).await.unwrap(); - if content.len() > 0 { - if let Ok(data) = serde_json::from_str::(&content) { - insert_document(data, &mut conn).await?; + let mut conn = conn.clone(); + + if handles.len() > 5000 { + while let Some(handle) = handles.pop() { + handle.await?; } + handles = vec![]; } + + handles.push(tokio::spawn(async move { + let content = fs::read_to_string(path.path()).await.unwrap(); + if content.len() > 0 { + if let Ok(data) = serde_json::from_str::(&content) { + insert_document(data, &mut conn).await.ok(); + } + } + })); + } + + for handle in handles { + handle.await?; } println!(); diff --git a/default.nix b/default.nix new file mode 100644 index 0000000..e69de29