modularize stuff

This commit is contained in:
theBreadCompany 2025-01-22 04:17:05 +01:00
parent 4ce2946b6a
commit 2577edbf44
10 changed files with 325 additions and 296 deletions

View file

View file

@ -1,11 +1,11 @@
[package]
name = "willdaten"
name = "dataworker"
version = "0.1.0"
edition = "2021"
[dependencies]
chrono = "0.4.39"
diesel = { version = "2.2.6", features = ["64-column-tables", "chrono", "postgres", "postgres_backend", "uuid"] }
diesel = { version = "2.2.6", features = ["64-column-tables", "chrono", "postgres", "postgres_backend"] }
dotenvy = "0.15.7"
hex = "0.4.3"
maud = "0.26.0"

283
dataworker/src/lib.rs Normal file
View file

@ -0,0 +1,283 @@
#[macro_use]
extern crate diesel;
pub mod dtos;
mod models;
mod schema;
use chrono::NaiveDate;
use chrono::{Local, NaiveDateTime};
use diesel::OptionalExtension;
use diesel::{dsl::insert_into, Connection, PgConnection, RunQueryDsl, SelectableHelper};
use dtos::Root;
use hex;
use models::{
Arrival, DepatureRequest, NewDepatureRequest, ServingLine, ServingLineDepatureRequest,
StationHint, StopList,
};
use schema::arrivals::dsl::arrivals;
use schema::depature_requests::dsl::depature_requests;
use schema::serving_line_depature_requests::dsl::serving_line_depature_requests;
use schema::serving_lines::dsl::serving_lines;
use schema::station_hints::dsl::station_hints;
use schema::stop_lists::dsl::stop_lists;
use serde_json;
use sha2::Digest;
use sha2::Sha256;
use std::{env, error::Error, fs, io::Write, str::FromStr};
pub fn insert_document(data: Root, conn: &mut PgConnection) -> Result<(), Box<dyn Error>> {
let arrival_list = data.arrival_list.unwrap();
if let Some(req) = insert_into(depature_requests)
.values(NewDepatureRequest {
stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id).ok(),
serverid: data
.parameters
.iter()
.filter(|p| p.name == "serverID")
.map(|p| p.clone().value)
.next(),
requestid: data
.parameters
.iter()
.filter(|p| p.name == "requestID")
.map(|p| p.clone().value)
.next(),
sessionid: data
.parameters
.iter()
.filter(|p| p.name == "sessionID")
.map(|p| p.clone().value)
.next(),
calcTime: Some(
f64::from_str(
&data
.parameters
.iter()
.filter(|p| p.name == "calcTime")
.map(|p| p.clone().value)
.next()
.unwrap(),
)
.unwrap(),
),
serverTime: data
.parameters
.iter()
.filter(|p| p.name == "serverTime")
.map(|p| p.clone().value)
.next()
.unwrap()
.parse::<NaiveDateTime>()
.ok(),
logRequestId: data
.parameters
.iter()
.filter(|p| p.name == "logRequestId")
.map(|p| p.clone().value)
.next(),
})
.returning(DepatureRequest::as_returning())
.on_conflict_do_nothing()
.get_result(conn)
.optional()
.unwrap()
{
let _ = insert_into(stop_lists)
.values(StopList {
input: i32::from_str(&data.dm.input.input).unwrap(),
point_dm: Some(data.dm.points.point.usage),
point_type: Some(data.dm.points.point.type_field.clone()),
point_name: Some(data.dm.points.point.name),
point_stopId: i32::from_str(&data.dm.points.point.ref_field.id).ok(),
point_stateless: Some(data.dm.points.point.stateless),
point_anytype: Some(data.dm.points.point.any_type),
point_sort: i32::from_str(&data.dm.points.point.sort).ok(),
point_quality: i32::from_str(&data.dm.points.point.quality).ok(),
point_best: i32::from_str(&data.dm.points.point.best).ok(),
point_object: Some(data.dm.points.point.object),
point_ref_id: i32::from_str(&data.dm.points.point.ref_field.id).ok(),
point_ref_gid: Some(data.dm.points.point.ref_field.gid),
point_ref_omc: i32::from_str(&data.dm.points.point.ref_field.omc).ok(),
point_ref_placeID: i32::from_str(&data.dm.points.point.ref_field.place_id).ok(),
point_ref_place: Some(data.dm.points.point.ref_field.place),
point_ref_coords: Some(data.dm.points.point.ref_field.coords),
itdOdvAssignedStops_stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id)
.ok(),
itdOdvAssignedStops_name: Some(data.dm.itd_odv_assigned_stops.name),
itdOdvAssignedStops_x: f64::from_str(&data.dm.itd_odv_assigned_stops.x).ok(),
itdOdvAssignedStops_y: f64::from_str(&data.dm.itd_odv_assigned_stops.y).ok(),
itdOdvAssignedStops_mapName: Some(data.dm.itd_odv_assigned_stops.map_name),
itdOdvAssignedStops_value: Some(data.dm.itd_odv_assigned_stops.value),
itdOdvAssignedStops_place: Some(data.dm.itd_odv_assigned_stops.place),
itdOdvAssignedStops_nameWithPlace: Some(
data.dm.itd_odv_assigned_stops.name_with_place,
),
itdOdvAssignedStops_distanceTime: i32::from_str(
&data.dm.itd_odv_assigned_stops.distance_time,
)
.ok(),
itdOdvAssignedStops_isTransferStop: i32::from_str(
&data.dm.itd_odv_assigned_stops.is_transfer_stop,
)
.ok(),
itdOdvAssignedStops_vm: i32::from_str(&data.dm.itd_odv_assigned_stops.vm).ok(),
itdOdvAssignedStops_gid: Some(data.dm.itd_odv_assigned_stops.gid),
meta_lastModificationDate: Some(Local::now().naive_local()),
})
.returning(StopList::as_returning())
.on_conflict_do_nothing()
.get_result(conn)
.optional();
for line in data.serving_lines.lines {
let id = hex::encode(Sha256::digest(
serde_json::to_string(&line.mode).unwrap().into_bytes(),
));
let _ = insert_into(serving_lines)
.values(ServingLine {
servingLineId: id.clone(),
mode_name: Some(line.mode.name),
mode_number: Some(line.mode.number),
mode_product: Some(line.mode.product),
mode_productId: i32::from_str(&line.mode.product_id).ok(),
mode_type: i32::from_str(&line.mode.type_field).ok(),
mode_code: i32::from_str(&line.mode.code).ok(),
mode_destination: Some(line.mode.destination),
mode_destID: i32::from_str(&line.mode.dest_id).ok(),
mode_desc: Some(line.mode.desc),
mode_timetablePeriod: Some(line.mode.timetable_period),
diva_branch: Some(line.mode.diva.branch),
diva_line: Some(line.mode.diva.line),
diva_supplement: Some(line.mode.diva.supplement),
diva_dir: Some(line.mode.diva.dir),
diva_project: Some(line.mode.diva.project),
diva_network: Some(line.mode.diva.network),
diva_stateless: Some(line.mode.diva.stateless),
diva_tripCode: i32::from_str(&line.mode.diva.trip_code).ok(),
diva_operator: Some(line.mode.diva.operator),
diva_opPublicCode: line.mode.diva.op_public_code,
diva_opCode: Some(line.mode.diva.op_code),
diva_Vf: NaiveDate::parse_from_str(&line.mode.diva.v_f, "%Y%m%d").ok(),
diva_vTo: NaiveDate::parse_from_str(&line.mode.diva.v_to, "%Y%m%d").ok(),
diva_lineDisplay: Some(line.mode.diva.line_display),
meta_lastModificationDate: req.serverTime,
})
.returning(ServingLine::as_returning())
.on_conflict_do_nothing()
.get_result(conn)
.optional()
.unwrap();
let _ = insert_into(serving_line_depature_requests)
.values(ServingLineDepatureRequest {
depatureRequestId: req.depatureRequestId,
servingLineId: id.clone(),
})
.returning(ServingLineDepatureRequest::as_returning())
.on_conflict_do_nothing()
.get_result(conn)
.optional()
.unwrap();
}
if let Some(infos) = data.dm.points.point.infos {
let _ = insert_into(station_hints).values(StationHint {
infoLinkURL: infos.info.info_link_url,
infoLinkText: Some(infos.info.info_link_text),
infoText: Some(infos.info.info_text.content),
});
}
{
let _ = insert_into(arrivals)
.values(
&arrival_list
.into_iter()
.map(|arrival| Arrival {
depatureRequestId: req.depatureRequestId,
stopID: i32::from_str(&arrival.stop_id).ok(),
x: f64::from_str(&arrival.x).ok(),
y: f64::from_str(&arrival.y).ok(),
mapName: Some(arrival.map_name),
area: Some(arrival.area),
platform: Some(arrival.platform),
platformName: Some(arrival.platform_name),
stopName: Some(arrival.stop_name),
nameWO: Some(arrival.name_wo),
pointType: Some(data.dm.points.point.type_field.clone()),
countdown: i32::from_str(&arrival.countdown).ok(),
arrivalTime: NaiveDate::from_ymd_opt(
i32::from_str(&arrival.date_time.year).unwrap(),
u32::from_str(&arrival.date_time.month).unwrap(),
u32::from_str(&arrival.date_time.day).unwrap(),
)
.unwrap()
.and_hms_opt(
u32::from_str(&arrival.date_time.hour).unwrap(),
u32::from_str(&arrival.date_time.minute).unwrap(),
0,
),
realArrivalTime: if let Some(date_time) = arrival.real_date_time {
NaiveDate::from_ymd_opt(
i32::from_str(&date_time.year).unwrap(),
u32::from_str(&date_time.month).unwrap(),
u32::from_str(&date_time.day).unwrap(),
)
.unwrap()
.and_hms_opt(
u32::from_str(&date_time.hour).unwrap(),
u32::from_str(&date_time.minute).unwrap(),
0,
)
} else {
None
},
servingLine_key: Some(arrival.serving_line.key),
servingLine_code: Some(arrival.serving_line.code),
servingLine_number: Some(arrival.serving_line.number),
servingLine_symbol: Some(arrival.serving_line.symbol),
servingLine_motType: i32::from_str(&arrival.serving_line.mot_type).ok(),
servingLine_mtSubcode: i32::from_str(&arrival.serving_line.mt_subcode)
.ok(),
servingLine_realtime: bool::from_str(&arrival.serving_line.realtime)
.ok(),
servingLine_direction: Some(arrival.serving_line.direction),
servingLine_directionFrom: Some(arrival.serving_line.direction_from),
servingLine_name: Some(arrival.serving_line.name),
servingLine_delay: if let Some(delay) = arrival.serving_line.delay {
i32::from_str(&delay).ok()
} else {
None
},
servingLine_liErgRiProj_line: Some(
arrival.serving_line.li_erg_ri_proj.line,
),
servingLine_liErgRiProj_project: Some(
arrival.serving_line.li_erg_ri_proj.project,
),
servingLine_liErgRiProj_direction: Some(
arrival.serving_line.li_erg_ri_proj.direction,
),
servingLine_liErgRiProj_supplement: Some(
arrival.serving_line.li_erg_ri_proj.supplement,
),
servingLine_liErgRiProj_network: Some(
arrival.serving_line.li_erg_ri_proj.network,
),
servingLine_destID: Some(arrival.serving_line.dest_id),
servingLine_stateless: Some(arrival.serving_line.stateless),
servingLine_lineDisplay: Some(arrival.serving_line.line_display),
operator_code: arrival.operator.clone().map(|op| op.code),
operator_name: arrival.operator.clone().map(|op| op.name),
operator_publicCode: arrival.operator.map(|op| op.public_code),
attrs_name: None,
attrs_value: None,
})
.collect::<Vec<_>>(),
)
.returning(Arrival::as_returning())
.on_conflict_do_nothing()
.get_results(conn)
.optional();
}
}
Ok(())
}

40
dataworker/src/main.rs Normal file
View file

@ -0,0 +1,40 @@
#[macro_use]
extern crate diesel;
use dataworker::dtos::Root;
use dataworker::insert_document;
use diesel::{Connection, PgConnection};
use dotenvy::dotenv;
use std::error::Error;
use std::io::Write;
use std::{env, fs};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
dotenv().ok();
let url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let mut conn =
PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url));
let src = "/home/bread/Downloads/2025-01-22(1)/2025-01-22";
let total = fs::read_dir(src).unwrap().count();
let mut count = 0;
fs::read_dir(src).unwrap().into_iter().for_each(|path| {
count += 1;
let path = path.unwrap();
print!(
"\x1B[2K\r {}/{} ({})",
count,
total,
path.file_name().to_str().unwrap()
);
std::io::stdout().flush().unwrap();
let content = fs::read_to_string(path.path()).unwrap();
let data = serde_json::from_str::<Root>(&content).unwrap();
insert_document(data, &mut conn).unwrap();
});
println!();
Ok(())
}

View file

@ -1,294 +0,0 @@
#[macro_use]
extern crate diesel;
mod dtos;
mod models;
mod schema;
use chrono::NaiveDate;
use chrono::{Local, NaiveDateTime};
use diesel::{dsl::insert_into, Connection, PgConnection, RunQueryDsl, SelectableHelper};
use dotenvy::dotenv;
use dtos::Root;
use hex;
use models::{
Arrival, DepatureRequest, NewDepatureRequest, ServingLine, ServingLineDepatureRequest,
StationHint, StopList,
};
use serde_json;
use sha2::Digest;
use sha2::Sha256;
use std::{env, error::Error, fs, str::FromStr, io::Write};
use crate::diesel::OptionalExtension;
use crate::schema::arrivals::dsl::arrivals;
use crate::schema::depature_requests::dsl::depature_requests;
use crate::schema::serving_line_depature_requests::dsl::serving_line_depature_requests;
use crate::schema::serving_lines::dsl::serving_lines;
use crate::schema::station_hints::dsl::station_hints;
use crate::schema::stop_lists::dsl::stop_lists;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
dotenv().ok();
let url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let mut conn =
PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url));
let src = "/home/bread/Downloads/2025-01-22";
let total = fs::read_dir(src).unwrap().count();
let mut count = 0;
fs::read_dir(src).unwrap().into_iter().for_each(|path| {
count += 1;
let path = path.unwrap();
print!("\x1B[2K\r {}/{} ({})", count, total, path.file_name().to_str().unwrap());
std::io::stdout().flush().unwrap();
let content = fs::read_to_string(path.path()).unwrap();
let data = serde_json::from_str::<Root>(&content).unwrap();
let arrival_list = data.arrival_list.unwrap();
if let Some(req) = insert_into(depature_requests)
.values(NewDepatureRequest {
stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id).ok(),
serverid: data
.parameters
.iter()
.filter(|p| p.name == "serverID")
.map(|p| p.clone().value)
.next(),
requestid: data
.parameters
.iter()
.filter(|p| p.name == "requestID")
.map(|p| p.clone().value)
.next(),
sessionid: data
.parameters
.iter()
.filter(|p| p.name == "sessionID")
.map(|p| p.clone().value)
.next(),
calcTime: Some(
f64::from_str(
&data
.parameters
.iter()
.filter(|p| p.name == "calcTime")
.map(|p| p.clone().value)
.next()
.unwrap(),
)
.unwrap(),
),
serverTime: data
.parameters
.iter()
.filter(|p| p.name == "serverTime")
.map(|p| p.clone().value)
.next()
.unwrap()
.parse::<NaiveDateTime>()
.ok(),
logRequestId: data
.parameters
.iter()
.filter(|p| p.name == "logRequestId")
.map(|p| p.clone().value)
.next(),
})
.returning(DepatureRequest::as_returning())
.on_conflict_do_nothing()
.get_result(&mut conn)
.optional()
.unwrap() {
let _ = insert_into(stop_lists)
.values(StopList {
input: i32::from_str(&data.dm.input.input).unwrap(),
point_dm: Some(data.dm.points.point.usage),
point_type: Some(data.dm.points.point.type_field.clone()),
point_name: Some(data.dm.points.point.name),
point_stopId: i32::from_str(&data.dm.points.point.ref_field.id).ok(),
point_stateless: Some(data.dm.points.point.stateless),
point_anytype: Some(data.dm.points.point.any_type),
point_sort: i32::from_str(&data.dm.points.point.sort).ok(),
point_quality: i32::from_str(&data.dm.points.point.quality).ok(),
point_best: i32::from_str(&data.dm.points.point.best).ok(),
point_object: Some(data.dm.points.point.object),
point_ref_id: i32::from_str(&data.dm.points.point.ref_field.id).ok(),
point_ref_gid: Some(data.dm.points.point.ref_field.gid),
point_ref_omc: i32::from_str(&data.dm.points.point.ref_field.omc).ok(),
point_ref_placeID: i32::from_str(&data.dm.points.point.ref_field.place_id).ok(),
point_ref_place: Some(data.dm.points.point.ref_field.place),
point_ref_coords: Some(data.dm.points.point.ref_field.coords),
itdOdvAssignedStops_stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id)
.ok(),
itdOdvAssignedStops_name: Some(data.dm.itd_odv_assigned_stops.name),
itdOdvAssignedStops_x: f64::from_str(&data.dm.itd_odv_assigned_stops.x).ok(),
itdOdvAssignedStops_y: f64::from_str(&data.dm.itd_odv_assigned_stops.y).ok(),
itdOdvAssignedStops_mapName: Some(data.dm.itd_odv_assigned_stops.map_name),
itdOdvAssignedStops_value: Some(data.dm.itd_odv_assigned_stops.value),
itdOdvAssignedStops_place: Some(data.dm.itd_odv_assigned_stops.place),
itdOdvAssignedStops_nameWithPlace: Some(
data.dm.itd_odv_assigned_stops.name_with_place,
),
itdOdvAssignedStops_distanceTime: i32::from_str(
&data.dm.itd_odv_assigned_stops.distance_time,
)
.ok(),
itdOdvAssignedStops_isTransferStop: i32::from_str(
&data.dm.itd_odv_assigned_stops.is_transfer_stop,
)
.ok(),
itdOdvAssignedStops_vm: i32::from_str(&data.dm.itd_odv_assigned_stops.vm).ok(),
itdOdvAssignedStops_gid: Some(data.dm.itd_odv_assigned_stops.gid),
meta_lastModificationDate: Some(Local::now().naive_local()),
})
.returning(StopList::as_returning())
.on_conflict_do_nothing()
.get_result(&mut conn)
.optional();
for line in data.serving_lines.lines {
let id = hex::encode(Sha256::digest(
serde_json::to_string(&line.mode).unwrap().into_bytes(),
));
let _ = insert_into(serving_lines)
.values(ServingLine {
servingLineId: id.clone(),
mode_name: Some(line.mode.name),
mode_number: Some(line.mode.number),
mode_product: Some(line.mode.product),
mode_productId: i32::from_str(&line.mode.product_id).ok(),
mode_type: i32::from_str(&line.mode.type_field).ok(),
mode_code: i32::from_str(&line.mode.code).ok(),
mode_destination: Some(line.mode.destination),
mode_destID: i32::from_str(&line.mode.dest_id).ok(),
mode_desc: Some(line.mode.desc),
mode_timetablePeriod: Some(line.mode.timetable_period),
diva_branch: Some(line.mode.diva.branch),
diva_line: Some(line.mode.diva.line),
diva_supplement: Some(line.mode.diva.supplement),
diva_dir: Some(line.mode.diva.dir),
diva_project: Some(line.mode.diva.project),
diva_network: Some(line.mode.diva.network),
diva_stateless: Some(line.mode.diva.stateless),
diva_tripCode: i32::from_str(&line.mode.diva.trip_code).ok(),
diva_operator: Some(line.mode.diva.operator),
diva_opPublicCode: line.mode.diva.op_public_code,
diva_opCode: Some(line.mode.diva.op_code),
diva_Vf: NaiveDate::parse_from_str(&line.mode.diva.v_f, "%Y%m%d").ok(),
diva_vTo: NaiveDate::parse_from_str(&line.mode.diva.v_to, "%Y%m%d").ok(),
diva_lineDisplay: Some(line.mode.diva.line_display),
meta_lastModificationDate: req.serverTime,
})
.returning(ServingLine::as_returning())
.on_conflict_do_nothing()
.get_result(&mut conn)
.optional()
.unwrap();
let _ = insert_into(serving_line_depature_requests)
.values(ServingLineDepatureRequest {
depatureRequestId: req.depatureRequestId,
servingLineId: id.clone(),
})
.returning(ServingLineDepatureRequest::as_returning())
.on_conflict_do_nothing()
.get_result(&mut conn)
.optional()
.unwrap();
}
if let Some(infos) = data.dm.points.point.infos {
let _ = insert_into(station_hints).values(StationHint {
infoLinkURL: infos.info.info_link_url,
infoLinkText: Some(infos.info.info_link_text),
infoText: Some(infos.info.info_text.content),
});
}
{
let _ = insert_into(arrivals)
.values(&arrival_list.into_iter().map(|arrival| { Arrival {
depatureRequestId: req.depatureRequestId,
stopID: i32::from_str(&arrival.stop_id).ok(),
x: f64::from_str(&arrival.x).ok(),
y: f64::from_str(&arrival.y).ok(),
mapName: Some(arrival.map_name),
area: Some(arrival.area),
platform: Some(arrival.platform),
platformName: Some(arrival.platform_name),
stopName: Some(arrival.stop_name),
nameWO: Some(arrival.name_wo),
pointType: Some(data.dm.points.point.type_field.clone()),
countdown: i32::from_str(&arrival.countdown).ok(),
arrivalTime: NaiveDate::from_ymd_opt(
i32::from_str(&arrival.date_time.year).unwrap(),
u32::from_str(&arrival.date_time.month).unwrap(),
u32::from_str(&arrival.date_time.day).unwrap(),
)
.unwrap()
.and_hms_opt(
u32::from_str(&arrival.date_time.hour).unwrap(),
u32::from_str(&arrival.date_time.minute).unwrap(),
0,
),
realArrivalTime: if let Some(date_time) = arrival.real_date_time {
NaiveDate::from_ymd_opt(
i32::from_str(&date_time.year).unwrap(),
u32::from_str(&date_time.month).unwrap(),
u32::from_str(&date_time.day).unwrap(),
)
.unwrap()
.and_hms_opt(
u32::from_str(&date_time.hour).unwrap(),
u32::from_str(&date_time.minute).unwrap(),
0,
)
} else {
None
},
servingLine_key: Some(arrival.serving_line.key),
servingLine_code: Some(arrival.serving_line.code),
servingLine_number: Some(arrival.serving_line.number),
servingLine_symbol: Some(arrival.serving_line.symbol),
servingLine_motType: i32::from_str(&arrival.serving_line.mot_type).ok(),
servingLine_mtSubcode: i32::from_str(&arrival.serving_line.mt_subcode).ok(),
servingLine_realtime: bool::from_str(&arrival.serving_line.realtime).ok(),
servingLine_direction: Some(arrival.serving_line.direction),
servingLine_directionFrom: Some(arrival.serving_line.direction_from),
servingLine_name: Some(arrival.serving_line.name),
servingLine_delay: if let Some(delay) = arrival.serving_line.delay {
i32::from_str(&delay).ok()
} else {
None
},
servingLine_liErgRiProj_line: Some(arrival.serving_line.li_erg_ri_proj.line),
servingLine_liErgRiProj_project: Some(
arrival.serving_line.li_erg_ri_proj.project,
),
servingLine_liErgRiProj_direction: Some(
arrival.serving_line.li_erg_ri_proj.direction,
),
servingLine_liErgRiProj_supplement: Some(
arrival.serving_line.li_erg_ri_proj.supplement,
),
servingLine_liErgRiProj_network: Some(
arrival.serving_line.li_erg_ri_proj.network,
),
servingLine_destID: Some(arrival.serving_line.dest_id),
servingLine_stateless: Some(arrival.serving_line.stateless),
servingLine_lineDisplay: Some(arrival.serving_line.line_display),
operator_code: arrival.operator.clone().map(|op| op.code),
operator_name: arrival.operator.clone().map(|op| op.name),
operator_publicCode: arrival.operator.map(|op| op.public_code),
attrs_name: None,
attrs_value: None,
}}).collect::<Vec<_>>())
.returning(Arrival::as_returning())
.on_conflict_do_nothing()
.get_results(&mut conn)
.optional();
}
}
});
println!("");
Ok(())
}