Compare commits

...

2 commits

20 changed files with 200 additions and 1223 deletions

6
.dockerignore Normal file
View file

@ -0,0 +1,6 @@
target
.gitignore
README.md
Dockerfile
docker-compose.yml
migrations

View file

@ -0,0 +1,23 @@
# Scheduled GitHub Actions workflow that builds and runs the `datafetcher`
# binary of this workspace once per hour.
name: Willdaten fetching pipeline
on:
  schedule:
    - cron: "0 * * * *" # run hourly
jobs:
  process_data:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
      - name: Set up Rust (if needed)
        uses: dtolnay/rust-toolchain@stable
      - name: Run the program
        run: cargo run --release --bin datafetcher
        # The fetcher reads DATABASE_URL, REQ, VRN_TOKEN and STATIONS from its
        # environment; STATIONS is split on ','.
        # NOTE(review): the nesting of `env` is assumed to be step-level
        # (scoped to this step only) - confirm against the committed file.
        env:
          DATABASE_URL: ${{ secrets.DATABASE_URL }}
          REQ: ${{ vars.WILLDATEN_REQ }}
          VRN_TOKEN: ${{ secrets.WILLDATEN_VRN_TOKEN }}
          STATIONS: ${{ vars.WILLDATEN_STATIONS }}

1
.gitignore vendored
View file

@ -1 +1,2 @@
/target /target
migrations

1128
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,2 +1,3 @@
[workspace] [workspace]
members = [ "datafetcher","dataworker"] members = [ "datafetcher","dataworker"]
resolver = "2"

8
Dockerfile Normal file
View file

@ -0,0 +1,8 @@
# Image that compiles the workspace's `datafetcher` binary at build time and
# runs it when the container starts.
FROM rust:latest
LABEL authors="thebread <contact@thebread.dev>"
WORKDIR /app
# Copies the build context into /app; entries listed in .dockerignore are left out.
COPY . .
# Build only the binary this image runs, using the same release profile the
# CI workflow runs (`cargo run --release --bin datafetcher`).
RUN cargo build --release --bin datafetcher
# Execute the prebuilt binary directly so container start does not go through
# cargo again. The working directory stays /app.
CMD ["/app/target/release/datafetcher"]

1
datafetcher/.env Symbolic link
View file

@ -0,0 +1 @@
/home/bread/Dokumente/willdaten/.env

View file

@ -5,10 +5,7 @@ edition = "2021"
[dependencies] [dependencies]
chrono = "0.4.39" chrono = "0.4.39"
maud = { version = "0.26.0", features = ["rocket"] } reqwest = { version = "0.12.12", features = ["json"] }
reqwest = "0.12.12"
rocket = "0.5.1"
tokio = { version = "1.43.0", features = ["rt-multi-thread", "macros", "time"] } tokio = { version = "1.43.0", features = ["rt-multi-thread", "macros", "time"] }
dataworker = { path = "../dataworker" } dataworker = { path = "../dataworker" }
serde = { version = "1.0.217", features = ["derive"] } dotenvy = "0.15.7"
serde_json = "1.0.137"

View file

@ -1,5 +0,0 @@
use std::time::Duration;
/// Entry point of this placeholder binary.
///
/// It only constructs a one-hour `Duration`; the value is not used afterwards.
fn main() {
    const SECONDS_PER_HOUR: u64 = 60 * 60;
    let window = Duration::from_secs(SECONDS_PER_HOUR);
}

View file

@ -1,100 +1,54 @@
#[macro_use] use chrono::Local;
extern crate rocket; use dataworker::diesel::{Connection, PgConnection};
use maud::{html, Markup, PreEscaped, DOCTYPE}; use dataworker::dtos::Root;
use rocket::response::stream::{Event, EventStream}; use dataworker::{insert_document, WilldatenError};
use rocket::tokio::time::{interval, Duration}; use dotenvy::dotenv;
use std::env;
use std::io::Write;
struct SsePayload { #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv().ok();
let stations: Vec<_> = env::var("STATIONS")?
.split(',')
.map(|s| s.to_string())
.collect();
let token = env::var("VRN_TOKEN")?;
let req = env::var("REQ")?;
let db_url = env::var("DATABASE_URL")?;
let mut conn = PgConnection::establish(&db_url)?;
} let total = stations.len();
let mut count = 0;
#[get("/sse")] let client = reqwest::Client::new();
async fn sse() -> EventStream![] {
EventStream! {
let mut interval = interval(Duration::from_secs(2));
loop {
interval.tick().await;
yield Event::data(format!("Updated time: {:?}", chrono::Utc::now()));
}
}
}
#[get("/")] for station in stations {
fn index() -> Markup { count += 1;
html! { let date = Local::now();
(DOCTYPE) let req = req
html lang="en" { .replace("TEMPLATE_ID", &station)
head { .replace("TEMPLATE_DATE", &date.format("%Y%m%d").to_string())
meta charset="utf-8"; .replace("TEMPLATE_DATE", &date.format("%H%M").to_string());
title { "Live Updates with SSE" } print!("\x1B[2K\rFetching {}/{} ({}) ", count, total, station);
style { std::io::stdout().flush().unwrap();
"body { if let Ok(doc) = client
color: white; .get(req)
display: flex; .header("Authorization", &token)
flex-direction: column; .send()
align-items: center; .await?
justify-content: center; .json::<Root>()
height: 100vh; .await {
background: rgb(34,187,195); let place_name = doc.dm.itd_odv_assigned_stops.name_with_place.clone();
background: linear-gradient(0deg, rgba(34,187,195,1) 0%, rgba(102,45,253,1) 100%); match insert_document(doc,
font-family: sans-serif; &mut conn,
} ) {
.glassy { Ok(_) => {}
background: rgba(255, 255, 255, 0.2); Err(ref e) if e.downcast_ref::<WilldatenError>().is_some() => println!("\x1B[2K\rMissing arrivals for {} ({})!", station, place_name),
border-radius: 16px; Err(e) => println!("\x1B[2K\rError inserting document: {}", e),
box-shadow: 0 4px 30px rgba(0, 0, 0, 0.1);
backdrop-filter: blur(2.8px);
-webkit-backdrop-filter: blur(2.8px);
border: 1px solid rgba(255, 255, 255, 0.3);
}
.data-grid {
margin: 2em;
padding: 2em;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
border-radius: 18px;
}
.data-cell {
margin: 0.5em 1em;
padding: 1em;
border-radius: 18px;
display: flex;
flex-direction: column;
width: 100%;
}
"
}
script {
(PreEscaped("const eventSource = new EventSource('/sse');
eventSource.onmessage = (event) => {
document.getElementById('updates').innerText = event.data;
};"))
}
} }
body style="" { } else { println!("\x1B[2K\rFailed to fetch station {}", station); }
h1 style="align-items: center;" { "willdaten dashboard" }
div class="glassy data-grid" {
div class="glassy data-cell" {
h3 { "Fetcher" }
div id="updates" { "Waiting for updates..." }
}
div class="glassy data-cell" {
h3 { "Database" }
div id="updates" { "Waiting for updates..." }
}
div class="glassy data-cell" {
h3 { "Statistics" }
div id="updates" { "Waiting for updates..." }
}
}
}
}
} }
} println!();
Ok(())
#[launch]
fn rocket() -> _ {
rocket::build().mount("/", routes![index, sse])
} }

1
dataworker/.env Symbolic link
View file

@ -0,0 +1 @@
/home/bread/Dokumente/willdaten/.env

View file

@ -5,13 +5,10 @@ edition = "2021"
[dependencies] [dependencies]
chrono = "0.4.39" chrono = "0.4.39"
diesel = { version = "2.2.6", features = ["64-column-tables", "chrono", "postgres", "postgres_backend"] } diesel = { version = "2.2.6", features = ["64-column-tables", "chrono", "postgres", "postgres_backend", "uuid"] }
dotenvy = "0.15.7" dotenvy = "0.15.7"
hex = "0.4.3"
maud = "0.26.0"
reqwest = { version = "0.12.12", features = ["json"] }
serde = { version = "1.0.217", features = ["derive"] } serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.135" serde_json = "1.0.135"
sha2 = "0.10.8" thiserror = "2.0.11"
tokio = { version = "1.43.0", features = ["full"] } tokio = { version = "1.43.0", features = ["full"] }
uuid = "1.12.0" uuid = { version = "1.12.1", features = ["v5"] }

View file

@ -32,7 +32,7 @@ CREATE TABLE "stop_lists" (
); );
CREATE TABLE "serving_lines" ( CREATE TABLE "serving_lines" (
"servingLineId" varchar(255) PRIMARY KEY, "servingLineId" uuid PRIMARY KEY,
"mode_name" varchar(255), "mode_name" varchar(255),
"mode_number" varchar(50), "mode_number" varchar(50),
"mode_product" varchar(255), "mode_product" varchar(255),
@ -61,7 +61,7 @@ CREATE TABLE "serving_lines" (
); );
CREATE TABLE "arrivals" ( CREATE TABLE "arrivals" (
"depatureRequestId" serial, "depatureRequestId" uuid,
"stopID" int, "stopID" int,
"x" float, "x" float,
"y" float, "y" float,
@ -103,15 +103,15 @@ CREATE TABLE "arrivals" (
); );
CREATE TABLE "depature_requests" ( CREATE TABLE "depature_requests" (
"depatureRequestId" serial PRIMARY KEY, "depatureRequestId" uuid PRIMARY KEY,
"stopid" int unique, "stopid" int,
"serverid" varchar(255), "serverid" varchar(255),
"requestid" varchar(255), "requestid" varchar(255),
"sessionid" varchar(255), "sessionid" varchar(255),
"calcTime" float, "calcTime" float,
"serverTime" timestamp, "serverTime" timestamp,
"logRequestId" varchar(255), "logRequestId" varchar(255),
unique ("depatureRequestId", "stopid") unique ("depatureRequestId")
); );
CREATE TABLE "station_hints" ( CREATE TABLE "station_hints" (
@ -122,18 +122,18 @@ CREATE TABLE "station_hints" (
CREATE TABLE "depature_requeststation_hints" ( CREATE TABLE "depature_requeststation_hints" (
"infoLinkUrl" varchar(255) PRIMARY KEY, "infoLinkUrl" varchar(255) PRIMARY KEY,
"depatureRequestId" int "depatureRequestId" uuid
); );
CREATE TABLE "serving_line_depature_requests" ( CREATE TABLE "serving_line_depature_requests" (
"depatureRequestId" int PRIMARY KEY, "depatureRequestId" uuid PRIMARY KEY,
"servingLineId" varchar(255) "servingLineId" uuid
); );
ALTER TABLE "arrivals" ADD FOREIGN KEY ("depatureRequestId") REFERENCES "depature_requests" ("depatureRequestId"); ALTER TABLE "arrivals" ADD FOREIGN KEY ("depatureRequestId") REFERENCES "depature_requests" ("depatureRequestId");
--ALTER TABLE "depature_requests" ADD FOREIGN KEY ("stopid") REFERENCES "stop_lists" ("input"); --ALTER TABLE "depature_requests" ADD FOREIGN KEY ("stopid") REFERENCES "stop_lists" ("input");
ALTER TABLE "stop_lists" ADD FOREIGN KEY ("input") REFERENCES "depature_requests" ("stopid"); --ALTER TABLE "stop_lists" ADD FOREIGN KEY ("input") REFERENCES "depature_requests" ("stopid");
ALTER TABLE "depature_requeststation_hints" ADD FOREIGN KEY ("infoLinkUrl") REFERENCES "station_hints" ("infoLinkURL"); ALTER TABLE "depature_requeststation_hints" ADD FOREIGN KEY ("infoLinkUrl") REFERENCES "station_hints" ("infoLinkURL");

View file

@ -6,4 +6,4 @@ file = "src/schema.rs"
custom_type_derives = ["diesel::query_builder::QueryId", "Clone"] custom_type_derives = ["diesel::query_builder::QueryId", "Clone"]
[migrations_directory] [migrations_directory]
dir = "/home/bread/Dokumente/willdaten/migrations" dir = "migrations"

View file

@ -279,7 +279,7 @@ pub struct Attr {
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct DepartureList { pub struct DepartureList {
#[serde(rename = "stopID")] #[serde(rename = "stopID")]
pub stop_id: i32, pub stop_id: String,
pub x: String, pub x: String,
pub y: String, pub y: String,
pub map_name: String, pub map_name: String,

View file

@ -1,19 +1,16 @@
#[macro_use] #[macro_use]
extern crate diesel; pub extern crate diesel;
pub mod dtos; pub mod dtos;
pub use diesel::{OptionalExtension, dsl::insert_into, Connection, PgConnection, RunQueryDsl, SelectableHelper};
mod models; mod models;
mod schema; mod schema;
use chrono::NaiveDate; use chrono::NaiveDate;
use chrono::{Local, NaiveDateTime}; use chrono::{Local, NaiveDateTime};
use diesel::OptionalExtension;
use diesel::{dsl::insert_into, Connection, PgConnection, RunQueryDsl, SelectableHelper};
use dtos::Root; use dtos::Root;
use hex;
use models::{ use models::{
Arrival, DepatureRequest, NewDepatureRequest, ServingLine, ServingLineDepatureRequest, Arrival, DepatureRequest, ServingLine, ServingLineDepatureRequest, StationHint, StopList,
StationHint, StopList,
}; };
use schema::arrivals::dsl::arrivals; use schema::arrivals::dsl::arrivals;
use schema::depature_requests::dsl::depature_requests; use schema::depature_requests::dsl::depature_requests;
@ -22,14 +19,25 @@ use schema::serving_lines::dsl::serving_lines;
use schema::station_hints::dsl::station_hints; use schema::station_hints::dsl::station_hints;
use schema::stop_lists::dsl::stop_lists; use schema::stop_lists::dsl::stop_lists;
use serde_json; use serde_json;
use sha2::Digest; use std::{error::Error, str::FromStr};
use sha2::Sha256; use uuid::Uuid;
use std::{env, error::Error, fs, io::Write, str::FromStr}; use thiserror::Error;
#[derive(Debug, Error)]
pub enum WilldatenError {
#[error("Missing arrivals in JSON!")]
MissingArrivals,
}
pub fn insert_document(data: Root, conn: &mut PgConnection) -> Result<(), Box<dyn Error>> { pub fn insert_document(data: Root, conn: &mut PgConnection) -> Result<(), Box<dyn Error>> {
let arrival_list = data.arrival_list.unwrap(); let Some(arrival_list) = data.arrival_list else { return Err(Box::new(WilldatenError::MissingArrivals)) };
if let Some(req) = insert_into(depature_requests) if let Some(req) = insert_into(depature_requests)
.values(NewDepatureRequest { .values(DepatureRequest {
depatureRequestId: Uuid::new_v5(
&Uuid::NAMESPACE_DNS,
serde_json::to_string(&data.parameters)?.as_bytes(),
),
stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id).ok(), stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id).ok(),
serverid: data serverid: data
.parameters .parameters
@ -130,9 +138,10 @@ pub fn insert_document(data: Root, conn: &mut PgConnection) -> Result<(), Box<dy
.get_result(conn) .get_result(conn)
.optional(); .optional();
for line in data.serving_lines.lines { for line in data.serving_lines.lines {
let id = hex::encode(Sha256::digest( let id = Uuid::new_v5(
serde_json::to_string(&line.mode).unwrap().into_bytes(), &Uuid::NAMESPACE_DNS,
)); serde_json::to_string(&line)?.as_bytes(),
);
let _ = insert_into(serving_lines) let _ = insert_into(serving_lines)
.values(ServingLine { .values(ServingLine {
servingLineId: id.clone(), servingLineId: id.clone(),

View file

@ -14,7 +14,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
dotenv().ok(); dotenv().ok();
let url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); let url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let mut conn = let mut conn =
PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url)); PgConnection::establish(&url).unwrap_or_else(|e| panic!("Error connecting to {}: {}", url, e));
let src = "/home/bread/Downloads/2025-01-22(1)/2025-01-22"; let src = "/home/bread/Downloads/2025-01-22(1)/2025-01-22";
let total = fs::read_dir(src).unwrap().count(); let total = fs::read_dir(src).unwrap().count();

View file

@ -8,12 +8,14 @@ use crate::schema::*;
use chrono::NaiveDate; use chrono::NaiveDate;
use chrono::NaiveDateTime; use chrono::NaiveDateTime;
use diesel::pg::Pg; use diesel::pg::Pg;
use uuid::Uuid;
#[derive(Queryable, Selectable, Insertable, Debug, Identifiable, QueryableByName)] #[derive(Queryable, Selectable, Insertable, Debug, Identifiable, QueryableByName)]
#[diesel(table_name = arrivals)] #[diesel(table_name = arrivals)]
#[diesel(check_for_backend(Pg))] #[diesel(check_for_backend(Pg))]
#[diesel(primary_key(stopID, servingLine_key, servingLine_stateless))] #[diesel(primary_key(stopID, servingLine_key, servingLine_stateless))]
pub struct Arrival { pub struct Arrival {
pub depatureRequestId: i32, pub depatureRequestId: Uuid,
pub stopID: Option<i32>, pub stopID: Option<i32>,
pub x: Option<f64>, pub x: Option<f64>,
pub y: Option<f64>, pub y: Option<f64>,
@ -58,7 +60,7 @@ pub struct Arrival {
#[diesel(check_for_backend(Pg))] #[diesel(check_for_backend(Pg))]
#[diesel(primary_key(depatureRequestId))] #[diesel(primary_key(depatureRequestId))]
pub struct DepatureRequest { pub struct DepatureRequest {
pub depatureRequestId: i32, pub depatureRequestId: Uuid,
pub stopid: Option<i32>, pub stopid: Option<i32>,
pub serverid: Option<String>, pub serverid: Option<String>,
pub requestid: Option<String>, pub requestid: Option<String>,
@ -86,22 +88,22 @@ pub struct NewDepatureRequest {
#[diesel(primary_key(infoLinkUrl))] #[diesel(primary_key(infoLinkUrl))]
pub struct DepatureRequeststationHint { pub struct DepatureRequeststationHint {
pub infoLinkUrl: String, pub infoLinkUrl: String,
pub depatureRequestId: Option<i32>, pub depatureRequestId: Option<Uuid>,
} }
#[derive(Queryable, Selectable, Insertable, Debug, Identifiable, QueryableByName)] #[derive(Queryable, Selectable, Insertable, Debug, Identifiable, QueryableByName)]
#[diesel(table_name = serving_line_depature_requests)] #[diesel(table_name = serving_line_depature_requests)]
#[diesel(primary_key(depatureRequestId))] #[diesel(primary_key(depatureRequestId))]
pub struct ServingLineDepatureRequest { pub struct ServingLineDepatureRequest {
pub depatureRequestId: i32, pub depatureRequestId: Uuid,
pub servingLineId: String, pub servingLineId: Uuid,
} }
#[derive(Queryable, Selectable, Insertable, Debug, Identifiable, QueryableByName)] #[derive(Queryable, Selectable, Insertable, Debug, Identifiable, QueryableByName)]
#[diesel(table_name = serving_lines)] #[diesel(table_name = serving_lines)]
#[diesel(primary_key(servingLineId))] #[diesel(primary_key(servingLineId))]
pub struct ServingLine { pub struct ServingLine {
pub servingLineId: String, pub servingLineId: Uuid,
pub mode_name: Option<String>, pub mode_name: Option<String>,
pub mode_number: Option<String>, pub mode_number: Option<String>,
pub mode_product: Option<String>, pub mode_product: Option<String>,

View file

@ -2,7 +2,7 @@
diesel::table! { diesel::table! {
arrivals (stopID, servingLine_key, servingLine_stateless) { arrivals (stopID, servingLine_key, servingLine_stateless) {
depatureRequestId -> Int4, depatureRequestId -> Uuid,
stopID -> Nullable<Int4>, stopID -> Nullable<Int4>,
x -> Nullable<Float8>, x -> Nullable<Float8>,
y -> Nullable<Float8>, y -> Nullable<Float8>,
@ -72,7 +72,7 @@ diesel::table! {
diesel::table! { diesel::table! {
depature_requests (depatureRequestId) { depature_requests (depatureRequestId) {
depatureRequestId -> Int4, depatureRequestId -> Uuid,
stopid -> Nullable<Int4>, stopid -> Nullable<Int4>,
#[max_length = 255] #[max_length = 255]
serverid -> Nullable<Varchar>, serverid -> Nullable<Varchar>,
@ -91,22 +91,20 @@ diesel::table! {
depature_requeststation_hints (infoLinkUrl) { depature_requeststation_hints (infoLinkUrl) {
#[max_length = 255] #[max_length = 255]
infoLinkUrl -> Varchar, infoLinkUrl -> Varchar,
depatureRequestId -> Nullable<Int4>, depatureRequestId -> Nullable<Uuid>,
} }
} }
diesel::table! { diesel::table! {
serving_line_depature_requests (depatureRequestId) { serving_line_depature_requests (depatureRequestId) {
depatureRequestId -> Int4, depatureRequestId -> Uuid,
#[max_length = 255] servingLineId -> Uuid,
servingLineId -> VarChar,
} }
} }
diesel::table! { diesel::table! {
serving_lines (servingLineId) { serving_lines (servingLineId) {
#[max_length = 255] servingLineId -> Uuid,
servingLineId -> VarChar,
#[max_length = 255] #[max_length = 255]
mode_name -> Nullable<Varchar>, mode_name -> Nullable<Varchar>,
#[max_length = 50] #[max_length = 50]

View file

@ -1,10 +1,10 @@
services: services:
db: db:
image: postgres image: postgres
restart: always restart: always
environment: environment:
POSTGRES_HOST_AUTH_METHOD: trust POSTGRES_HOST_AUTH_METHOD: trust
POSTGRES_USER: postgres POSTGRES_USER: bread
POSTGRES_DB: bread
ports: ports:
- 5432:5432 - 5432:5432