Compare commits
2 commits
44d5a3b57a
...
474670b0af
Author | SHA1 | Date | |
---|---|---|---|
474670b0af | |||
5e681ee1c8 |
20 changed files with 200 additions and 1223 deletions
6
.dockerignore
Normal file
6
.dockerignore
Normal file
|
@ -0,0 +1,6 @@
|
|||
target
|
||||
.gitignore
|
||||
README.md
|
||||
Dockerfile
|
||||
docker-compose.yml
|
||||
migrations
|
23
.forgejo/workflows/pipeline.yml
Normal file
23
.forgejo/workflows/pipeline.yml
Normal file
|
@ -0,0 +1,23 @@
|
|||
name: Willdaten fetching pipeline
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "0 * * * *" # run hourly
|
||||
|
||||
jobs:
|
||||
process_data:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Rust (if needed)
|
||||
uses: dtolnay/rust-toolchain@stable
|
||||
|
||||
- name: Run the program
|
||||
run: cargo run --release --bin datafetcher
|
||||
env:
|
||||
DATABASE_URL: ${{ secrets.DATABASE_URL }}
|
||||
REQ: ${{ vars.WILLDATEN_REQ }}
|
||||
VRN_TOKEN: ${{ secrets.WILLDATEN_VRN_TOKEN }}
|
||||
STATIONS: ${{ vars.WILLDATEN_STATIONS }}
|
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -1 +1,2 @@
|
|||
/target
|
||||
migrations
|
1128
Cargo.lock
generated
1128
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -1,2 +1,3 @@
|
|||
[workspace]
|
||||
members = [ "datafetcher","dataworker"]
|
||||
resolver = "2"
|
8
Dockerfile
Normal file
8
Dockerfile
Normal file
|
@ -0,0 +1,8 @@
|
|||
FROM rust:latest
|
||||
LABEL authors="thebread <contact@thebread.dev>"
|
||||
|
||||
WORKDIR /app
|
||||
COPY . .
|
||||
RUN cargo build
|
||||
|
||||
CMD ["cargo", "run", "--bin", "datafetcher"]
|
1
datafetcher/.env
Symbolic link
1
datafetcher/.env
Symbolic link
|
@ -0,0 +1 @@
|
|||
/home/bread/Dokumente/willdaten/.env
|
|
@ -5,10 +5,7 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
chrono = "0.4.39"
|
||||
maud = { version = "0.26.0", features = ["rocket"] }
|
||||
reqwest = "0.12.12"
|
||||
rocket = "0.5.1"
|
||||
reqwest = { version = "0.12.12", features = ["json"] }
|
||||
tokio = { version = "1.43.0", features = ["rt-multi-thread", "macros", "time"] }
|
||||
dataworker = { path = "../dataworker" }
|
||||
serde = { version = "1.0.217", features = ["derive"] }
|
||||
serde_json = "1.0.137"
|
||||
dotenvy = "0.15.7"
|
|
@ -1,5 +0,0 @@
|
|||
use std::time::Duration;
|
||||
|
||||
fn main() {
|
||||
let time_window = Duration::from_secs(3600);
|
||||
}
|
|
@ -1,100 +1,54 @@
|
|||
#[macro_use]
|
||||
extern crate rocket;
|
||||
use maud::{html, Markup, PreEscaped, DOCTYPE};
|
||||
use rocket::response::stream::{Event, EventStream};
|
||||
use rocket::tokio::time::{interval, Duration};
|
||||
use chrono::Local;
|
||||
use dataworker::diesel::{Connection, PgConnection};
|
||||
use dataworker::dtos::Root;
|
||||
use dataworker::{insert_document, WilldatenError};
|
||||
use dotenvy::dotenv;
|
||||
use std::env;
|
||||
use std::io::Write;
|
||||
|
||||
struct SsePayload {
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
dotenv().ok();
|
||||
let stations: Vec<_> = env::var("STATIONS")?
|
||||
.split(',')
|
||||
.map(|s| s.to_string())
|
||||
.collect();
|
||||
let token = env::var("VRN_TOKEN")?;
|
||||
let req = env::var("REQ")?;
|
||||
let db_url = env::var("DATABASE_URL")?;
|
||||
let mut conn = PgConnection::establish(&db_url)?;
|
||||
|
||||
}
|
||||
let total = stations.len();
|
||||
let mut count = 0;
|
||||
|
||||
#[get("/sse")]
|
||||
async fn sse() -> EventStream![] {
|
||||
EventStream! {
|
||||
let mut interval = interval(Duration::from_secs(2));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
yield Event::data(format!("Updated time: {:?}", chrono::Utc::now()));
|
||||
}
|
||||
}
|
||||
}
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
#[get("/")]
|
||||
fn index() -> Markup {
|
||||
html! {
|
||||
(DOCTYPE)
|
||||
html lang="en" {
|
||||
head {
|
||||
meta charset="utf-8";
|
||||
title { "Live Updates with SSE" }
|
||||
style {
|
||||
"body {
|
||||
color: white;
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
height: 100vh;
|
||||
background: rgb(34,187,195);
|
||||
background: linear-gradient(0deg, rgba(34,187,195,1) 0%, rgba(102,45,253,1) 100%);
|
||||
font-family: sans-serif;
|
||||
}
|
||||
.glassy {
|
||||
background: rgba(255, 255, 255, 0.2);
|
||||
border-radius: 16px;
|
||||
box-shadow: 0 4px 30px rgba(0, 0, 0, 0.1);
|
||||
backdrop-filter: blur(2.8px);
|
||||
-webkit-backdrop-filter: blur(2.8px);
|
||||
border: 1px solid rgba(255, 255, 255, 0.3);
|
||||
}
|
||||
.data-grid {
|
||||
margin: 2em;
|
||||
padding: 2em;
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
border-radius: 18px;
|
||||
}
|
||||
.data-cell {
|
||||
margin: 0.5em 1em;
|
||||
padding: 1em;
|
||||
border-radius: 18px;
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
width: 100%;
|
||||
}
|
||||
"
|
||||
}
|
||||
script {
|
||||
(PreEscaped("const eventSource = new EventSource('/sse');
|
||||
eventSource.onmessage = (event) => {
|
||||
document.getElementById('updates').innerText = event.data;
|
||||
};"))
|
||||
}
|
||||
for station in stations {
|
||||
count += 1;
|
||||
let date = Local::now();
|
||||
let req = req
|
||||
.replace("TEMPLATE_ID", &station)
|
||||
.replace("TEMPLATE_DATE", &date.format("%Y%m%d").to_string())
|
||||
.replace("TEMPLATE_DATE", &date.format("%H%M").to_string());
|
||||
print!("\x1B[2K\rFetching {}/{} ({}) ", count, total, station);
|
||||
std::io::stdout().flush().unwrap();
|
||||
if let Ok(doc) = client
|
||||
.get(req)
|
||||
.header("Authorization", &token)
|
||||
.send()
|
||||
.await?
|
||||
.json::<Root>()
|
||||
.await {
|
||||
let place_name = doc.dm.itd_odv_assigned_stops.name_with_place.clone();
|
||||
match insert_document(doc,
|
||||
&mut conn,
|
||||
) {
|
||||
Ok(_) => {}
|
||||
Err(ref e) if e.downcast_ref::<WilldatenError>().is_some() => println!("\x1B[2K\rMissing arrivals for {} ({})!", station, place_name),
|
||||
Err(e) => println!("\x1B[2K\rError inserting document: {}", e),
|
||||
}
|
||||
body style="" {
|
||||
h1 style="align-items: center;" { "willdaten dashboard" }
|
||||
div class="glassy data-grid" {
|
||||
div class="glassy data-cell" {
|
||||
h3 { "Fetcher" }
|
||||
div id="updates" { "Waiting for updates..." }
|
||||
}
|
||||
div class="glassy data-cell" {
|
||||
h3 { "Database" }
|
||||
div id="updates" { "Waiting for updates..." }
|
||||
}
|
||||
div class="glassy data-cell" {
|
||||
h3 { "Statistics" }
|
||||
div id="updates" { "Waiting for updates..." }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else { println!("\x1B[2K\rFailed to fetch station {}", station); }
|
||||
}
|
||||
}
|
||||
|
||||
#[launch]
|
||||
fn rocket() -> _ {
|
||||
rocket::build().mount("/", routes![index, sse])
|
||||
println!();
|
||||
Ok(())
|
||||
}
|
||||
|
|
1
dataworker/.env
Symbolic link
1
dataworker/.env
Symbolic link
|
@ -0,0 +1 @@
|
|||
/home/bread/Dokumente/willdaten/.env
|
|
@ -5,13 +5,10 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
chrono = "0.4.39"
|
||||
diesel = { version = "2.2.6", features = ["64-column-tables", "chrono", "postgres", "postgres_backend"] }
|
||||
diesel = { version = "2.2.6", features = ["64-column-tables", "chrono", "postgres", "postgres_backend", "uuid"] }
|
||||
dotenvy = "0.15.7"
|
||||
hex = "0.4.3"
|
||||
maud = "0.26.0"
|
||||
reqwest = { version = "0.12.12", features = ["json"] }
|
||||
serde = { version = "1.0.217", features = ["derive"] }
|
||||
serde_json = "1.0.135"
|
||||
sha2 = "0.10.8"
|
||||
thiserror = "2.0.11"
|
||||
tokio = { version = "1.43.0", features = ["full"] }
|
||||
uuid = "1.12.0"
|
||||
uuid = { version = "1.12.1", features = ["v5"] }
|
||||
|
|
|
@ -32,7 +32,7 @@ CREATE TABLE "stop_lists" (
|
|||
);
|
||||
|
||||
CREATE TABLE "serving_lines" (
|
||||
"servingLineId" varchar(255) PRIMARY KEY,
|
||||
"servingLineId" uuid PRIMARY KEY,
|
||||
"mode_name" varchar(255),
|
||||
"mode_number" varchar(50),
|
||||
"mode_product" varchar(255),
|
||||
|
@ -61,7 +61,7 @@ CREATE TABLE "serving_lines" (
|
|||
);
|
||||
|
||||
CREATE TABLE "arrivals" (
|
||||
"depatureRequestId" serial,
|
||||
"depatureRequestId" uuid,
|
||||
"stopID" int,
|
||||
"x" float,
|
||||
"y" float,
|
||||
|
@ -103,15 +103,15 @@ CREATE TABLE "arrivals" (
|
|||
);
|
||||
|
||||
CREATE TABLE "depature_requests" (
|
||||
"depatureRequestId" serial PRIMARY KEY,
|
||||
"stopid" int unique,
|
||||
"depatureRequestId" uuid PRIMARY KEY,
|
||||
"stopid" int,
|
||||
"serverid" varchar(255),
|
||||
"requestid" varchar(255),
|
||||
"sessionid" varchar(255),
|
||||
"calcTime" float,
|
||||
"serverTime" timestamp,
|
||||
"logRequestId" varchar(255),
|
||||
unique ("depatureRequestId", "stopid")
|
||||
unique ("depatureRequestId")
|
||||
);
|
||||
|
||||
CREATE TABLE "station_hints" (
|
||||
|
@ -122,18 +122,18 @@ CREATE TABLE "station_hints" (
|
|||
|
||||
CREATE TABLE "depature_requeststation_hints" (
|
||||
"infoLinkUrl" varchar(255) PRIMARY KEY,
|
||||
"depatureRequestId" int
|
||||
"depatureRequestId" uuid
|
||||
);
|
||||
|
||||
CREATE TABLE "serving_line_depature_requests" (
|
||||
"depatureRequestId" int PRIMARY KEY,
|
||||
"servingLineId" varchar(255)
|
||||
"depatureRequestId" uuid PRIMARY KEY,
|
||||
"servingLineId" uuid
|
||||
);
|
||||
|
||||
ALTER TABLE "arrivals" ADD FOREIGN KEY ("depatureRequestId") REFERENCES "depature_requests" ("depatureRequestId");
|
||||
|
||||
--ALTER TABLE "depature_requests" ADD FOREIGN KEY ("stopid") REFERENCES "stop_lists" ("input");
|
||||
ALTER TABLE "stop_lists" ADD FOREIGN KEY ("input") REFERENCES "depature_requests" ("stopid");
|
||||
--ALTER TABLE "stop_lists" ADD FOREIGN KEY ("input") REFERENCES "depature_requests" ("stopid");
|
||||
|
||||
ALTER TABLE "depature_requeststation_hints" ADD FOREIGN KEY ("infoLinkUrl") REFERENCES "station_hints" ("infoLinkURL");
|
||||
|
||||
|
|
|
@ -6,4 +6,4 @@ file = "src/schema.rs"
|
|||
custom_type_derives = ["diesel::query_builder::QueryId", "Clone"]
|
||||
|
||||
[migrations_directory]
|
||||
dir = "/home/bread/Dokumente/willdaten/migrations"
|
||||
dir = "migrations"
|
||||
|
|
|
@ -279,7 +279,7 @@ pub struct Attr {
|
|||
#[serde(rename_all = "camelCase")]
|
||||
pub struct DepartureList {
|
||||
#[serde(rename = "stopID")]
|
||||
pub stop_id: i32,
|
||||
pub stop_id: String,
|
||||
pub x: String,
|
||||
pub y: String,
|
||||
pub map_name: String,
|
||||
|
|
|
@ -1,19 +1,16 @@
|
|||
#[macro_use]
|
||||
extern crate diesel;
|
||||
pub extern crate diesel;
|
||||
|
||||
pub mod dtos;
|
||||
pub use diesel::{OptionalExtension, dsl::insert_into, Connection, PgConnection, RunQueryDsl, SelectableHelper};
|
||||
mod models;
|
||||
mod schema;
|
||||
|
||||
use chrono::NaiveDate;
|
||||
use chrono::{Local, NaiveDateTime};
|
||||
use diesel::OptionalExtension;
|
||||
use diesel::{dsl::insert_into, Connection, PgConnection, RunQueryDsl, SelectableHelper};
|
||||
use dtos::Root;
|
||||
use hex;
|
||||
use models::{
|
||||
Arrival, DepatureRequest, NewDepatureRequest, ServingLine, ServingLineDepatureRequest,
|
||||
StationHint, StopList,
|
||||
Arrival, DepatureRequest, ServingLine, ServingLineDepatureRequest, StationHint, StopList,
|
||||
};
|
||||
use schema::arrivals::dsl::arrivals;
|
||||
use schema::depature_requests::dsl::depature_requests;
|
||||
|
@ -22,14 +19,25 @@ use schema::serving_lines::dsl::serving_lines;
|
|||
use schema::station_hints::dsl::station_hints;
|
||||
use schema::stop_lists::dsl::stop_lists;
|
||||
use serde_json;
|
||||
use sha2::Digest;
|
||||
use sha2::Sha256;
|
||||
use std::{env, error::Error, fs, io::Write, str::FromStr};
|
||||
use std::{error::Error, str::FromStr};
|
||||
use uuid::Uuid;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum WilldatenError {
|
||||
#[error("Missing arrivals in JSON!")]
|
||||
MissingArrivals,
|
||||
}
|
||||
|
||||
|
||||
pub fn insert_document(data: Root, conn: &mut PgConnection) -> Result<(), Box<dyn Error>> {
|
||||
let arrival_list = data.arrival_list.unwrap();
|
||||
let Some(arrival_list) = data.arrival_list else { return Err(Box::new(WilldatenError::MissingArrivals)) };
|
||||
if let Some(req) = insert_into(depature_requests)
|
||||
.values(NewDepatureRequest {
|
||||
.values(DepatureRequest {
|
||||
depatureRequestId: Uuid::new_v5(
|
||||
&Uuid::NAMESPACE_DNS,
|
||||
serde_json::to_string(&data.parameters)?.as_bytes(),
|
||||
),
|
||||
stopid: i32::from_str(&data.dm.itd_odv_assigned_stops.stop_id).ok(),
|
||||
serverid: data
|
||||
.parameters
|
||||
|
@ -130,9 +138,10 @@ pub fn insert_document(data: Root, conn: &mut PgConnection) -> Result<(), Box<dy
|
|||
.get_result(conn)
|
||||
.optional();
|
||||
for line in data.serving_lines.lines {
|
||||
let id = hex::encode(Sha256::digest(
|
||||
serde_json::to_string(&line.mode).unwrap().into_bytes(),
|
||||
));
|
||||
let id = Uuid::new_v5(
|
||||
&Uuid::NAMESPACE_DNS,
|
||||
serde_json::to_string(&line)?.as_bytes(),
|
||||
);
|
||||
let _ = insert_into(serving_lines)
|
||||
.values(ServingLine {
|
||||
servingLineId: id.clone(),
|
||||
|
|
|
@ -14,7 +14,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||
dotenv().ok();
|
||||
let url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
|
||||
let mut conn =
|
||||
PgConnection::establish(&url).unwrap_or_else(|_| panic!("Error connecting to {}", url));
|
||||
PgConnection::establish(&url).unwrap_or_else(|e| panic!("Error connecting to {}: {}", url, e));
|
||||
|
||||
let src = "/home/bread/Downloads/2025-01-22(1)/2025-01-22";
|
||||
let total = fs::read_dir(src).unwrap().count();
|
||||
|
|
|
@ -8,12 +8,14 @@ use crate::schema::*;
|
|||
use chrono::NaiveDate;
|
||||
use chrono::NaiveDateTime;
|
||||
use diesel::pg::Pg;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Queryable, Selectable, Insertable, Debug, Identifiable, QueryableByName)]
|
||||
#[diesel(table_name = arrivals)]
|
||||
#[diesel(check_for_backend(Pg))]
|
||||
#[diesel(primary_key(stopID, servingLine_key, servingLine_stateless))]
|
||||
pub struct Arrival {
|
||||
pub depatureRequestId: i32,
|
||||
pub depatureRequestId: Uuid,
|
||||
pub stopID: Option<i32>,
|
||||
pub x: Option<f64>,
|
||||
pub y: Option<f64>,
|
||||
|
@ -58,7 +60,7 @@ pub struct Arrival {
|
|||
#[diesel(check_for_backend(Pg))]
|
||||
#[diesel(primary_key(depatureRequestId))]
|
||||
pub struct DepatureRequest {
|
||||
pub depatureRequestId: i32,
|
||||
pub depatureRequestId: Uuid,
|
||||
pub stopid: Option<i32>,
|
||||
pub serverid: Option<String>,
|
||||
pub requestid: Option<String>,
|
||||
|
@ -86,22 +88,22 @@ pub struct NewDepatureRequest {
|
|||
#[diesel(primary_key(infoLinkUrl))]
|
||||
pub struct DepatureRequeststationHint {
|
||||
pub infoLinkUrl: String,
|
||||
pub depatureRequestId: Option<i32>,
|
||||
pub depatureRequestId: Option<Uuid>,
|
||||
}
|
||||
|
||||
#[derive(Queryable, Selectable, Insertable, Debug, Identifiable, QueryableByName)]
|
||||
#[diesel(table_name = serving_line_depature_requests)]
|
||||
#[diesel(primary_key(depatureRequestId))]
|
||||
pub struct ServingLineDepatureRequest {
|
||||
pub depatureRequestId: i32,
|
||||
pub servingLineId: String,
|
||||
pub depatureRequestId: Uuid,
|
||||
pub servingLineId: Uuid,
|
||||
}
|
||||
|
||||
#[derive(Queryable, Selectable, Insertable, Debug, Identifiable, QueryableByName)]
|
||||
#[diesel(table_name = serving_lines)]
|
||||
#[diesel(primary_key(servingLineId))]
|
||||
pub struct ServingLine {
|
||||
pub servingLineId: String,
|
||||
pub servingLineId: Uuid,
|
||||
pub mode_name: Option<String>,
|
||||
pub mode_number: Option<String>,
|
||||
pub mode_product: Option<String>,
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
diesel::table! {
|
||||
arrivals (stopID, servingLine_key, servingLine_stateless) {
|
||||
depatureRequestId -> Int4,
|
||||
depatureRequestId -> Uuid,
|
||||
stopID -> Nullable<Int4>,
|
||||
x -> Nullable<Float8>,
|
||||
y -> Nullable<Float8>,
|
||||
|
@ -72,7 +72,7 @@ diesel::table! {
|
|||
|
||||
diesel::table! {
|
||||
depature_requests (depatureRequestId) {
|
||||
depatureRequestId -> Int4,
|
||||
depatureRequestId -> Uuid,
|
||||
stopid -> Nullable<Int4>,
|
||||
#[max_length = 255]
|
||||
serverid -> Nullable<Varchar>,
|
||||
|
@ -91,22 +91,20 @@ diesel::table! {
|
|||
depature_requeststation_hints (infoLinkUrl) {
|
||||
#[max_length = 255]
|
||||
infoLinkUrl -> Varchar,
|
||||
depatureRequestId -> Nullable<Int4>,
|
||||
depatureRequestId -> Nullable<Uuid>,
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
serving_line_depature_requests (depatureRequestId) {
|
||||
depatureRequestId -> Int4,
|
||||
#[max_length = 255]
|
||||
servingLineId -> VarChar,
|
||||
depatureRequestId -> Uuid,
|
||||
servingLineId -> Uuid,
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
serving_lines (servingLineId) {
|
||||
#[max_length = 255]
|
||||
servingLineId -> VarChar,
|
||||
servingLineId -> Uuid,
|
||||
#[max_length = 255]
|
||||
mode_name -> Nullable<Varchar>,
|
||||
#[max_length = 50]
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
services:
|
||||
|
||||
db:
|
||||
image: postgres
|
||||
restart: always
|
||||
environment:
|
||||
POSTGRES_HOST_AUTH_METHOD: trust
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_USER: bread
|
||||
POSTGRES_DB: bread
|
||||
ports:
|
||||
- 5432:5432
|
||||
|
|
Loading…
Add table
Reference in a new issue