modularize stuff
This commit is contained in:
parent
4ce2946b6a
commit
c4e5afc265
10 changed files with 325 additions and 296 deletions
0
Cargo.lock → dataworker/Cargo.lock
generated
0
Cargo.lock → dataworker/Cargo.lock
generated
|
@ -1,11 +1,11 @@
|
|||
[package]
|
||||
name = "willdaten"
|
||||
name = "dataworker"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
chrono = "0.4.39"
|
||||
diesel = { version = "2.2.6", features = ["64-column-tables", "chrono", "postgres", "postgres_backend", "uuid"] }
|
||||
diesel = { version = "2.2.6", features = ["64-column-tables", "chrono", "postgres", "postgres_backend"] }
|
||||
dotenvy = "0.15.7"
|
||||
hex = "0.4.3"
|
||||
maud = "0.26.0"
|
283
dataworker/src/lib.rs
Normal file
283
dataworker/src/lib.rs
Normal file
|
@ -0,0 +1,283 @@
|
|||
#[macro_use]
|
||||
extern crate diesel;
|
||||
|
||||
mod dtos;
|
||||
mod models;
|
||||
mod schema;
|
||||
|
||||
use chrono::NaiveDate;
|
||||
use chrono::{Local, NaiveDateTime};
|
||||
use diesel::OptionalExtension;
|
||||
use diesel::{dsl::insert_into, Connection, PgConnection, RunQueryDsl, SelectableHelper};
|
||||
use dtos::Root;
|
||||
use hex;
|
||||
use models::{
|
||||
Arrival, DepatureRequest, NewDepatureRequest, ServingLine, ServingLineDepatureRequest,
|
||||
StationHint, StopList,
|
||||
};
|
||||
use schema::arrivals::dsl::arrivals;
|
||||
use schema::depature_requests::dsl::depature_requests;
|
||||
use schema::serving_line_depature_requests::dsl::serving_line_depature_requests;
|
||||
use schema::serving_lines::dsl::serving_lines;
|
||||
use schema::station_hints::dsl::station_hints;
|
||||
use schema::stop_lists::dsl::stop_lists;
|
||||
use serde_json;
|
||||
use sha2::Digest;
|
||||
use sha2::Sha256;
|
||||
use std::{env, error::Error, fs, io::Write, str::FromStr};
|
||||
|
||||
pub fn insert_document(data: Root, conn: &mut PgConnection) -> Result<(), Box<dyn Error>> {
|
||||
let arrival_list = data.arrival_list.unwrap();
|
||||
if let Some(req) = insert_into(depature_requests)
|
||||
.values(NewDepatureRequest {
|
||||
stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id).ok(),
|
||||
serverid: data
|
||||
.parameters
|
||||
.iter()
|
||||
.filter(|p| p.name == "serverID")
|
||||
.map(|p| p.clone().value)
|
||||
.next(),
|
||||
requestid: data
|
||||
.parameters
|
||||
.iter()
|
||||
.filter(|p| p.name == "requestID")
|
||||
.map(|p| p.clone().value)
|
||||
.next(),
|
||||
sessionid: data
|
||||
.parameters
|
||||
.iter()
|
||||
.filter(|p| p.name == "sessionID")
|
||||
.map(|p| p.clone().value)
|
||||
.next(),
|
||||
calcTime: Some(
|
||||
f64::from_str(
|
||||
&data
|
||||
.parameters
|
||||
.iter()
|
||||
.filter(|p| p.name == "calcTime")
|
||||
.map(|p| p.clone().value)
|
||||
.next()
|
||||
.unwrap(),
|
||||
)
|
||||
.unwrap(),
|
||||
),
|
||||
serverTime: data
|
||||
.parameters
|
||||
.iter()
|
||||
.filter(|p| p.name == "serverTime")
|
||||
.map(|p| p.clone().value)
|
||||
.next()
|
||||
.unwrap()
|
||||
.parse::<NaiveDateTime>()
|
||||
.ok(),
|
||||
logRequestId: data
|
||||
.parameters
|
||||
.iter()
|
||||
.filter(|p| p.name == "logRequestId")
|
||||
.map(|p| p.clone().value)
|
||||
.next(),
|
||||
})
|
||||
.returning(DepatureRequest::as_returning())
|
||||
.on_conflict_do_nothing()
|
||||
.get_result(conn)
|
||||
.optional()
|
||||
.unwrap()
|
||||
{
|
||||
let _ = insert_into(stop_lists)
|
||||
.values(StopList {
|
||||
input: i32::from_str(&data.dm.input.input).unwrap(),
|
||||
point_dm: Some(data.dm.points.point.usage),
|
||||
point_type: Some(data.dm.points.point.type_field.clone()),
|
||||
point_name: Some(data.dm.points.point.name),
|
||||
point_stopId: i32::from_str(&data.dm.points.point.ref_field.id).ok(),
|
||||
point_stateless: Some(data.dm.points.point.stateless),
|
||||
point_anytype: Some(data.dm.points.point.any_type),
|
||||
point_sort: i32::from_str(&data.dm.points.point.sort).ok(),
|
||||
point_quality: i32::from_str(&data.dm.points.point.quality).ok(),
|
||||
point_best: i32::from_str(&data.dm.points.point.best).ok(),
|
||||
point_object: Some(data.dm.points.point.object),
|
||||
point_ref_id: i32::from_str(&data.dm.points.point.ref_field.id).ok(),
|
||||
point_ref_gid: Some(data.dm.points.point.ref_field.gid),
|
||||
point_ref_omc: i32::from_str(&data.dm.points.point.ref_field.omc).ok(),
|
||||
point_ref_placeID: i32::from_str(&data.dm.points.point.ref_field.place_id).ok(),
|
||||
point_ref_place: Some(data.dm.points.point.ref_field.place),
|
||||
point_ref_coords: Some(data.dm.points.point.ref_field.coords),
|
||||
itdOdvAssignedStops_stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id)
|
||||
.ok(),
|
||||
itdOdvAssignedStops_name: Some(data.dm.itd_odv_assigned_stops.name),
|
||||
itdOdvAssignedStops_x: f64::from_str(&data.dm.itd_odv_assigned_stops.x).ok(),
|
||||
itdOdvAssignedStops_y: f64::from_str(&data.dm.itd_odv_assigned_stops.y).ok(),
|
||||
itdOdvAssignedStops_mapName: Some(data.dm.itd_odv_assigned_stops.map_name),
|
||||
itdOdvAssignedStops_value: Some(data.dm.itd_odv_assigned_stops.value),
|
||||
itdOdvAssignedStops_place: Some(data.dm.itd_odv_assigned_stops.place),
|
||||
itdOdvAssignedStops_nameWithPlace: Some(
|
||||
data.dm.itd_odv_assigned_stops.name_with_place,
|
||||
),
|
||||
itdOdvAssignedStops_distanceTime: i32::from_str(
|
||||
&data.dm.itd_odv_assigned_stops.distance_time,
|
||||
)
|
||||
.ok(),
|
||||
itdOdvAssignedStops_isTransferStop: i32::from_str(
|
||||
&data.dm.itd_odv_assigned_stops.is_transfer_stop,
|
||||
)
|
||||
.ok(),
|
||||
itdOdvAssignedStops_vm: i32::from_str(&data.dm.itd_odv_assigned_stops.vm).ok(),
|
||||
itdOdvAssignedStops_gid: Some(data.dm.itd_odv_assigned_stops.gid),
|
||||
meta_lastModificationDate: Some(Local::now().naive_local()),
|
||||
})
|
||||
.returning(StopList::as_returning())
|
||||
.on_conflict_do_nothing()
|
||||
.get_result(conn)
|
||||
.optional();
|
||||
for line in data.serving_lines.lines {
|
||||
let id = hex::encode(Sha256::digest(
|
||||
serde_json::to_string(&line.mode).unwrap().into_bytes(),
|
||||
));
|
||||
let _ = insert_into(serving_lines)
|
||||
.values(ServingLine {
|
||||
servingLineId: id.clone(),
|
||||
mode_name: Some(line.mode.name),
|
||||
mode_number: Some(line.mode.number),
|
||||
mode_product: Some(line.mode.product),
|
||||
mode_productId: i32::from_str(&line.mode.product_id).ok(),
|
||||
mode_type: i32::from_str(&line.mode.type_field).ok(),
|
||||
mode_code: i32::from_str(&line.mode.code).ok(),
|
||||
mode_destination: Some(line.mode.destination),
|
||||
mode_destID: i32::from_str(&line.mode.dest_id).ok(),
|
||||
mode_desc: Some(line.mode.desc),
|
||||
mode_timetablePeriod: Some(line.mode.timetable_period),
|
||||
diva_branch: Some(line.mode.diva.branch),
|
||||
diva_line: Some(line.mode.diva.line),
|
||||
diva_supplement: Some(line.mode.diva.supplement),
|
||||
diva_dir: Some(line.mode.diva.dir),
|
||||
diva_project: Some(line.mode.diva.project),
|
||||
diva_network: Some(line.mode.diva.network),
|
||||
diva_stateless: Some(line.mode.diva.stateless),
|
||||
diva_tripCode: i32::from_str(&line.mode.diva.trip_code).ok(),
|
||||
diva_operator: Some(line.mode.diva.operator),
|
||||
diva_opPublicCode: line.mode.diva.op_public_code,
|
||||
diva_opCode: Some(line.mode.diva.op_code),
|
||||
diva_Vf: NaiveDate::parse_from_str(&line.mode.diva.v_f, "%Y%m%d").ok(),
|
||||
diva_vTo: NaiveDate::parse_from_str(&line.mode.diva.v_to, "%Y%m%d").ok(),
|
||||
diva_lineDisplay: Some(line.mode.diva.line_display),
|
||||
meta_lastModificationDate: req.serverTime,
|
||||
})
|
||||
.returning(ServingLine::as_returning())
|
||||
.on_conflict_do_nothing()
|
||||
.get_result(conn)
|
||||
.optional()
|
||||
.unwrap();
|
||||
let _ = insert_into(serving_line_depature_requests)
|
||||
.values(ServingLineDepatureRequest {
|
||||
depatureRequestId: req.depatureRequestId,
|
||||
servingLineId: id.clone(),
|
||||
})
|
||||
.returning(ServingLineDepatureRequest::as_returning())
|
||||
.on_conflict_do_nothing()
|
||||
.get_result(conn)
|
||||
.optional()
|
||||
.unwrap();
|
||||
}
|
||||
if let Some(infos) = data.dm.points.point.infos {
|
||||
let _ = insert_into(station_hints).values(StationHint {
|
||||
infoLinkURL: infos.info.info_link_url,
|
||||
infoLinkText: Some(infos.info.info_link_text),
|
||||
infoText: Some(infos.info.info_text.content),
|
||||
});
|
||||
}
|
||||
{
|
||||
let _ = insert_into(arrivals)
|
||||
.values(
|
||||
&arrival_list
|
||||
.into_iter()
|
||||
.map(|arrival| Arrival {
|
||||
depatureRequestId: req.depatureRequestId,
|
||||
stopID: i32::from_str(&arrival.stop_id).ok(),
|
||||
x: f64::from_str(&arrival.x).ok(),
|
||||
y: f64::from_str(&arrival.y).ok(),
|
||||
mapName: Some(arrival.map_name),
|
||||
area: Some(arrival.area),
|
||||
platform: Some(arrival.platform),
|
||||
platformName: Some(arrival.platform_name),
|
||||
stopName: Some(arrival.stop_name),
|
||||
nameWO: Some(arrival.name_wo),
|
||||
pointType: Some(data.dm.points.point.type_field.clone()),
|
||||
countdown: i32::from_str(&arrival.countdown).ok(),
|
||||
arrivalTime: NaiveDate::from_ymd_opt(
|
||||
i32::from_str(&arrival.date_time.year).unwrap(),
|
||||
u32::from_str(&arrival.date_time.month).unwrap(),
|
||||
u32::from_str(&arrival.date_time.day).unwrap(),
|
||||
)
|
||||
.unwrap()
|
||||
.and_hms_opt(
|
||||
u32::from_str(&arrival.date_time.hour).unwrap(),
|
||||
u32::from_str(&arrival.date_time.minute).unwrap(),
|
||||
0,
|
||||
),
|
||||
realArrivalTime: if let Some(date_time) = arrival.real_date_time {
|
||||
NaiveDate::from_ymd_opt(
|
||||
i32::from_str(&date_time.year).unwrap(),
|
||||
u32::from_str(&date_time.month).unwrap(),
|
||||
u32::from_str(&date_time.day).unwrap(),
|
||||
)
|
||||
.unwrap()
|
||||
.and_hms_opt(
|
||||
u32::from_str(&date_time.hour).unwrap(),
|
||||
u32::from_str(&date_time.minute).unwrap(),
|
||||
0,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
servingLine_key: Some(arrival.serving_line.key),
|
||||
servingLine_code: Some(arrival.serving_line.code),
|
||||
servingLine_number: Some(arrival.serving_line.number),
|
||||
servingLine_symbol: Some(arrival.serving_line.symbol),
|
||||
servingLine_motType: i32::from_str(&arrival.serving_line.mot_type).ok(),
|
||||
servingLine_mtSubcode: i32::from_str(&arrival.serving_line.mt_subcode)
|
||||
.ok(),
|
||||
servingLine_realtime: bool::from_str(&arrival.serving_line.realtime)
|
||||
.ok(),
|
||||
servingLine_direction: Some(arrival.serving_line.direction),
|
||||
servingLine_directionFrom: Some(arrival.serving_line.direction_from),
|
||||
servingLine_name: Some(arrival.serving_line.name),
|
||||
servingLine_delay: if let Some(delay) = arrival.serving_line.delay {
|
||||
i32::from_str(&delay).ok()
|
||||
} else {
|
||||
None
|
||||
},
|
||||
servingLine_liErgRiProj_line: Some(
|
||||
arrival.serving_line.li_erg_ri_proj.line,
|
||||
),
|
||||
servingLine_liErgRiProj_project: Some(
|
||||
arrival.serving_line.li_erg_ri_proj.project,
|
||||
),
|
||||
servingLine_liErgRiProj_direction: Some(
|
||||
arrival.serving_line.li_erg_ri_proj.direction,
|
||||
),
|
||||
servingLine_liErgRiProj_supplement: Some(
|
||||
arrival.serving_line.li_erg_ri_proj.supplement,
|
||||
),
|
||||
servingLine_liErgRiProj_network: Some(
|
||||
arrival.serving_line.li_erg_ri_proj.network,
|
||||
),
|
||||
servingLine_destID: Some(arrival.serving_line.dest_id),
|
||||
servingLine_stateless: Some(arrival.serving_line.stateless),
|
||||
servingLine_lineDisplay: Some(arrival.serving_line.line_display),
|
||||
operator_code: arrival.operator.clone().map(|op| op.code),
|
||||
operator_name: arrival.operator.clone().map(|op| op.name),
|
||||
operator_publicCode: arrival.operator.map(|op| op.public_code),
|
||||
attrs_name: None,
|
||||
attrs_value: None,
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.returning(Arrival::as_returning())
|
||||
.on_conflict_do_nothing()
|
||||
.get_results(conn)
|
||||
.optional();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
40
dataworker/src/main.rs
Normal file
40
dataworker/src/main.rs
Normal file
|
@ -0,0 +1,40 @@
|
|||
#[macro_use]
|
||||
extern crate diesel;
|
||||
|
||||
use dataworker::dtos::Root;
|
||||
use dataworker::insert_document;
|
||||
use diesel::{Connection, PgConnection};
|
||||
use dotenvy::dotenv;
|
||||
use std::error::Error;
|
||||
use std::io::Write;
|
||||
use std::{env, fs};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
dotenv().ok();
|
||||
let url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
|
||||
let mut conn =
|
||||
PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url));
|
||||
|
||||
let src = "/home/bread/Downloads/2025-01-22(1)/2025-01-22";
|
||||
let total = fs::read_dir(src).unwrap().count();
|
||||
let mut count = 0;
|
||||
|
||||
fs::read_dir(src).unwrap().into_iter().for_each(|path| {
|
||||
count += 1;
|
||||
let path = path.unwrap();
|
||||
print!(
|
||||
"\x1B[2K\r {}/{} ({})",
|
||||
count,
|
||||
total,
|
||||
path.file_name().to_str().unwrap()
|
||||
);
|
||||
std::io::stdout().flush().unwrap();
|
||||
let content = fs::read_to_string(path.path()).unwrap();
|
||||
let data = serde_json::from_str::<Root>(&content).unwrap();
|
||||
insert_document(data, &mut conn).unwrap();
|
||||
});
|
||||
|
||||
println!();
|
||||
Ok(())
|
||||
}
|
294
src/main.rs
294
src/main.rs
|
@ -1,294 +0,0 @@
|
|||
#[macro_use]
|
||||
extern crate diesel;
|
||||
|
||||
mod dtos;
|
||||
mod models;
|
||||
mod schema;
|
||||
|
||||
use chrono::NaiveDate;
|
||||
use chrono::{Local, NaiveDateTime};
|
||||
use diesel::{dsl::insert_into, Connection, PgConnection, RunQueryDsl, SelectableHelper};
|
||||
use dotenvy::dotenv;
|
||||
use dtos::Root;
|
||||
use hex;
|
||||
use models::{
|
||||
Arrival, DepatureRequest, NewDepatureRequest, ServingLine, ServingLineDepatureRequest,
|
||||
StationHint, StopList,
|
||||
};
|
||||
use serde_json;
|
||||
use sha2::Digest;
|
||||
use sha2::Sha256;
|
||||
use std::{env, error::Error, fs, str::FromStr, io::Write};
|
||||
|
||||
use crate::diesel::OptionalExtension;
|
||||
use crate::schema::arrivals::dsl::arrivals;
|
||||
use crate::schema::depature_requests::dsl::depature_requests;
|
||||
use crate::schema::serving_line_depature_requests::dsl::serving_line_depature_requests;
|
||||
use crate::schema::serving_lines::dsl::serving_lines;
|
||||
use crate::schema::station_hints::dsl::station_hints;
|
||||
use crate::schema::stop_lists::dsl::stop_lists;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
dotenv().ok();
|
||||
let url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
|
||||
let mut conn =
|
||||
PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url));
|
||||
|
||||
let src = "/home/bread/Downloads/2025-01-22";
|
||||
let total = fs::read_dir(src).unwrap().count();
|
||||
let mut count = 0;
|
||||
|
||||
fs::read_dir(src).unwrap().into_iter().for_each(|path| {
|
||||
count += 1;
|
||||
let path = path.unwrap();
|
||||
print!("\x1B[2K\r {}/{} ({})", count, total, path.file_name().to_str().unwrap());
|
||||
std::io::stdout().flush().unwrap();
|
||||
let content = fs::read_to_string(path.path()).unwrap();
|
||||
let data = serde_json::from_str::<Root>(&content).unwrap();
|
||||
let arrival_list = data.arrival_list.unwrap();
|
||||
if let Some(req) = insert_into(depature_requests)
|
||||
.values(NewDepatureRequest {
|
||||
stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id).ok(),
|
||||
serverid: data
|
||||
.parameters
|
||||
.iter()
|
||||
.filter(|p| p.name == "serverID")
|
||||
.map(|p| p.clone().value)
|
||||
.next(),
|
||||
requestid: data
|
||||
.parameters
|
||||
.iter()
|
||||
.filter(|p| p.name == "requestID")
|
||||
.map(|p| p.clone().value)
|
||||
.next(),
|
||||
sessionid: data
|
||||
.parameters
|
||||
.iter()
|
||||
.filter(|p| p.name == "sessionID")
|
||||
.map(|p| p.clone().value)
|
||||
.next(),
|
||||
calcTime: Some(
|
||||
f64::from_str(
|
||||
&data
|
||||
.parameters
|
||||
.iter()
|
||||
.filter(|p| p.name == "calcTime")
|
||||
.map(|p| p.clone().value)
|
||||
.next()
|
||||
.unwrap(),
|
||||
)
|
||||
.unwrap(),
|
||||
),
|
||||
serverTime: data
|
||||
.parameters
|
||||
.iter()
|
||||
.filter(|p| p.name == "serverTime")
|
||||
.map(|p| p.clone().value)
|
||||
.next()
|
||||
.unwrap()
|
||||
.parse::<NaiveDateTime>()
|
||||
.ok(),
|
||||
logRequestId: data
|
||||
.parameters
|
||||
.iter()
|
||||
.filter(|p| p.name == "logRequestId")
|
||||
.map(|p| p.clone().value)
|
||||
.next(),
|
||||
})
|
||||
.returning(DepatureRequest::as_returning())
|
||||
.on_conflict_do_nothing()
|
||||
.get_result(&mut conn)
|
||||
.optional()
|
||||
.unwrap() {
|
||||
let _ = insert_into(stop_lists)
|
||||
.values(StopList {
|
||||
input: i32::from_str(&data.dm.input.input).unwrap(),
|
||||
point_dm: Some(data.dm.points.point.usage),
|
||||
point_type: Some(data.dm.points.point.type_field.clone()),
|
||||
point_name: Some(data.dm.points.point.name),
|
||||
point_stopId: i32::from_str(&data.dm.points.point.ref_field.id).ok(),
|
||||
point_stateless: Some(data.dm.points.point.stateless),
|
||||
point_anytype: Some(data.dm.points.point.any_type),
|
||||
point_sort: i32::from_str(&data.dm.points.point.sort).ok(),
|
||||
point_quality: i32::from_str(&data.dm.points.point.quality).ok(),
|
||||
point_best: i32::from_str(&data.dm.points.point.best).ok(),
|
||||
point_object: Some(data.dm.points.point.object),
|
||||
point_ref_id: i32::from_str(&data.dm.points.point.ref_field.id).ok(),
|
||||
point_ref_gid: Some(data.dm.points.point.ref_field.gid),
|
||||
point_ref_omc: i32::from_str(&data.dm.points.point.ref_field.omc).ok(),
|
||||
point_ref_placeID: i32::from_str(&data.dm.points.point.ref_field.place_id).ok(),
|
||||
point_ref_place: Some(data.dm.points.point.ref_field.place),
|
||||
point_ref_coords: Some(data.dm.points.point.ref_field.coords),
|
||||
itdOdvAssignedStops_stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id)
|
||||
.ok(),
|
||||
itdOdvAssignedStops_name: Some(data.dm.itd_odv_assigned_stops.name),
|
||||
itdOdvAssignedStops_x: f64::from_str(&data.dm.itd_odv_assigned_stops.x).ok(),
|
||||
itdOdvAssignedStops_y: f64::from_str(&data.dm.itd_odv_assigned_stops.y).ok(),
|
||||
itdOdvAssignedStops_mapName: Some(data.dm.itd_odv_assigned_stops.map_name),
|
||||
itdOdvAssignedStops_value: Some(data.dm.itd_odv_assigned_stops.value),
|
||||
itdOdvAssignedStops_place: Some(data.dm.itd_odv_assigned_stops.place),
|
||||
itdOdvAssignedStops_nameWithPlace: Some(
|
||||
data.dm.itd_odv_assigned_stops.name_with_place,
|
||||
),
|
||||
itdOdvAssignedStops_distanceTime: i32::from_str(
|
||||
&data.dm.itd_odv_assigned_stops.distance_time,
|
||||
)
|
||||
.ok(),
|
||||
itdOdvAssignedStops_isTransferStop: i32::from_str(
|
||||
&data.dm.itd_odv_assigned_stops.is_transfer_stop,
|
||||
)
|
||||
.ok(),
|
||||
itdOdvAssignedStops_vm: i32::from_str(&data.dm.itd_odv_assigned_stops.vm).ok(),
|
||||
itdOdvAssignedStops_gid: Some(data.dm.itd_odv_assigned_stops.gid),
|
||||
meta_lastModificationDate: Some(Local::now().naive_local()),
|
||||
})
|
||||
.returning(StopList::as_returning())
|
||||
.on_conflict_do_nothing()
|
||||
.get_result(&mut conn)
|
||||
.optional();
|
||||
for line in data.serving_lines.lines {
|
||||
let id = hex::encode(Sha256::digest(
|
||||
serde_json::to_string(&line.mode).unwrap().into_bytes(),
|
||||
));
|
||||
let _ = insert_into(serving_lines)
|
||||
.values(ServingLine {
|
||||
servingLineId: id.clone(),
|
||||
mode_name: Some(line.mode.name),
|
||||
mode_number: Some(line.mode.number),
|
||||
mode_product: Some(line.mode.product),
|
||||
mode_productId: i32::from_str(&line.mode.product_id).ok(),
|
||||
mode_type: i32::from_str(&line.mode.type_field).ok(),
|
||||
mode_code: i32::from_str(&line.mode.code).ok(),
|
||||
mode_destination: Some(line.mode.destination),
|
||||
mode_destID: i32::from_str(&line.mode.dest_id).ok(),
|
||||
mode_desc: Some(line.mode.desc),
|
||||
mode_timetablePeriod: Some(line.mode.timetable_period),
|
||||
diva_branch: Some(line.mode.diva.branch),
|
||||
diva_line: Some(line.mode.diva.line),
|
||||
diva_supplement: Some(line.mode.diva.supplement),
|
||||
diva_dir: Some(line.mode.diva.dir),
|
||||
diva_project: Some(line.mode.diva.project),
|
||||
diva_network: Some(line.mode.diva.network),
|
||||
diva_stateless: Some(line.mode.diva.stateless),
|
||||
diva_tripCode: i32::from_str(&line.mode.diva.trip_code).ok(),
|
||||
diva_operator: Some(line.mode.diva.operator),
|
||||
diva_opPublicCode: line.mode.diva.op_public_code,
|
||||
diva_opCode: Some(line.mode.diva.op_code),
|
||||
diva_Vf: NaiveDate::parse_from_str(&line.mode.diva.v_f, "%Y%m%d").ok(),
|
||||
diva_vTo: NaiveDate::parse_from_str(&line.mode.diva.v_to, "%Y%m%d").ok(),
|
||||
diva_lineDisplay: Some(line.mode.diva.line_display),
|
||||
meta_lastModificationDate: req.serverTime,
|
||||
})
|
||||
.returning(ServingLine::as_returning())
|
||||
.on_conflict_do_nothing()
|
||||
.get_result(&mut conn)
|
||||
.optional()
|
||||
.unwrap();
|
||||
let _ = insert_into(serving_line_depature_requests)
|
||||
.values(ServingLineDepatureRequest {
|
||||
depatureRequestId: req.depatureRequestId,
|
||||
servingLineId: id.clone(),
|
||||
})
|
||||
.returning(ServingLineDepatureRequest::as_returning())
|
||||
.on_conflict_do_nothing()
|
||||
.get_result(&mut conn)
|
||||
.optional()
|
||||
.unwrap();
|
||||
}
|
||||
if let Some(infos) = data.dm.points.point.infos {
|
||||
let _ = insert_into(station_hints).values(StationHint {
|
||||
infoLinkURL: infos.info.info_link_url,
|
||||
infoLinkText: Some(infos.info.info_link_text),
|
||||
infoText: Some(infos.info.info_text.content),
|
||||
});
|
||||
}
|
||||
{
|
||||
let _ = insert_into(arrivals)
|
||||
.values(&arrival_list.into_iter().map(|arrival| { Arrival {
|
||||
depatureRequestId: req.depatureRequestId,
|
||||
stopID: i32::from_str(&arrival.stop_id).ok(),
|
||||
x: f64::from_str(&arrival.x).ok(),
|
||||
y: f64::from_str(&arrival.y).ok(),
|
||||
mapName: Some(arrival.map_name),
|
||||
area: Some(arrival.area),
|
||||
platform: Some(arrival.platform),
|
||||
platformName: Some(arrival.platform_name),
|
||||
stopName: Some(arrival.stop_name),
|
||||
nameWO: Some(arrival.name_wo),
|
||||
pointType: Some(data.dm.points.point.type_field.clone()),
|
||||
countdown: i32::from_str(&arrival.countdown).ok(),
|
||||
arrivalTime: NaiveDate::from_ymd_opt(
|
||||
i32::from_str(&arrival.date_time.year).unwrap(),
|
||||
u32::from_str(&arrival.date_time.month).unwrap(),
|
||||
u32::from_str(&arrival.date_time.day).unwrap(),
|
||||
)
|
||||
.unwrap()
|
||||
.and_hms_opt(
|
||||
u32::from_str(&arrival.date_time.hour).unwrap(),
|
||||
u32::from_str(&arrival.date_time.minute).unwrap(),
|
||||
0,
|
||||
),
|
||||
realArrivalTime: if let Some(date_time) = arrival.real_date_time {
|
||||
NaiveDate::from_ymd_opt(
|
||||
i32::from_str(&date_time.year).unwrap(),
|
||||
u32::from_str(&date_time.month).unwrap(),
|
||||
u32::from_str(&date_time.day).unwrap(),
|
||||
)
|
||||
.unwrap()
|
||||
.and_hms_opt(
|
||||
u32::from_str(&date_time.hour).unwrap(),
|
||||
u32::from_str(&date_time.minute).unwrap(),
|
||||
0,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
servingLine_key: Some(arrival.serving_line.key),
|
||||
servingLine_code: Some(arrival.serving_line.code),
|
||||
servingLine_number: Some(arrival.serving_line.number),
|
||||
servingLine_symbol: Some(arrival.serving_line.symbol),
|
||||
servingLine_motType: i32::from_str(&arrival.serving_line.mot_type).ok(),
|
||||
servingLine_mtSubcode: i32::from_str(&arrival.serving_line.mt_subcode).ok(),
|
||||
servingLine_realtime: bool::from_str(&arrival.serving_line.realtime).ok(),
|
||||
servingLine_direction: Some(arrival.serving_line.direction),
|
||||
servingLine_directionFrom: Some(arrival.serving_line.direction_from),
|
||||
servingLine_name: Some(arrival.serving_line.name),
|
||||
servingLine_delay: if let Some(delay) = arrival.serving_line.delay {
|
||||
i32::from_str(&delay).ok()
|
||||
} else {
|
||||
None
|
||||
},
|
||||
servingLine_liErgRiProj_line: Some(arrival.serving_line.li_erg_ri_proj.line),
|
||||
servingLine_liErgRiProj_project: Some(
|
||||
arrival.serving_line.li_erg_ri_proj.project,
|
||||
),
|
||||
servingLine_liErgRiProj_direction: Some(
|
||||
arrival.serving_line.li_erg_ri_proj.direction,
|
||||
),
|
||||
servingLine_liErgRiProj_supplement: Some(
|
||||
arrival.serving_line.li_erg_ri_proj.supplement,
|
||||
),
|
||||
servingLine_liErgRiProj_network: Some(
|
||||
arrival.serving_line.li_erg_ri_proj.network,
|
||||
),
|
||||
servingLine_destID: Some(arrival.serving_line.dest_id),
|
||||
servingLine_stateless: Some(arrival.serving_line.stateless),
|
||||
servingLine_lineDisplay: Some(arrival.serving_line.line_display),
|
||||
operator_code: arrival.operator.clone().map(|op| op.code),
|
||||
operator_name: arrival.operator.clone().map(|op| op.name),
|
||||
operator_publicCode: arrival.operator.map(|op| op.public_code),
|
||||
attrs_name: None,
|
||||
attrs_value: None,
|
||||
}}).collect::<Vec<_>>())
|
||||
.returning(Arrival::as_returning())
|
||||
.on_conflict_do_nothing()
|
||||
.get_results(&mut conn)
|
||||
.optional();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
println!("");
|
||||
Ok(())
|
||||
}
|
Loading…
Add table
Reference in a new issue