diff --git a/VRN Data Database.sql b/VRN Data Database.sql index a199aab..16e9752 100644 --- a/VRN Data Database.sql +++ b/VRN Data Database.sql @@ -62,7 +62,6 @@ CREATE TABLE "serving_lines" ( CREATE TABLE "arrivals" ( "depatureRequestId" serial, - "stopDetailId" serial PRIMARY KEY, "stopID" int, "x" float, "y" float, @@ -99,18 +98,20 @@ CREATE TABLE "arrivals" ( "operator_name" varchar(255), "operator_publicCode" varchar(50), "attrs_name" varchar(255), - "attrs_value" varchar(255) + "attrs_value" varchar(255), + primary key("stopID", "servingLine_key", "servingLine_stateless") ); CREATE TABLE "depature_requests" ( "depatureRequestId" serial PRIMARY KEY, - "stopid" int, + "stopid" int unique, "serverid" varchar(255), "requestid" varchar(255), "sessionid" varchar(255), "calcTime" float, "serverTime" timestamp, - "logRequestId" varchar(255) + "logRequestId" varchar(255), + unique ("depatureRequestId", "stopid") ); CREATE TABLE "station_hints" ( @@ -131,7 +132,8 @@ CREATE TABLE "serving_line_depature_requests" ( ALTER TABLE "arrivals" ADD FOREIGN KEY ("depatureRequestId") REFERENCES "depature_requests" ("depatureRequestId"); -ALTER TABLE "depature_requests" ADD FOREIGN KEY ("stopid") REFERENCES "stop_lists" ("input"); +--ALTER TABLE "depature_requests" ADD FOREIGN KEY ("stopid") REFERENCES "stop_lists" ("input"); +ALTER TABLE "stop_lists" ADD FOREIGN KEY ("input") REFERENCES "depature_requests" ("stopid"); ALTER TABLE "depature_requeststation_hints" ADD FOREIGN KEY ("infoLinkUrl") REFERENCES "station_hints" ("infoLinkURL"); diff --git a/src/main.rs b/src/main.rs index 83fa081..4b609fb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,7 +18,7 @@ use models::{ use serde_json; use sha2::Digest; use sha2::Sha256; -use std::{env, error::Error, fs, str::FromStr}; +use std::{env, error::Error, fs, str::FromStr, io::Write}; use crate::diesel::OptionalExtension; use crate::schema::arrivals::dsl::arrivals; @@ -35,61 +35,19 @@ async fn main() -> Result<(), Box> { let mut conn = PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url)); - let src = "/home/bread/Downloads/2025-01-21"; - //let pool = SqlitePool::connect("sqlite::memory:").await?; + let src = "/home/bread/Downloads/2025-01-22"; + let total = fs::read_dir(src).unwrap().count(); + let mut count = 0; fs::read_dir(src).unwrap().into_iter().for_each(|path| { + count += 1; let path = path.unwrap(); + print!("\x1B[2K\r {}/{} ({})", count, total, path.file_name().to_str().unwrap()); + std::io::stdout().flush().unwrap(); let content = fs::read_to_string(path.path()).unwrap(); let data = serde_json::from_str::(&content).unwrap(); let arrival_list = data.arrival_list.unwrap(); - let _ = insert_into(stop_lists) - .values(StopList { - input: i32::from_str(&data.dm.input.input).unwrap(), - point_dm: Some(data.dm.points.point.usage), - point_type: Some(data.dm.points.point.type_field.clone()), - point_name: Some(data.dm.points.point.name), - point_stopId: i32::from_str(&data.dm.points.point.ref_field.id).ok(), - point_stateless: Some(data.dm.points.point.stateless), - point_anytype: Some(data.dm.points.point.any_type), - point_sort: i32::from_str(&data.dm.points.point.sort).ok(), - point_quality: i32::from_str(&data.dm.points.point.quality).ok(), - point_best: i32::from_str(&data.dm.points.point.best).ok(), - point_object: Some(data.dm.points.point.object), - point_ref_id: i32::from_str(&data.dm.points.point.ref_field.id).ok(), - point_ref_gid: Some(data.dm.points.point.ref_field.gid), - point_ref_omc: i32::from_str(&data.dm.points.point.ref_field.omc).ok(), - point_ref_placeID: i32::from_str(&data.dm.points.point.ref_field.place_id).ok(), - point_ref_place: Some(data.dm.points.point.ref_field.place), - point_ref_coords: Some(data.dm.points.point.ref_field.coords), - itdOdvAssignedStops_stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id) - .ok(), - itdOdvAssignedStops_name: Some(data.dm.itd_odv_assigned_stops.name), - itdOdvAssignedStops_x: f64::from_str(&data.dm.itd_odv_assigned_stops.x).ok(), - itdOdvAssignedStops_y: f64::from_str(&data.dm.itd_odv_assigned_stops.y).ok(), - itdOdvAssignedStops_mapName: Some(data.dm.itd_odv_assigned_stops.map_name), - itdOdvAssignedStops_value: Some(data.dm.itd_odv_assigned_stops.value), - itdOdvAssignedStops_place: Some(data.dm.itd_odv_assigned_stops.place), - itdOdvAssignedStops_nameWithPlace: Some( - data.dm.itd_odv_assigned_stops.name_with_place, - ), - itdOdvAssignedStops_distanceTime: i32::from_str( - &data.dm.itd_odv_assigned_stops.distance_time, - ) - .ok(), - itdOdvAssignedStops_isTransferStop: i32::from_str( - &data.dm.itd_odv_assigned_stops.is_transfer_stop, - ) - .ok(), - itdOdvAssignedStops_vm: i32::from_str(&data.dm.itd_odv_assigned_stops.vm).ok(), - itdOdvAssignedStops_gid: Some(data.dm.itd_odv_assigned_stops.gid), - meta_lastModificationDate: Some(Local::now().naive_local()), - }) - .returning(StopList::as_returning()) - .on_conflict_do_nothing() - .get_result(&mut conn) - .optional(); - let req = insert_into(depature_requests) + if let Some(req) = insert_into(depature_requests) .values(NewDepatureRequest { stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id).ok(), serverid: data @@ -142,8 +100,53 @@ async fn main() -> Result<(), Box> { .on_conflict_do_nothing() .get_result(&mut conn) .optional() - .unwrap() - .unwrap(); + .unwrap() { + let _ = insert_into(stop_lists) + .values(StopList { + input: i32::from_str(&data.dm.input.input).unwrap(), + point_dm: Some(data.dm.points.point.usage), + point_type: Some(data.dm.points.point.type_field.clone()), + point_name: Some(data.dm.points.point.name), + point_stopId: i32::from_str(&data.dm.points.point.ref_field.id).ok(), + point_stateless: Some(data.dm.points.point.stateless), + point_anytype: Some(data.dm.points.point.any_type), + point_sort: i32::from_str(&data.dm.points.point.sort).ok(), + point_quality: i32::from_str(&data.dm.points.point.quality).ok(), + point_best: i32::from_str(&data.dm.points.point.best).ok(), + point_object: Some(data.dm.points.point.object), + point_ref_id: i32::from_str(&data.dm.points.point.ref_field.id).ok(), + point_ref_gid: Some(data.dm.points.point.ref_field.gid), + point_ref_omc: i32::from_str(&data.dm.points.point.ref_field.omc).ok(), + point_ref_placeID: i32::from_str(&data.dm.points.point.ref_field.place_id).ok(), + point_ref_place: Some(data.dm.points.point.ref_field.place), + point_ref_coords: Some(data.dm.points.point.ref_field.coords), + itdOdvAssignedStops_stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id) + .ok(), + itdOdvAssignedStops_name: Some(data.dm.itd_odv_assigned_stops.name), + itdOdvAssignedStops_x: f64::from_str(&data.dm.itd_odv_assigned_stops.x).ok(), + itdOdvAssignedStops_y: f64::from_str(&data.dm.itd_odv_assigned_stops.y).ok(), + itdOdvAssignedStops_mapName: Some(data.dm.itd_odv_assigned_stops.map_name), + itdOdvAssignedStops_value: Some(data.dm.itd_odv_assigned_stops.value), + itdOdvAssignedStops_place: Some(data.dm.itd_odv_assigned_stops.place), + itdOdvAssignedStops_nameWithPlace: Some( + data.dm.itd_odv_assigned_stops.name_with_place, + ), + itdOdvAssignedStops_distanceTime: i32::from_str( + &data.dm.itd_odv_assigned_stops.distance_time, + ) + .ok(), + itdOdvAssignedStops_isTransferStop: i32::from_str( + &data.dm.itd_odv_assigned_stops.is_transfer_stop, + ) + .ok(), + itdOdvAssignedStops_vm: i32::from_str(&data.dm.itd_odv_assigned_stops.vm).ok(), + itdOdvAssignedStops_gid: Some(data.dm.itd_odv_assigned_stops.gid), + meta_lastModificationDate: Some(Local::now().naive_local()), + }) + .returning(StopList::as_returning()) + .on_conflict_do_nothing() + .get_result(&mut conn) + .optional(); for line in data.serving_lines.lines { let id = hex::encode(Sha256::digest( serde_json::to_string(&line.mode).unwrap().into_bytes(), @@ -202,9 +205,8 @@ async fn main() -> Result<(), Box> { } { let _ = insert_into(arrivals) - .values(arrival_list.into_iter().map(|arrival| { Arrival { + .values(&arrival_list.into_iter().map(|arrival| { Arrival { depatureRequestId: req.depatureRequestId, - stopDetailId: 0, stopID: i32::from_str(&arrival.stop_id).ok(), x: f64::from_str(&arrival.x).ok(), y: f64::from_str(&arrival.y).ok(), @@ -281,10 +283,12 @@ async fn main() -> Result<(), Box> { }}).collect::>()) .returning(Arrival::as_returning()) .on_conflict_do_nothing() - .get_result(&mut conn) + .get_results(&mut conn) .optional(); } + } }); + println!(""); Ok(()) } diff --git a/src/models.rs b/src/models.rs index d1f053e..60b9159 100644 --- a/src/models.rs +++ b/src/models.rs @@ -2,6 +2,7 @@ #![allow(unused)] #![allow(clippy::all)] +#![allow(non_snake_case)] use crate::schema::*; use chrono::NaiveDate; @@ -10,10 +11,9 @@ use diesel::pg::Pg; #[derive(Queryable, Selectable, Insertable, Debug, Identifiable, QueryableByName)] #[diesel(table_name = arrivals)] #[diesel(check_for_backend(Pg))] -#[diesel(primary_key(stopDetailId))] +#[diesel(primary_key(stopID, servingLine_key, servingLine_stateless))] pub struct Arrival { pub depatureRequestId: i32, - pub stopDetailId: i32, pub stopID: Option, pub x: Option, pub y: Option, diff --git a/src/schema.rs b/src/schema.rs index 1eda837..c418af0 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -1,9 +1,8 @@ // @generated automatically by Diesel CLI. diesel::table! { - arrivals (stopDetailId) { + arrivals (stopID, servingLine_key, servingLine_stateless) { depatureRequestId -> Int4, - stopDetailId -> Int4, stopID -> Nullable, x -> Nullable, y -> Nullable,