This commit is contained in:
theBreadCompany 2025-02-06 04:19:18 +01:00
parent 16f369d73a
commit ac43b0ccf5
6 changed files with 330 additions and 101 deletions

228
Cargo.lock generated
View file

@ -38,6 +38,23 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "anyhow"
version = "1.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04"
[[package]]
name = "async-trait"
version = "0.1.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "644dd749086bf3771a2fbc5f256fdb982d53f011c7d5d560304eafeecebce79d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "atoi" name = "atoi"
version = "2.0.0" version = "2.0.0"
@ -86,6 +103,12 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "2.8.0" version = "2.8.0"
@ -239,6 +262,8 @@ dependencies = [
"dataworker", "dataworker",
"dotenvy", "dotenvy",
"reqwest", "reqwest",
"reqwest-middleware",
"reqwest-retry",
"sqlx", "sqlx",
"tokio", "tokio",
] ]
@ -253,7 +278,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"sqlx", "sqlx",
"thiserror", "thiserror 2.0.11",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"uuid", "uuid",
@ -408,6 +433,21 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.31" version = "0.3.31"
@ -443,7 +483,7 @@ checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"lock_api", "lock_api",
"parking_lot", "parking_lot 0.12.3",
] ]
[[package]] [[package]]
@ -481,6 +521,7 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [ dependencies = [
"futures-channel",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-macro", "futures-macro",
@ -509,8 +550,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"js-sys",
"libc", "libc",
"wasi 0.11.0+wasi-snapshot-preview1", "wasi 0.11.0+wasi-snapshot-preview1",
"wasm-bindgen",
] ]
[[package]] [[package]]
@ -893,6 +936,18 @@ dependencies = [
"hashbrown", "hashbrown",
] ]
[[package]]
name = "instant"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "ipnet" name = "ipnet"
version = "2.11.0" version = "2.11.0"
@ -1101,7 +1156,7 @@ version = "0.10.70"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61cfb4e166a8bb8c9b55c500bc2308550148ece889be90f609377e58140f42c6" checksum = "61cfb4e166a8bb8c9b55c500bc2308550148ece889be90f609377e58140f42c6"
dependencies = [ dependencies = [
"bitflags", "bitflags 2.8.0",
"cfg-if", "cfg-if",
"foreign-types", "foreign-types",
"libc", "libc",
@ -1145,6 +1200,17 @@ version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core 0.8.6",
]
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.12.3" version = "0.12.3"
@ -1152,7 +1218,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
dependencies = [ dependencies = [
"lock_api", "lock_api",
"parking_lot_core", "parking_lot_core 0.9.10",
]
[[package]]
name = "parking_lot_core"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall 0.2.16",
"smallvec",
"winapi",
] ]
[[package]] [[package]]
@ -1163,7 +1243,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"redox_syscall", "redox_syscall 0.5.8",
"smallvec", "smallvec",
"windows-targets 0.52.6", "windows-targets 0.52.6",
] ]
@ -1279,13 +1359,22 @@ dependencies = [
"getrandom 0.2.15", "getrandom 0.2.15",
] ]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags 1.3.2",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.5.8" version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834"
dependencies = [ dependencies = [
"bitflags", "bitflags 2.8.0",
] ]
[[package]] [[package]]
@ -1332,6 +1421,52 @@ dependencies = [
"windows-registry", "windows-registry",
] ]
[[package]]
name = "reqwest-middleware"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1ccd3b55e711f91a9885a2fa6fbbb2e39db1776420b062efc058c6410f7e5e3"
dependencies = [
"anyhow",
"async-trait",
"http",
"reqwest",
"serde",
"thiserror 1.0.69",
"tower-service",
]
[[package]]
name = "reqwest-retry"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29c73e4195a6bfbcb174b790d9b3407ab90646976c55de58a6515da25d851178"
dependencies = [
"anyhow",
"async-trait",
"futures",
"getrandom 0.2.15",
"http",
"hyper",
"parking_lot 0.11.2",
"reqwest",
"reqwest-middleware",
"retry-policies",
"thiserror 1.0.69",
"tokio",
"tracing",
"wasm-timer",
]
[[package]]
name = "retry-policies"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5875471e6cab2871bc150ecb8c727db5113c9338cc3354dc5ee3425b6aa40a1c"
dependencies = [
"rand",
]
[[package]] [[package]]
name = "ring" name = "ring"
version = "0.17.8" version = "0.17.8"
@ -1379,7 +1514,7 @@ version = "0.38.44"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154"
dependencies = [ dependencies = [
"bitflags", "bitflags 2.8.0",
"errno", "errno",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
@ -1458,7 +1593,7 @@ version = "2.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02"
dependencies = [ dependencies = [
"bitflags", "bitflags 2.8.0",
"core-foundation", "core-foundation",
"core-foundation-sys", "core-foundation-sys",
"libc", "libc",
@ -1659,7 +1794,7 @@ dependencies = [
"serde_json", "serde_json",
"sha2", "sha2",
"smallvec", "smallvec",
"thiserror", "thiserror 2.0.11",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tracing", "tracing",
@ -1714,7 +1849,7 @@ checksum = "4560278f0e00ce64938540546f59f590d60beee33fffbd3b9cd47851e5fff233"
dependencies = [ dependencies = [
"atoi", "atoi",
"base64", "base64",
"bitflags", "bitflags 2.8.0",
"byteorder", "byteorder",
"bytes", "bytes",
"chrono", "chrono",
@ -1744,7 +1879,7 @@ dependencies = [
"smallvec", "smallvec",
"sqlx-core", "sqlx-core",
"stringprep", "stringprep",
"thiserror", "thiserror 2.0.11",
"tracing", "tracing",
"uuid", "uuid",
"whoami", "whoami",
@ -1758,7 +1893,7 @@ checksum = "c5b98a57f363ed6764d5b3a12bfedf62f07aa16e1856a7ddc2a0bb190a959613"
dependencies = [ dependencies = [
"atoi", "atoi",
"base64", "base64",
"bitflags", "bitflags 2.8.0",
"byteorder", "byteorder",
"chrono", "chrono",
"crc", "crc",
@ -1783,7 +1918,7 @@ dependencies = [
"smallvec", "smallvec",
"sqlx-core", "sqlx-core",
"stringprep", "stringprep",
"thiserror", "thiserror 2.0.11",
"tracing", "tracing",
"uuid", "uuid",
"whoami", "whoami",
@ -1874,7 +2009,7 @@ version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b"
dependencies = [ dependencies = [
"bitflags", "bitflags 2.8.0",
"core-foundation", "core-foundation",
"system-configuration-sys", "system-configuration-sys",
] ]
@ -1903,13 +2038,33 @@ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.59.0",
] ]
[[package]]
name = "thiserror"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl 1.0.69",
]
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "2.0.11" version = "2.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc" checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc"
dependencies = [ dependencies = [
"thiserror-impl", "thiserror-impl 2.0.11",
]
[[package]]
name = "thiserror-impl"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn",
] ]
[[package]] [[package]]
@ -1958,7 +2113,7 @@ dependencies = [
"bytes", "bytes",
"libc", "libc",
"mio", "mio",
"parking_lot", "parking_lot 0.12.3",
"pin-project-lite", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",
"socket2", "socket2",
@ -2270,6 +2425,21 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "wasm-timer"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be0ecb0db480561e9a7642b5d3e4187c128914e58aa84330b9493e3eb68c5e7f"
dependencies = [
"futures",
"js-sys",
"parking_lot 0.11.2",
"pin-utils",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]] [[package]]
name = "web-sys" name = "web-sys"
version = "0.3.77" version = "0.3.77"
@ -2286,10 +2456,32 @@ version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d" checksum = "372d5b87f58ec45c384ba03563b03544dc5fadc3983e434b286913f5b4a9bb6d"
dependencies = [ dependencies = [
"redox_syscall", "redox_syscall 0.5.8",
"wasite", "wasite",
] ]
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]] [[package]]
name = "windows-core" name = "windows-core"
version = "0.52.0" version = "0.52.0"
@ -2483,7 +2675,7 @@ version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c" checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c"
dependencies = [ dependencies = [
"bitflags", "bitflags 2.8.0",
] ]
[[package]] [[package]]

View file

@ -10,3 +10,5 @@ tokio = { version = "1.43.0", features = ["rt-multi-thread", "macros", "time"] }
dataworker = { path = "../dataworker" } dataworker = { path = "../dataworker" }
dotenvy = "0.15.7" dotenvy = "0.15.7"
sqlx = { version = "0.8.3", features = ["postgres"] } sqlx = { version = "0.8.3", features = ["postgres"] }
reqwest-middleware = "0.4.0"
reqwest-retry = "0.7.0"

View file

@ -4,10 +4,12 @@ use dataworker::insert_document;
use sqlx::postgres::PgPoolOptions; use sqlx::postgres::PgPoolOptions;
use dotenvy::dotenv; use dotenvy::dotenv;
use reqwest_middleware::ClientBuilder;
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use std::env; use std::env;
use std::io::Write; use std::io::Write;
use tokio::task; use tokio::task;
use tokio::time::{self, sleep, Duration}; use tokio::time::{sleep, Duration};
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
@ -16,19 +18,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.split(',') .split(',')
.map(|s| s.to_string()) .map(|s| s.to_string())
.collect(); .collect();
let token = env::var("VRN_TOKEN")?; let _token = env::var("VRN_TOKEN")?;
let req = env::var("REQ")?; let req = env::var("REQ")?;
let url = env::var("DATABASE_URL")?; let url = env::var("DATABASE_URL")?;
let mut conn = PgPoolOptions::new() let conn = PgPoolOptions::new()
.max_connections(5) .max_connections(8)
.acquire_timeout(Duration::from_secs(3600))
.connect(&url) .connect(&url)
.await?; .await?;
let total = stations.len(); let total = stations.len();
let mut count = 0; let mut count = 0;
let client = reqwest::Client::new(); let client = ClientBuilder::new(reqwest::Client::new())
//let mut interval = time::interval(Duration::from_secs(1800)); .with(RetryTransientMiddleware::new_with_policy(
ExponentialBackoff::builder().build_with_max_retries(3),
))
.build();
loop { loop {
let mut handles = vec![]; let mut handles = vec![];
@ -36,56 +42,64 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
//interval.tick().await; //interval.tick().await;
let date = Local::now(); let date = Local::now();
//println!("Ticked on {}", &date.format("%Y.%m.%d %H:%M")); //println!("Ticked on {}", &date.format("%Y.%m.%d %H:%M"));
for station in &stations { for station_batch in &mut stations.chunks(50000) {
for station in station_batch {
let station = station.clone(); let station = station.clone();
let req = req.clone(); let req = req.clone();
let client = client.clone(); let client = client.clone();
let mut conn = conn.clone(); let mut conn = conn.clone();
count += 1;
handles.push(task::spawn(async move { handles.push(task::spawn(async move {
count += 1; let req = req
let req = req .replace("TEMPLATE_ID", &station)
.replace("TEMPLATE_ID", &station) .replace("TEMPLATE_DATE", &date.format("%Y%m%d").to_string())
.replace("TEMPLATE_DATE", &date.format("%Y%m%d").to_string()) .replace("TEMPLATE_TIME", &date.format("%H%M").to_string());
.replace("TEMPLATE_TIME", &date.format("%H%M").to_string()); print!("\x1B[2K\rFetching {}/{} ({}) ", count, total, station);
print!("\x1B[2K\rFetching {}/{} ({}) ", count, total, station); std::io::stdout().flush().unwrap();
std::io::stdout().flush().unwrap(); if let Ok(doc) = client
if let Ok(doc) = client .get(req.replace("TEMPLATE_DEPARR", "arr"))
.get(req.replace("TEMPLATE_DEPARR", "arr")) //.header("Authorization", &token)
//.header("Authorization", &token) .send()
.send() .await
.await .unwrap()
.unwrap() .json::<Root>()
.json::<Root>() .await
.await {
{ match insert_document(doc, &mut conn).await {
match insert_document(doc, &mut conn).await { Ok(_) => {}
Ok(_) => {} Err(e) => println!("\x1B[2K\rError inserting document: {}", e),
Err(e) => println!("\x1B[2K\rError inserting document: {}", e), }
} else {
println!("\x1B[2K\rFailed to fetch arrivals for station {}", station);
} }
} else { /*
println!("\x1B[2K\rFailed to fetch arrivals for station {}", station); if let Ok(doc) = client
} .get(req.replace("TEMPLATE_DEPARR", "dep"))
if let Ok(doc) = client //.header("Authorization", &token)
.get(req.replace("TEMPLATE_DEPARR", "dep")) .send()
//.header("Authorization", &token) .await
.send() .unwrap()
.await .json::<Root>()
.unwrap() .await
.json::<Root>() {
.await match insert_document(doc, &mut conn).await {
{ Ok(_) => {}
match insert_document(doc, &mut conn).await { Err(e) => println!("\x1B[2K\rError inserting document: {}", e),
Ok(_) => {} }
Err(e) => println!("\x1B[2K\rError inserting document: {}", e), } else {
println!("\x1B[2K\rFailed to fetch departures for station {}", station);
} }
} else {
println!("\x1B[2K\rFailed to fetch departures for station {}", station); */
} //sleep(Duration::from_millis(100)).await;
})); }));
sleep(Duration::from_millis(10)).await;
} }
for handle in handles { for handle in handles {
handle.await?; handle.await?;
sleep(Duration::from_millis(100)).await; }
handles = vec![];
} }
count = 0; count = 0;
} }

View file

@ -516,34 +516,22 @@ async fn insert_arrivals(
.fetch_all(conn) .fetch_all(conn)
.await?; .await?;
let operators = sqlx::query_scalar!(
"WITH result AS(
INSERT INTO operators (code, name, pub_code)
SELECT * FROM UNNEST($1::varchar[], $2::varchar[], $3::varchar[])
ON CONFLICT (code) DO NOTHING
RETURNING id
)
SELECT id FROM result
UNION
SELECT id FROM operators WHERE code = ANY($1);",
&arrivals
.iter()
.map(|a| a.operator_code.clone())
.collect::<Vec<_>>() as &[Option<_>],
&arrivals
.iter()
.map(|a| a.operator_name.clone())
.collect::<Vec<_>>() as &[Option<_>],
&arrivals
.iter()
.map(|a| a.operator_public_code.clone())
.collect::<Vec<_>>() as &[Option<_>]
)
.fetch_all(conn)
.await?;
sqlx::query!( sqlx::query!(
"INSERT INTO movements ( "
WITH
operators_insert AS(
INSERT INTO operators (code, name, pub_code)
SELECT code, name, pub_code FROM UNNEST($27::varchar[], $28::varchar[], $29::varchar[]) AS t(code, name, pub_code)
WHERE code IS NOT NULL
ON CONFLICT (code) DO NOTHING
RETURNING id, code
),
operators AS(
SELECT id, code FROM operators_insert
UNION
SELECT id, code FROM operators WHERE code = ANY($28)
)
INSERT INTO movements (
departure_request, stopID, x, y, mapName, area, platform, platformName, departure_request, stopID, x, y, mapName, area, platform, platformName,
countdown, time, realtime, key, code, countdown, time, realtime, key, code,
number, symbol, motType, mtSubcode, is_realtime, direction, directionFrom, number, symbol, motType, mtSubcode, is_realtime, direction, directionFrom,
@ -555,7 +543,11 @@ async fn insert_arrivals(
$10::timestamp[], $11::timestamp[], $12::integer[], $13::varchar[], $14::varchar[], $10::timestamp[], $11::timestamp[], $12::integer[], $13::varchar[], $14::varchar[],
$15::varchar[], $16::integer[], $17::integer[], $18::boolean[], $15::varchar[], $16::integer[], $17::integer[], $18::boolean[],
$19::varchar[], $20::varchar[], $21::varchar[], $22::integer[], $19::varchar[], $20::varchar[], $21::varchar[], $22::integer[],
$23::varchar[], $24::varchar[], $25::varchar[], $26::integer[], $27::boolean[] $23::varchar[], $24::varchar[], $25::varchar[],
ARRAY(
SELECT COALESCE(o.id, NULL)
FROM UNNEST($28::varchar[]) c LEFT JOIN operators o ON o.code = c
)::integer[], $26::boolean[]
) ON CONFLICT ON CONSTRAINT movements_uniq DO NOTHING", ) ON CONFLICT ON CONSTRAINT movements_uniq DO NOTHING",
&arrivals &arrivals
.iter() .iter()
@ -615,8 +607,19 @@ async fn insert_arrivals(
.iter() .iter()
.map(|r| r.line_display.clone()) .map(|r| r.line_display.clone())
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
&operators as &[Option<_>],
&arrivals.iter().map(|r| r.r#type).collect::<Vec<_>>(), &arrivals.iter().map(|r| r.r#type).collect::<Vec<_>>(),
&arrivals
.iter()
.map(|a| a.operator_code.clone())
.collect::<Vec<_>>() as &[Option<_>],
&arrivals
.iter()
.map(|a| a.operator_name.clone())
.collect::<Vec<_>>() as &[Option<_>],
&arrivals
.iter()
.map(|a| a.operator_public_code.clone())
.collect::<Vec<_>>() as &[Option<_>]
) )
.execute(conn) .execute(conn)
.await .await

View file

@ -6,6 +6,7 @@ use sqlx::postgres::PgPoolOptions;
use std::env; use std::env;
use std::error::Error; use std::error::Error;
use std::io::Write; use std::io::Write;
use std::time::Duration;
use tokio::fs; use tokio::fs;
use tokio::fs::DirEntry; use tokio::fs::DirEntry;
use tokio_stream::wrappers::ReadDirStream; use tokio_stream::wrappers::ReadDirStream;
@ -14,15 +15,17 @@ use tokio_stream::wrappers::ReadDirStream;
async fn main() -> Result<(), Box<dyn Error>> { async fn main() -> Result<(), Box<dyn Error>> {
dotenv().ok(); dotenv().ok();
let url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); let url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let mut conn = PgPoolOptions::new() let conn = PgPoolOptions::new()
.max_connections(5) .max_connections(30)
.acquire_timeout(Duration::from_secs(300))
.connect(&url) .connect(&url)
.await?; .await?;
let src = "input/xslt_dm_request"; let src = "/home/bread/Downloads/station/willdaten";
let total = ReadDirStream::new(fs::read_dir(src).await?).count().await; let total = ReadDirStream::new(fs::read_dir(src).await?).count().await;
let mut stream = ReadDirStream::new(fs::read_dir(src).await?); let mut stream = ReadDirStream::new(fs::read_dir(src).await?);
let mut count = 0; let mut count = 0;
let mut handles = vec![];
while let Some(path) = stream.next().await { while let Some(path) = stream.next().await {
count += 1; count += 1;
@ -34,12 +37,27 @@ async fn main() -> Result<(), Box<dyn Error>> {
path.file_name().to_str().unwrap() path.file_name().to_str().unwrap()
); );
std::io::stdout().flush().unwrap(); std::io::stdout().flush().unwrap();
let content = fs::read_to_string(path.path()).await.unwrap(); let mut conn = conn.clone();
if content.len() > 0 {
if let Ok(data) = serde_json::from_str::<Root>(&content) { if handles.len() > 5000 {
insert_document(data, &mut conn).await?; while let Some(handle) = handles.pop() {
handle.await?;
} }
handles = vec![];
} }
handles.push(tokio::spawn(async move {
let content = fs::read_to_string(path.path()).await.unwrap();
if content.len() > 0 {
if let Ok(data) = serde_json::from_str::<Root>(&content) {
insert_document(data, &mut conn).await.ok();
}
}
}));
}
for handle in handles {
handle.await?;
} }
println!(); println!();

0
default.nix Normal file
View file