diff --git a/Cargo.lock b/dataworker/Cargo.lock similarity index 100% rename from Cargo.lock rename to dataworker/Cargo.lock diff --git a/Cargo.toml b/dataworker/Cargo.toml similarity index 85% rename from Cargo.toml rename to dataworker/Cargo.toml index 86cf2e5..6b2550f 100644 --- a/Cargo.toml +++ b/dataworker/Cargo.toml @@ -1,11 +1,11 @@ [package] -name = "willdaten" +name = "dataworker" version = "0.1.0" edition = "2021" [dependencies] chrono = "0.4.39" -diesel = { version = "2.2.6", features = ["64-column-tables", "chrono", "postgres", "postgres_backend", "uuid"] } +diesel = { version = "2.2.6", features = ["64-column-tables", "chrono", "postgres", "postgres_backend"] } dotenvy = "0.15.7" hex = "0.4.3" maud = "0.26.0" diff --git a/VRN Data Database.sql b/dataworker/VRN Data Database.sql similarity index 100% rename from VRN Data Database.sql rename to dataworker/VRN Data Database.sql diff --git a/diesel.toml b/dataworker/diesel.toml similarity index 100% rename from diesel.toml rename to dataworker/diesel.toml diff --git a/src/dtos.rs b/dataworker/src/dtos.rs similarity index 100% rename from src/dtos.rs rename to dataworker/src/dtos.rs diff --git a/dataworker/src/lib.rs b/dataworker/src/lib.rs new file mode 100644 index 0000000..0497db4 --- /dev/null +++ b/dataworker/src/lib.rs @@ -0,0 +1,284 @@ +#[macro_use] +extern crate diesel; + +mod dtos; +mod models; +mod schema; + +use chrono::NaiveDate; +use chrono::{Local, NaiveDateTime}; +use diesel::{dsl::insert_into, Connection, PgConnection, RunQueryDsl, SelectableHelper}; +use dtos::Root; +use hex; +use models::{ + Arrival, DepatureRequest, NewDepatureRequest, ServingLine, ServingLineDepatureRequest, + StationHint, StopList, +}; +use serde_json; +use sha2::Digest; +use sha2::Sha256; +use std::{env, error::Error, fs, io::Write, str::FromStr}; + +use crate::diesel::OptionalExtension; +use crate::schema::arrivals::dsl::arrivals; +use crate::schema::depature_requests::dsl::depature_requests; +use crate::schema::serving_line_depature_requests::dsl::serving_line_depature_requests; +use crate::schema::serving_lines::dsl::serving_lines; +use crate::schema::station_hints::dsl::station_hints; +use crate::schema::stop_lists::dsl::stop_lists; + +pub fn insert_document(data: Root, conn: &mut PgConnection) -> Result<(), Box> { + let arrival_list = data.arrival_list.unwrap(); + if let Some(req) = insert_into(depature_requests) + .values(NewDepatureRequest { + stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id).ok(), + serverid: data + .parameters + .iter() + .filter(|p| p.name == "serverID") + .map(|p| p.clone().value) + .next(), + requestid: data + .parameters + .iter() + .filter(|p| p.name == "requestID") + .map(|p| p.clone().value) + .next(), + sessionid: data + .parameters + .iter() + .filter(|p| p.name == "sessionID") + .map(|p| p.clone().value) + .next(), + calcTime: Some( + f64::from_str( + &data + .parameters + .iter() + .filter(|p| p.name == "calcTime") + .map(|p| p.clone().value) + .next() + .unwrap(), + ) + .unwrap(), + ), + serverTime: data + .parameters + .iter() + .filter(|p| p.name == "serverTime") + .map(|p| p.clone().value) + .next() + .unwrap() + .parse::() + .ok(), + logRequestId: data + .parameters + .iter() + .filter(|p| p.name == "logRequestId") + .map(|p| p.clone().value) + .next(), + }) + .returning(DepatureRequest::as_returning()) + .on_conflict_do_nothing() + .get_result(conn) + .optional() + .unwrap() + { + let _ = insert_into(stop_lists) + .values(StopList { + input: i32::from_str(&data.dm.input.input).unwrap(), + point_dm: Some(data.dm.points.point.usage), + point_type: Some(data.dm.points.point.type_field.clone()), + point_name: Some(data.dm.points.point.name), + point_stopId: i32::from_str(&data.dm.points.point.ref_field.id).ok(), + point_stateless: Some(data.dm.points.point.stateless), + point_anytype: Some(data.dm.points.point.any_type), + point_sort: i32::from_str(&data.dm.points.point.sort).ok(), + point_quality: i32::from_str(&data.dm.points.point.quality).ok(), + point_best: i32::from_str(&data.dm.points.point.best).ok(), + point_object: Some(data.dm.points.point.object), + point_ref_id: i32::from_str(&data.dm.points.point.ref_field.id).ok(), + point_ref_gid: Some(data.dm.points.point.ref_field.gid), + point_ref_omc: i32::from_str(&data.dm.points.point.ref_field.omc).ok(), + point_ref_placeID: i32::from_str(&data.dm.points.point.ref_field.place_id).ok(), + point_ref_place: Some(data.dm.points.point.ref_field.place), + point_ref_coords: Some(data.dm.points.point.ref_field.coords), + itdOdvAssignedStops_stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id) + .ok(), + itdOdvAssignedStops_name: Some(data.dm.itd_odv_assigned_stops.name), + itdOdvAssignedStops_x: f64::from_str(&data.dm.itd_odv_assigned_stops.x).ok(), + itdOdvAssignedStops_y: f64::from_str(&data.dm.itd_odv_assigned_stops.y).ok(), + itdOdvAssignedStops_mapName: Some(data.dm.itd_odv_assigned_stops.map_name), + itdOdvAssignedStops_value: Some(data.dm.itd_odv_assigned_stops.value), + itdOdvAssignedStops_place: Some(data.dm.itd_odv_assigned_stops.place), + itdOdvAssignedStops_nameWithPlace: Some( + data.dm.itd_odv_assigned_stops.name_with_place, + ), + itdOdvAssignedStops_distanceTime: i32::from_str( + &data.dm.itd_odv_assigned_stops.distance_time, + ) + .ok(), + itdOdvAssignedStops_isTransferStop: i32::from_str( + &data.dm.itd_odv_assigned_stops.is_transfer_stop, + ) + .ok(), + itdOdvAssignedStops_vm: i32::from_str(&data.dm.itd_odv_assigned_stops.vm).ok(), + itdOdvAssignedStops_gid: Some(data.dm.itd_odv_assigned_stops.gid), + meta_lastModificationDate: Some(Local::now().naive_local()), + }) + .returning(StopList::as_returning()) + .on_conflict_do_nothing() + .get_result(conn) + .optional(); + for line in data.serving_lines.lines { + let id = hex::encode(Sha256::digest( + serde_json::to_string(&line.mode).unwrap().into_bytes(), + )); + let _ = insert_into(serving_lines) + .values(ServingLine { + servingLineId: id.clone(), + mode_name: Some(line.mode.name), + mode_number: Some(line.mode.number), + mode_product: Some(line.mode.product), + mode_productId: i32::from_str(&line.mode.product_id).ok(), + mode_type: i32::from_str(&line.mode.type_field).ok(), + mode_code: i32::from_str(&line.mode.code).ok(), + mode_destination: Some(line.mode.destination), + mode_destID: i32::from_str(&line.mode.dest_id).ok(), + mode_desc: Some(line.mode.desc), + mode_timetablePeriod: Some(line.mode.timetable_period), + diva_branch: Some(line.mode.diva.branch), + diva_line: Some(line.mode.diva.line), + diva_supplement: Some(line.mode.diva.supplement), + diva_dir: Some(line.mode.diva.dir), + diva_project: Some(line.mode.diva.project), + diva_network: Some(line.mode.diva.network), + diva_stateless: Some(line.mode.diva.stateless), + diva_tripCode: i32::from_str(&line.mode.diva.trip_code).ok(), + diva_operator: Some(line.mode.diva.operator), + diva_opPublicCode: line.mode.diva.op_public_code, + diva_opCode: Some(line.mode.diva.op_code), + diva_Vf: NaiveDate::parse_from_str(&line.mode.diva.v_f, "%Y%m%d").ok(), + diva_vTo: NaiveDate::parse_from_str(&line.mode.diva.v_to, "%Y%m%d").ok(), + diva_lineDisplay: Some(line.mode.diva.line_display), + meta_lastModificationDate: req.serverTime, + }) + .returning(ServingLine::as_returning()) + .on_conflict_do_nothing() + .get_result(conn) + .optional() + .unwrap(); + let _ = insert_into(serving_line_depature_requests) + .values(ServingLineDepatureRequest { + depatureRequestId: req.depatureRequestId, + servingLineId: id.clone(), + }) + .returning(ServingLineDepatureRequest::as_returning()) + .on_conflict_do_nothing() + .get_result(conn) + .optional() + .unwrap(); + } + if let Some(infos) = data.dm.points.point.infos { + let _ = insert_into(station_hints).values(StationHint { + infoLinkURL: infos.info.info_link_url, + infoLinkText: Some(infos.info.info_link_text), + infoText: Some(infos.info.info_text.content), + }); + } + { + let _ = insert_into(arrivals) + .values( + &arrival_list + .into_iter() + .map(|arrival| Arrival { + depatureRequestId: req.depatureRequestId, + stopID: i32::from_str(&arrival.stop_id).ok(), + x: f64::from_str(&arrival.x).ok(), + y: f64::from_str(&arrival.y).ok(), + mapName: Some(arrival.map_name), + area: Some(arrival.area), + platform: Some(arrival.platform), + platformName: Some(arrival.platform_name), + stopName: Some(arrival.stop_name), + nameWO: Some(arrival.name_wo), + pointType: Some(data.dm.points.point.type_field.clone()), + countdown: i32::from_str(&arrival.countdown).ok(), + arrivalTime: NaiveDate::from_ymd_opt( + i32::from_str(&arrival.date_time.year).unwrap(), + u32::from_str(&arrival.date_time.month).unwrap(), + u32::from_str(&arrival.date_time.day).unwrap(), + ) + .unwrap() + .and_hms_opt( + u32::from_str(&arrival.date_time.hour).unwrap(), + u32::from_str(&arrival.date_time.minute).unwrap(), + 0, + ), + realArrivalTime: if let Some(date_time) = arrival.real_date_time { + NaiveDate::from_ymd_opt( + i32::from_str(&date_time.year).unwrap(), + u32::from_str(&date_time.month).unwrap(), + u32::from_str(&date_time.day).unwrap(), + ) + .unwrap() + .and_hms_opt( + u32::from_str(&date_time.hour).unwrap(), + u32::from_str(&date_time.minute).unwrap(), + 0, + ) + } else { + None + }, + servingLine_key: Some(arrival.serving_line.key), + servingLine_code: Some(arrival.serving_line.code), + servingLine_number: Some(arrival.serving_line.number), + servingLine_symbol: Some(arrival.serving_line.symbol), + servingLine_motType: i32::from_str(&arrival.serving_line.mot_type).ok(), + servingLine_mtSubcode: i32::from_str(&arrival.serving_line.mt_subcode) + .ok(), + servingLine_realtime: bool::from_str(&arrival.serving_line.realtime) + .ok(), + servingLine_direction: Some(arrival.serving_line.direction), + servingLine_directionFrom: Some(arrival.serving_line.direction_from), + servingLine_name: Some(arrival.serving_line.name), + servingLine_delay: if let Some(delay) = arrival.serving_line.delay { + i32::from_str(&delay).ok() + } else { + None + }, + servingLine_liErgRiProj_line: Some( + arrival.serving_line.li_erg_ri_proj.line, + ), + servingLine_liErgRiProj_project: Some( + arrival.serving_line.li_erg_ri_proj.project, + ), + servingLine_liErgRiProj_direction: Some( + arrival.serving_line.li_erg_ri_proj.direction, + ), + servingLine_liErgRiProj_supplement: Some( + arrival.serving_line.li_erg_ri_proj.supplement, + ), + servingLine_liErgRiProj_network: Some( + arrival.serving_line.li_erg_ri_proj.network, + ), + servingLine_destID: Some(arrival.serving_line.dest_id), + servingLine_stateless: Some(arrival.serving_line.stateless), + servingLine_lineDisplay: Some(arrival.serving_line.line_display), + operator_code: arrival.operator.clone().map(|op| op.code), + operator_name: arrival.operator.clone().map(|op| op.name), + operator_publicCode: arrival.operator.map(|op| op.public_code), + attrs_name: None, + attrs_value: None, + }) + .collect::>(), + ) + .returning(Arrival::as_returning()) + .on_conflict_do_nothing() + .get_results(conn) + .optional(); + } + } + + Ok(()) +} diff --git a/dataworker/src/main.rs b/dataworker/src/main.rs new file mode 100644 index 0000000..a165f8c --- /dev/null +++ b/dataworker/src/main.rs @@ -0,0 +1,45 @@ +#[macro_use] +extern crate diesel; + +use crate::dtos::Root; +use crate::lib::insert_document; +use diesel::{Connection, PgConnection}; +use dotenvy::dotenv; +use std::error::Error; +use std::io::Write; +use std::{env, fs}; + +mod dtos; +mod lib; +mod models; +mod schema; + +#[tokio::main] +async fn main() -> Result<(), Box> { + dotenv().ok(); + let url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); + let mut conn = + PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url)); + + let src = "/home/bread/Downloads/2025-01-22(1)/2025-01-22"; + let total = fs::read_dir(src).unwrap().count(); + let mut count = 0; + + fs::read_dir(src).unwrap().into_iter().for_each(|path| { + count += 1; + let path = path.unwrap(); + print!( + "\x1B[2K\r {}/{} ({})", + count, + total, + path.file_name().to_str().unwrap() + ); + std::io::stdout().flush().unwrap(); + let content = fs::read_to_string(path.path()).unwrap(); + let data = serde_json::from_str::(&content).unwrap(); + insert_document(data, &mut conn).unwrap(); + }); + + println!(); + Ok(()) +} diff --git a/src/models.rs b/dataworker/src/models.rs similarity index 100% rename from src/models.rs rename to dataworker/src/models.rs diff --git a/src/schema.rs b/dataworker/src/schema.rs similarity index 100% rename from src/schema.rs rename to dataworker/src/schema.rs diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index 4b609fb..0000000 --- a/src/main.rs +++ /dev/null @@ -1,294 +0,0 @@ -#[macro_use] -extern crate diesel; - -mod dtos; -mod models; -mod schema; - -use chrono::NaiveDate; -use chrono::{Local, NaiveDateTime}; -use diesel::{dsl::insert_into, Connection, PgConnection, RunQueryDsl, SelectableHelper}; -use dotenvy::dotenv; -use dtos::Root; -use hex; -use models::{ - Arrival, DepatureRequest, NewDepatureRequest, ServingLine, ServingLineDepatureRequest, - StationHint, StopList, -}; -use serde_json; -use sha2::Digest; -use sha2::Sha256; -use std::{env, error::Error, fs, str::FromStr, io::Write}; - -use crate::diesel::OptionalExtension; -use crate::schema::arrivals::dsl::arrivals; -use crate::schema::depature_requests::dsl::depature_requests; -use crate::schema::serving_line_depature_requests::dsl::serving_line_depature_requests; -use crate::schema::serving_lines::dsl::serving_lines; -use crate::schema::station_hints::dsl::station_hints; -use crate::schema::stop_lists::dsl::stop_lists; - -#[tokio::main] -async fn main() -> Result<(), Box> { - dotenv().ok(); - let url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - let mut conn = - PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url)); - - let src = "/home/bread/Downloads/2025-01-22"; - let total = fs::read_dir(src).unwrap().count(); - let mut count = 0; - - fs::read_dir(src).unwrap().into_iter().for_each(|path| { - count += 1; - let path = path.unwrap(); - print!("\x1B[2K\r {}/{} ({})", count, total, path.file_name().to_str().unwrap()); - std::io::stdout().flush().unwrap(); - let content = fs::read_to_string(path.path()).unwrap(); - let data = serde_json::from_str::(&content).unwrap(); - let arrival_list = data.arrival_list.unwrap(); - if let Some(req) = insert_into(depature_requests) - .values(NewDepatureRequest { - stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id).ok(), - serverid: data - .parameters - .iter() - .filter(|p| p.name == "serverID") - .map(|p| p.clone().value) - .next(), - requestid: data - .parameters - .iter() - .filter(|p| p.name == "requestID") - .map(|p| p.clone().value) - .next(), - sessionid: data - .parameters - .iter() - .filter(|p| p.name == "sessionID") - .map(|p| p.clone().value) - .next(), - calcTime: Some( - f64::from_str( - &data - .parameters - .iter() - .filter(|p| p.name == "calcTime") - .map(|p| p.clone().value) - .next() - .unwrap(), - ) - .unwrap(), - ), - serverTime: data - .parameters - .iter() - .filter(|p| p.name == "serverTime") - .map(|p| p.clone().value) - .next() - .unwrap() - .parse::() - .ok(), - logRequestId: data - .parameters - .iter() - .filter(|p| p.name == "logRequestId") - .map(|p| p.clone().value) - .next(), - }) - .returning(DepatureRequest::as_returning()) - .on_conflict_do_nothing() - .get_result(&mut conn) - .optional() - .unwrap() { - let _ = insert_into(stop_lists) - .values(StopList { - input: i32::from_str(&data.dm.input.input).unwrap(), - point_dm: Some(data.dm.points.point.usage), - point_type: Some(data.dm.points.point.type_field.clone()), - point_name: Some(data.dm.points.point.name), - point_stopId: i32::from_str(&data.dm.points.point.ref_field.id).ok(), - point_stateless: Some(data.dm.points.point.stateless), - point_anytype: Some(data.dm.points.point.any_type), - point_sort: i32::from_str(&data.dm.points.point.sort).ok(), - point_quality: i32::from_str(&data.dm.points.point.quality).ok(), - point_best: i32::from_str(&data.dm.points.point.best).ok(), - point_object: Some(data.dm.points.point.object), - point_ref_id: i32::from_str(&data.dm.points.point.ref_field.id).ok(), - point_ref_gid: Some(data.dm.points.point.ref_field.gid), - point_ref_omc: i32::from_str(&data.dm.points.point.ref_field.omc).ok(), - point_ref_placeID: i32::from_str(&data.dm.points.point.ref_field.place_id).ok(), - point_ref_place: Some(data.dm.points.point.ref_field.place), - point_ref_coords: Some(data.dm.points.point.ref_field.coords), - itdOdvAssignedStops_stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id) - .ok(), - itdOdvAssignedStops_name: Some(data.dm.itd_odv_assigned_stops.name), - itdOdvAssignedStops_x: f64::from_str(&data.dm.itd_odv_assigned_stops.x).ok(), - itdOdvAssignedStops_y: f64::from_str(&data.dm.itd_odv_assigned_stops.y).ok(), - itdOdvAssignedStops_mapName: Some(data.dm.itd_odv_assigned_stops.map_name), - itdOdvAssignedStops_value: Some(data.dm.itd_odv_assigned_stops.value), - itdOdvAssignedStops_place: Some(data.dm.itd_odv_assigned_stops.place), - itdOdvAssignedStops_nameWithPlace: Some( - data.dm.itd_odv_assigned_stops.name_with_place, - ), - itdOdvAssignedStops_distanceTime: i32::from_str( - &data.dm.itd_odv_assigned_stops.distance_time, - ) - .ok(), - itdOdvAssignedStops_isTransferStop: i32::from_str( - &data.dm.itd_odv_assigned_stops.is_transfer_stop, - ) - .ok(), - itdOdvAssignedStops_vm: i32::from_str(&data.dm.itd_odv_assigned_stops.vm).ok(), - itdOdvAssignedStops_gid: Some(data.dm.itd_odv_assigned_stops.gid), - meta_lastModificationDate: Some(Local::now().naive_local()), - }) - .returning(StopList::as_returning()) - .on_conflict_do_nothing() - .get_result(&mut conn) - .optional(); - for line in data.serving_lines.lines { - let id = hex::encode(Sha256::digest( - serde_json::to_string(&line.mode).unwrap().into_bytes(), - )); - let _ = insert_into(serving_lines) - .values(ServingLine { - servingLineId: id.clone(), - mode_name: Some(line.mode.name), - mode_number: Some(line.mode.number), - mode_product: Some(line.mode.product), - mode_productId: i32::from_str(&line.mode.product_id).ok(), - mode_type: i32::from_str(&line.mode.type_field).ok(), - mode_code: i32::from_str(&line.mode.code).ok(), - mode_destination: Some(line.mode.destination), - mode_destID: i32::from_str(&line.mode.dest_id).ok(), - mode_desc: Some(line.mode.desc), - mode_timetablePeriod: Some(line.mode.timetable_period), - diva_branch: Some(line.mode.diva.branch), - diva_line: Some(line.mode.diva.line), - diva_supplement: Some(line.mode.diva.supplement), - diva_dir: Some(line.mode.diva.dir), - diva_project: Some(line.mode.diva.project), - diva_network: Some(line.mode.diva.network), - diva_stateless: Some(line.mode.diva.stateless), - diva_tripCode: i32::from_str(&line.mode.diva.trip_code).ok(), - diva_operator: Some(line.mode.diva.operator), - diva_opPublicCode: line.mode.diva.op_public_code, - diva_opCode: Some(line.mode.diva.op_code), - diva_Vf: NaiveDate::parse_from_str(&line.mode.diva.v_f, "%Y%m%d").ok(), - diva_vTo: NaiveDate::parse_from_str(&line.mode.diva.v_to, "%Y%m%d").ok(), - diva_lineDisplay: Some(line.mode.diva.line_display), - meta_lastModificationDate: req.serverTime, - }) - .returning(ServingLine::as_returning()) - .on_conflict_do_nothing() - .get_result(&mut conn) - .optional() - .unwrap(); - let _ = insert_into(serving_line_depature_requests) - .values(ServingLineDepatureRequest { - depatureRequestId: req.depatureRequestId, - servingLineId: id.clone(), - }) - .returning(ServingLineDepatureRequest::as_returning()) - .on_conflict_do_nothing() - .get_result(&mut conn) - .optional() - .unwrap(); - } - if let Some(infos) = data.dm.points.point.infos { - let _ = insert_into(station_hints).values(StationHint { - infoLinkURL: infos.info.info_link_url, - infoLinkText: Some(infos.info.info_link_text), - infoText: Some(infos.info.info_text.content), - }); - } - { - let _ = insert_into(arrivals) - .values(&arrival_list.into_iter().map(|arrival| { Arrival { - depatureRequestId: req.depatureRequestId, - stopID: i32::from_str(&arrival.stop_id).ok(), - x: f64::from_str(&arrival.x).ok(), - y: f64::from_str(&arrival.y).ok(), - mapName: Some(arrival.map_name), - area: Some(arrival.area), - platform: Some(arrival.platform), - platformName: Some(arrival.platform_name), - stopName: Some(arrival.stop_name), - nameWO: Some(arrival.name_wo), - pointType: Some(data.dm.points.point.type_field.clone()), - countdown: i32::from_str(&arrival.countdown).ok(), - arrivalTime: NaiveDate::from_ymd_opt( - i32::from_str(&arrival.date_time.year).unwrap(), - u32::from_str(&arrival.date_time.month).unwrap(), - u32::from_str(&arrival.date_time.day).unwrap(), - ) - .unwrap() - .and_hms_opt( - u32::from_str(&arrival.date_time.hour).unwrap(), - u32::from_str(&arrival.date_time.minute).unwrap(), - 0, - ), - realArrivalTime: if let Some(date_time) = arrival.real_date_time { - NaiveDate::from_ymd_opt( - i32::from_str(&date_time.year).unwrap(), - u32::from_str(&date_time.month).unwrap(), - u32::from_str(&date_time.day).unwrap(), - ) - .unwrap() - .and_hms_opt( - u32::from_str(&date_time.hour).unwrap(), - u32::from_str(&date_time.minute).unwrap(), - 0, - ) - } else { - None - }, - servingLine_key: Some(arrival.serving_line.key), - servingLine_code: Some(arrival.serving_line.code), - servingLine_number: Some(arrival.serving_line.number), - servingLine_symbol: Some(arrival.serving_line.symbol), - servingLine_motType: i32::from_str(&arrival.serving_line.mot_type).ok(), - servingLine_mtSubcode: i32::from_str(&arrival.serving_line.mt_subcode).ok(), - servingLine_realtime: bool::from_str(&arrival.serving_line.realtime).ok(), - servingLine_direction: Some(arrival.serving_line.direction), - servingLine_directionFrom: Some(arrival.serving_line.direction_from), - servingLine_name: Some(arrival.serving_line.name), - servingLine_delay: if let Some(delay) = arrival.serving_line.delay { - i32::from_str(&delay).ok() - } else { - None - }, - servingLine_liErgRiProj_line: Some(arrival.serving_line.li_erg_ri_proj.line), - servingLine_liErgRiProj_project: Some( - arrival.serving_line.li_erg_ri_proj.project, - ), - servingLine_liErgRiProj_direction: Some( - arrival.serving_line.li_erg_ri_proj.direction, - ), - servingLine_liErgRiProj_supplement: Some( - arrival.serving_line.li_erg_ri_proj.supplement, - ), - servingLine_liErgRiProj_network: Some( - arrival.serving_line.li_erg_ri_proj.network, - ), - servingLine_destID: Some(arrival.serving_line.dest_id), - servingLine_stateless: Some(arrival.serving_line.stateless), - servingLine_lineDisplay: Some(arrival.serving_line.line_display), - operator_code: arrival.operator.clone().map(|op| op.code), - operator_name: arrival.operator.clone().map(|op| op.name), - operator_publicCode: arrival.operator.map(|op| op.public_code), - attrs_name: None, - attrs_value: None, - }}).collect::>()) - .returning(Arrival::as_returning()) - .on_conflict_do_nothing() - .get_results(&mut conn) - .optional(); - } - } - }); - - println!(""); - Ok(()) -}