use flate2::Compression; use flate2::read::GzDecoder; use flate2::write::GzEncoder; use rusqlite::Connection; use rusqlite::OptionalExtension; use sha2::{Digest, Sha256}; use std::io::{Read, Write}; use super::error::Result; use super::time::now_ms; pub struct StorePayloadOptions<'a> { pub project_id: Option, pub resource_type: &'a str, pub gitlab_id: &'a str, pub json_bytes: &'a [u8], pub compress: bool, } pub fn store_payload(conn: &Connection, options: StorePayloadOptions) -> Result { let json_bytes = options.json_bytes; let mut hasher = Sha256::new(); hasher.update(json_bytes); let payload_hash = format!("{:x}", hasher.finalize()); let existing: Option = conn .query_row( "SELECT id FROM raw_payloads WHERE project_id IS ? AND resource_type = ? AND gitlab_id = ? AND payload_hash = ?", ( options.project_id, options.resource_type, options.gitlab_id, &payload_hash, ), |row| row.get(0), ) .optional()?; if let Some(id) = existing { return Ok(id); } let (encoding, payload_bytes): (&str, std::borrow::Cow<'_, [u8]>) = if options.compress { let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); encoder.write_all(json_bytes)?; ("gzip", std::borrow::Cow::Owned(encoder.finish()?)) } else { ("identity", std::borrow::Cow::Borrowed(json_bytes)) }; conn.execute( "INSERT INTO raw_payloads (source, project_id, resource_type, gitlab_id, fetched_at, content_encoding, payload_hash, payload) VALUES ('gitlab', ?, ?, ?, ?, ?, ?, ?)", ( options.project_id, options.resource_type, options.gitlab_id, now_ms(), encoding, &payload_hash, payload_bytes.as_ref(), ), )?; Ok(conn.last_insert_rowid()) } pub fn read_payload(conn: &Connection, id: i64) -> Result> { let row: Option<(String, Vec)> = conn .query_row( "SELECT content_encoding, payload FROM raw_payloads WHERE id = ?", [id], |row| Ok((row.get(0)?, row.get(1)?)), ) .optional()?; let Some((encoding, payload_bytes)) = row else { return Ok(None); }; let json_bytes = if encoding == "gzip" { let mut decoder = GzDecoder::new(&payload_bytes[..]); let mut decompressed = Vec::new(); decoder.read_to_end(&mut decompressed)?; decompressed } else { payload_bytes }; let value: serde_json::Value = serde_json::from_slice(&json_bytes)?; Ok(Some(value)) } #[cfg(test)] #[path = "payloads_tests.rs"] mod tests;