throw out rocket and use pipeline instead

theBreadCompany 2025-01-24 22:04:20 +01:00
parent 5e681ee1c8
commit 474670b0af
10 changed files with 134 additions and 1115 deletions

View file

@@ -0,0 +1,23 @@
+name: Willdaten fetching pipeline
+
+on:
+  schedule:
+    - cron: "0 * * * *" # run hourly
+
+jobs:
+  process_data:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v4
+      - name: Set up Rust (if needed)
+        uses: dtolnay/rust-toolchain@stable
+      - name: Run the program
+        run: cargo run --release --bin datafetcher
+        env:
+          DATABASE_URL: ${{ secrets.DATABASE_URL }}
+          REQ: ${{ vars.WILLDATEN_REQ }}
+          VRN_TOKEN: ${{ secrets.WILLDATEN_VRN_TOKEN }}
+          STATIONS: ${{ vars.WILLDATEN_STATIONS }}
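The step-level `env` block is the whole interface between CI and the program: the four variables map one-to-one onto what the new `datafetcher` binary reads at startup (see the rewritten `main.rs` below). A minimal sketch of that contract, assuming plain `std::env`; the `Config` struct and `load_config` helper are illustrative, not part of the commit:

```rust
// Illustrative only: how the pipeline's four variables reach the binary.
// The variable names match the workflow; Config/load_config are hypothetical.
use std::env;

struct Config {
    database_url: String,
    req_template: String, // request URL template containing TEMPLATE_* placeholders
    vrn_token: String,
    stations: Vec<String>, // STATIONS is a comma-separated list of stop ids
}

fn load_config() -> Result<Config, env::VarError> {
    Ok(Config {
        database_url: env::var("DATABASE_URL")?,
        req_template: env::var("REQ")?,
        vrn_token: env::var("VRN_TOKEN")?,
        stations: env::var("STATIONS")?.split(',').map(str::to_string).collect(),
    })
}
```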

Cargo.lock (generated): 1048 lines changed

File diff suppressed because it is too large.

View file

@@ -1 +1 @@
-.env
+/home/bread/Dokumente/willdaten/.env

View file

@@ -5,10 +5,7 @@ edition = "2021"
 
 [dependencies]
 chrono = "0.4.39"
-maud = { version = "0.26.0", features = ["rocket"] }
-reqwest = "0.12.12"
-rocket = "0.5.1"
+reqwest = { version = "0.12.12", features = ["json"] }
 tokio = { version = "1.43.0", features = ["rt-multi-thread", "macros", "time"] }
 dataworker = { path = "../dataworker" }
-serde = { version = "1.0.217", features = ["derive"] }
-serde_json = "1.0.137"
+dotenvy = "0.15.7"

View file

@@ -1,5 +0,0 @@
-use std::time::Duration;
-
-fn main() {
-    let time_window = Duration::from_secs(3600);
-}

View file

@@ -1,98 +1,54 @@
-#[macro_use]
-extern crate rocket;
-use maud::{html, Markup, PreEscaped, DOCTYPE};
-use rocket::response::stream::{Event, EventStream};
-use rocket::tokio::time::{interval, Duration};
-
-struct SsePayload {}
-
-#[get("/sse")]
-async fn sse() -> EventStream![] {
-    EventStream! {
-        let mut interval = interval(Duration::from_secs(2));
-        loop {
-            interval.tick().await;
-            yield Event::data(format!("Updated time: {:?}", chrono::Utc::now()));
-        }
-    }
-}
-
-#[get("/")]
-fn index() -> Markup {
-    html! {
-        (DOCTYPE)
-        html lang="en" {
-            head {
-                meta charset="utf-8";
-                title { "Live Updates with SSE" }
-                style {
-                    "body {
-                        color: white;
-                        display: flex;
-                        flex-direction: column;
-                        align-items: center;
-                        justify-content: center;
-                        height: 100vh;
-                        background: rgb(34,187,195);
-                        background: linear-gradient(0deg, rgba(34,187,195,1) 0%, rgba(102,45,253,1) 100%);
-                        font-family: sans-serif;
-                    }
-                    .glassy {
-                        background: rgba(255, 255, 255, 0.2);
-                        border-radius: 16px;
-                        box-shadow: 0 4px 30px rgba(0, 0, 0, 0.1);
-                        backdrop-filter: blur(2.8px);
-                        -webkit-backdrop-filter: blur(2.8px);
-                        border: 1px solid rgba(255, 255, 255, 0.3);
-                    }
-                    .data-grid {
-                        margin: 2em;
-                        padding: 2em;
-                        display: flex;
-                        flex-direction: column;
-                        align-items: center;
-                        justify-content: center;
-                        border-radius: 18px;
-                    }
-                    .data-cell {
-                        margin: 0.5em 1em;
-                        padding: 1em;
-                        border-radius: 18px;
-                        display: flex;
-                        flex-direction: column;
-                        width: 100%;
-                    }
-                    "
-                }
-                script {
-                    (PreEscaped("const eventSource = new EventSource('/sse');
-                        eventSource.onmessage = (event) => {
-                            document.getElementById('updates').innerText = event.data;
-                        };"))
-                }
-            }
-            body style="" {
-                h1 style="align-items: center;" { "willdaten dashboard" }
-                div class="glassy data-grid" {
-                    div class="glassy data-cell" {
-                        h3 { "Fetcher" }
-                        div id="updates" { "Waiting for updates..." }
-                    }
-                    div class="glassy data-cell" {
-                        h3 { "Database" }
-                        div id="updates" { "Waiting for updates..." }
-                    }
-                    div class="glassy data-cell" {
-                        h3 { "Statistics" }
-                        div id="updates" { "Waiting for updates..." }
-                    }
-                }
-            }
-        }
-    }
-}
-
-#[launch]
-fn rocket() -> _ {
-    rocket::build().mount("/", routes![index, sse])
-}
+use chrono::Local;
+use dataworker::diesel::{Connection, PgConnection};
+use dataworker::dtos::Root;
+use dataworker::{insert_document, WilldatenError};
+use dotenvy::dotenv;
+use std::env;
+use std::io::Write;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    dotenv().ok();
+    let stations: Vec<_> = env::var("STATIONS")?
+        .split(',')
+        .map(|s| s.to_string())
+        .collect();
+    let token = env::var("VRN_TOKEN")?;
+    let req = env::var("REQ")?;
+    let db_url = env::var("DATABASE_URL")?;
+    let mut conn = PgConnection::establish(&db_url)?;
+
+    let total = stations.len();
+    let mut count = 0;
+
+    let client = reqwest::Client::new();
+
+    for station in stations {
+        count += 1;
+        let date = Local::now();
+        let req = req
+            .replace("TEMPLATE_ID", &station)
+            .replace("TEMPLATE_DATE", &date.format("%Y%m%d").to_string())
+            .replace("TEMPLATE_DATE", &date.format("%H%M").to_string());
+        print!("\x1B[2K\rFetching {}/{} ({}) ", count, total, station);
+        std::io::stdout().flush().unwrap();
+        if let Ok(doc) = client
+            .get(req)
+            .header("Authorization", &token)
+            .send()
+            .await?
+            .json::<Root>()
+            .await
+        {
+            let place_name = doc.dm.itd_odv_assigned_stops.name_with_place.clone();
+            match insert_document(doc, &mut conn) {
+                Ok(_) => {}
+                Err(ref e) if e.downcast_ref::<WilldatenError>().is_some() => println!("\x1B[2K\rMissing arrivals for {} ({})!", station, place_name),
+                Err(e) => println!("\x1B[2K\rError inserting document: {}", e),
+            }
+        } else {
+            println!("\x1B[2K\rFailed to fetch station {}", station);
+        }
+    }
+    println!();
+    Ok(())
+}
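One detail of the new fetch loop worth spelling out: `insert_document` returns `Box<dyn Error>`, and the `Err(ref e) if e.downcast_ref::<WilldatenError>().is_some()` guard is what separates the expected missing-arrivals case from a real failure. A self-contained sketch of that downcast pattern; `MissingArrivals` here stands in for the crate's `WilldatenError`:

```rust
// Demonstrates matching a concrete error type out of a Box<dyn Error>.
use std::error::Error;
use std::fmt;

#[derive(Debug)]
struct MissingArrivals; // stand-in for the crate's WilldatenError

impl fmt::Display for MissingArrivals {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Missing arrivals in JSON!")
    }
}

impl Error for MissingArrivals {}

fn insert() -> Result<(), Box<dyn Error>> {
    Err(Box::new(MissingArrivals))
}

fn main() {
    match insert() {
        Ok(_) => {}
        // Matches only when the boxed error is the domain type.
        Err(ref e) if e.downcast_ref::<MissingArrivals>().is_some() => {
            println!("expected case, handled specifically: {}", e);
        }
        Err(e) => println!("any other error: {}", e),
    }
}
```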

View file

@@ -1 +1 @@
-.env
+/home/bread/Dokumente/willdaten/.env

View file

@@ -7,8 +7,8 @@ edition = "2021"
 chrono = "0.4.39"
 diesel = { version = "2.2.6", features = ["64-column-tables", "chrono", "postgres", "postgres_backend", "uuid"] }
 dotenvy = "0.15.7"
-hex = "0.4.3"
 serde = { version = "1.0.217", features = ["derive"] }
 serde_json = "1.0.135"
+thiserror = "2.0.11"
 tokio = { version = "1.43.0", features = ["full"] }
 uuid = { version = "1.12.1", features = ["v5"] }

View file

@@ -1,16 +1,14 @@
 #[macro_use]
-extern crate diesel;
+pub extern crate diesel;
 
 pub mod dtos;
+pub use diesel::{OptionalExtension, dsl::insert_into, Connection, PgConnection, RunQueryDsl, SelectableHelper};
 mod models;
 mod schema;
 
 use chrono::NaiveDate;
 use chrono::{Local, NaiveDateTime};
-use diesel::OptionalExtension;
-use diesel::{dsl::insert_into, Connection, PgConnection, RunQueryDsl, SelectableHelper};
 use dtos::Root;
-use hex;
 use models::{
     Arrival, DepatureRequest, ServingLine, ServingLineDepatureRequest, StationHint, StopList,
 };
@@ -21,11 +19,19 @@ use schema::serving_lines::dsl::serving_lines;
 use schema::station_hints::dsl::station_hints;
 use schema::stop_lists::dsl::stop_lists;
 use serde_json;
-use std::{error::Error, fs, io::Write, str::FromStr};
+use std::{error::Error, str::FromStr};
 use uuid::Uuid;
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum WilldatenError {
+    #[error("Missing arrivals in JSON!")]
+    MissingArrivals,
+}
 
 pub fn insert_document(data: Root, conn: &mut PgConnection) -> Result<(), Box<dyn Error>> {
-    let arrival_list = data.arrival_list.unwrap();
+    let Some(arrival_list) = data.arrival_list else { return Err(Box::new(WilldatenError::MissingArrivals)) };
     if let Some(req) = insert_into(depature_requests)
         .values(DepatureRequest {
             depatureRequestId: Uuid::new_v5(
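The substantive fix in this hunk is the removed `unwrap()`: a response without an arrival list used to panic and kill the whole run, whereas the `let ... else` now returns a typed `WilldatenError` that the fetcher above downcasts and reports. A minimal sketch of the pattern, with an illustrative stand-in for the real `Root` DTO:

```rust
// Requires the thiserror dependency this commit adds (thiserror = "2.0.11").
use thiserror::Error;

#[derive(Debug, Error)]
pub enum WilldatenError {
    #[error("Missing arrivals in JSON!")]
    MissingArrivals,
}

// Stand-in for the crate's real `Root` DTO.
struct Root {
    arrival_list: Option<Vec<String>>,
}

fn insert_document(data: Root) -> Result<(), Box<dyn std::error::Error>> {
    // Before this commit: data.arrival_list.unwrap() panicked on None.
    // Now a missing list becomes a typed, recoverable error.
    let Some(arrival_list) = data.arrival_list else {
        return Err(Box::new(WilldatenError::MissingArrivals));
    };
    println!("{} arrivals to insert", arrival_list.len());
    Ok(())
}
```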

View file

@@ -7,4 +7,4 @@ services:
       POSTGRES_USER: bread
       POSTGRES_DB: bread
     ports:
-      - 5433:5432
+      - 5432:5432
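With the host port moved from 5433 to Postgres's default 5432, a conventional connection string works unchanged for both the pipeline and local runs. A hedged sketch of what `DATABASE_URL` has to look like to match this service; the password is a placeholder, not taken from the commit:

```rust
// Hypothetical DATABASE_URL matching the compose service above:
// user "bread", database "bread", host port 5432. PASSWORD is a placeholder.
use diesel::{Connection, PgConnection};

fn connect() -> diesel::ConnectionResult<PgConnection> {
    PgConnection::establish("postgres://bread:PASSWORD@localhost:5432/bread")
}
```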