1use crate::pipeline::Pipeline;
7use crate::schema::{
8 SchemaExportConfig, export_schema_for_item, map_item_for_export, sqlite_type_for_field,
9};
10use async_trait::async_trait;
11use log::{debug, error, info, trace};
12use rusqlite::{Connection, params, params_from_iter};
13use serde_json::Value;
14use spider_util::{error::PipelineError, item::ScrapedItem};
15use std::marker::PhantomData;
16use std::path::Path;
17use tokio::sync::{Mutex, mpsc, oneshot};
18
19enum SqliteCommand {
20 CreateSchema {
21 item_value: Value,
22 responder: oneshot::Sender<Result<(), PipelineError>>,
23 },
24 InsertItem {
25 item_value: Value,
26 responder: oneshot::Sender<Result<(), PipelineError>>,
27 },
28 Shutdown(oneshot::Sender<()>),
29}
30
31pub struct SqlitePipeline<I: ScrapedItem> {
34 command_sender: mpsc::Sender<SqliteCommand>,
35 table_created: Mutex<bool>,
36 export_config: Option<SchemaExportConfig>,
37 _phantom: PhantomData<I>,
38}
39
40impl<I: ScrapedItem> SqlitePipeline<I> {
41 pub fn new(
43 db_path: impl AsRef<Path>,
44 table_name: impl Into<String>,
45 ) -> Result<Self, PipelineError> {
46 spider_util::util::validate_output_dir(&db_path)
47 .map_err(|e: spider_util::error::SpiderError| PipelineError::Other(e.to_string()))?;
48 let path_buf = db_path.as_ref().to_path_buf();
49 let table_name_str = table_name.into();
50 info!(
51 "Initializing SqlitePipeline for DB: {:?}, Table: {}",
52 path_buf, table_name_str
53 );
54
55 let (command_sender, mut command_receiver) = mpsc::channel::<SqliteCommand>(100);
56 let db_path_clone = path_buf.clone();
57 let table_name_clone = table_name_str.clone();
58
59 tokio::task::spawn_blocking(move || {
60 let conn_result = Connection::open(&db_path_clone);
61 let mut conn = match conn_result {
62 Ok(c) => {
63 debug!("Successfully opened SQLite DB: {:?}", db_path_clone);
64 c
65 }
66 Err(e) => {
67 error!("Error opening SQLite DB {:?}: {}", db_path_clone, e);
68 return;
69 }
70 };
71
72 info!("SQLite blocking task started for DB: {:?}", db_path_clone);
73
74 while let Some(command) = command_receiver.blocking_recv() {
75 match command {
76 SqliteCommand::CreateSchema {
77 item_value,
78 responder,
79 } => {
80 trace!("Processing CreateSchema command");
81 let result = create_table_if_not_exists_sync(
82 &mut conn,
83 &table_name_clone,
84 &item_value,
85 );
86 if let Err(e) = responder.send(result) {
87 error!("Failed to send CreateSchema response: {:?}", e);
88 } else {
89 trace!("CreateSchema command completed successfully");
90 }
91 }
92 SqliteCommand::InsertItem {
93 item_value,
94 responder,
95 } => {
96 trace!("Processing InsertItem command");
97 let result = insert_item_sync(&mut conn, &table_name_clone, &item_value);
98 if let Err(e) = responder.send(result) {
99 error!("Failed to send InsertItem response: {:?}", e);
100 } else {
101 trace!("InsertItem command completed successfully");
102 }
103 }
104 SqliteCommand::Shutdown(responder) => {
105 debug!("SQLite blocking task received shutdown command.");
106 let _ = responder.send(());
107 break;
108 }
109 }
110 }
111
112 if let Err(e) = conn.close() {
113 error!(
114 "Error closing SQLite connection for {:?}: {:?}",
115 db_path_clone, e
116 );
117 } else {
118 debug!(
119 "SQLite connection closed successfully for {:?}",
120 db_path_clone
121 );
122 }
123 info!("SQLite blocking task for DB: {:?} finished.", db_path_clone);
124 });
125
126 Ok(SqlitePipeline {
127 command_sender,
128 table_created: Mutex::new(false),
129 export_config: None,
130 _phantom: PhantomData,
131 })
132 }
133
134 pub fn with_schema_export_config(mut self, config: SchemaExportConfig) -> Self {
136 self.export_config = Some(config);
137 self
138 }
139}
140
141fn create_table_if_not_exists_sync(
143 conn: &mut Connection,
144 table_name: &str,
145 item_value: &Value,
146) -> Result<(), PipelineError> {
147 if let Some(map) = item_value.as_object() {
148 let mut columns = Vec::new();
149 for (key, value) in map {
150 let sqlite_type = match value {
151 Value::Null => "TEXT",
152 Value::String(value)
153 if matches!(value.as_str(), "INTEGER" | "REAL" | "TEXT" | "BLOB") =>
154 {
155 value.as_str()
156 }
157 Value::Bool(_) => "INTEGER",
158 Value::Number(n) => {
159 if n.is_f64() {
160 "REAL"
161 } else {
162 "INTEGER"
163 }
164 }
165 Value::String(_) => "TEXT",
166 Value::Array(_) | Value::Object(_) => "TEXT",
167 };
168 columns.push(format!("\"{}\" {}", key, sqlite_type));
169 }
170 if columns.is_empty() {
171 return Err(PipelineError::ItemError(
172 "Cannot create table from empty item.".to_string(),
173 ));
174 }
175
176 let schema_sql = format!(
177 "CREATE TABLE IF NOT EXISTS \"{}\" ({})",
178 table_name,
179 columns.join(", ")
180 );
181 debug!(
182 "Creating table '{}' with schema: {}",
183 table_name, schema_sql
184 );
185 conn.execute(&schema_sql, params![])?;
186 debug!("Table '{}' created successfully", table_name);
187 Ok(())
188 } else {
189 Err(PipelineError::ItemError(
190 "Item must be a JSON object to infer table schema.".to_string(),
191 ))
192 }
193}
194
195fn schema_fields_to_value(fields: &[spider_util::item::ItemFieldSchema]) -> Value {
196 let mut map = serde_json::Map::new();
197 for field in fields {
198 map.insert(
199 field.name.clone(),
200 Value::String(sqlite_type_for_field(field).to_string()),
201 );
202 }
203 Value::Object(map)
204}
205
206fn insert_item_sync(
208 conn: &mut Connection,
209 table_name: &str,
210 item_value: &Value,
211) -> Result<(), PipelineError> {
212 if let Some(map) = item_value.as_object() {
213 let keys: Vec<&String> = map.keys().collect();
214 let quoted_keys: Vec<String> = keys.iter().map(|k| format!("\"{}\"", k)).collect();
215 let placeholders: Vec<String> = (0..keys.len()).map(|_| "?".to_string()).collect();
216
217 let insert_sql = format!(
218 "INSERT INTO \"{}\" ({}) VALUES ({})",
219 table_name,
220 quoted_keys.join(", "),
221 placeholders.join(", ")
222 );
223 trace!("Preparing SQL statement: {}", insert_sql);
224
225 let mut stmt = conn.prepare(&insert_sql)?;
226
227 let mut rusqlite_params_vec: Vec<rusqlite::types::Value> = Vec::new();
228 for key in keys.iter() {
229 let v = map.get(*key).ok_or_else(|| {
230 PipelineError::ItemError(format!("Missing key '{}' in item map.", key))
231 })?;
232 match v {
233 Value::Null => rusqlite_params_vec.push(rusqlite::types::Value::Null),
234 Value::Bool(b) => rusqlite_params_vec
235 .push(rusqlite::types::Value::Integer(if *b { 1 } else { 0 })),
236 Value::Number(n) => {
237 if n.is_f64() {
238 let float_val = n.as_f64().ok_or_else(|| {
239 PipelineError::ItemError(format!(
240 "Invalid floating-point value for key '{}'.",
241 key
242 ))
243 })?;
244 rusqlite_params_vec.push(rusqlite::types::Value::Real(float_val));
245 } else {
246 let int_val = n.as_i64().ok_or_else(|| {
247 PipelineError::ItemError(format!(
248 "Invalid integer value for key '{}'.",
249 key
250 ))
251 })?;
252 rusqlite_params_vec.push(rusqlite::types::Value::Integer(int_val));
253 }
254 }
255 Value::String(s) => {
256 rusqlite_params_vec.push(rusqlite::types::Value::Text(s.clone()))
257 }
258 Value::Array(_) | Value::Object(_) => rusqlite_params_vec
259 .push(rusqlite::types::Value::Text(serde_json::to_string(v)?)),
260 }
261 }
262
263 trace!(
264 "Executing insert statement with {} parameters",
265 rusqlite_params_vec.len()
266 );
267 stmt.execute(params_from_iter(rusqlite_params_vec))?;
268 trace!("Item inserted successfully into table '{}'", table_name);
269 Ok(())
270 } else {
271 Err(PipelineError::ItemError(
272 "Item for insertion must be a JSON object.".to_string(),
273 ))
274 }
275}
276
277#[async_trait]
278impl<I: ScrapedItem> Pipeline<I> for SqlitePipeline<I> {
279 fn name(&self) -> &str {
280 "SqlitePipeline"
281 }
282
283 async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError> {
284 trace!("SqlitePipeline processing item");
285 let item_value = map_item_for_export(&item, self.export_config.as_ref());
286 let typed_schema = export_schema_for_item(&item, self.export_config.as_ref());
287
288 let mut table_created_lock = self.table_created.lock().await;
290 if !*table_created_lock {
291 debug!("Creating table schema for first item");
292 let (tx, rx) = oneshot::channel();
293 self.command_sender
294 .send(SqliteCommand::CreateSchema {
295 item_value: typed_schema
296 .as_ref()
297 .map(|fields| schema_fields_to_value(fields))
298 .unwrap_or_else(|| item_value.clone()),
299 responder: tx,
300 })
301 .await
302 .map_err(|e| {
303 PipelineError::Other(format!("Failed to send CreateSchema command: {}", e))
304 })?;
305 rx.await.map_err(|e| {
306 PipelineError::Other(format!("Failed to receive CreateSchema response: {}", e))
307 })??;
308 *table_created_lock = true;
309 debug!("Table schema created successfully");
310 }
311 drop(table_created_lock);
312
313 trace!("Sending insert item command to SQLite worker");
315 let (tx, rx) = oneshot::channel();
316 self.command_sender
317 .send(SqliteCommand::InsertItem {
318 item_value,
319 responder: tx,
320 })
321 .await
322 .map_err(|e| {
323 PipelineError::Other(format!("Failed to send InsertItem command: {}", e))
324 })?;
325 rx.await.map_err(|e| {
326 PipelineError::Other(format!("Failed to receive InsertItem response: {}", e))
327 })??;
328 trace!("Item inserted successfully");
329
330 Ok(Some(item))
331 }
332
333 async fn close(&self) -> Result<(), PipelineError> {
334 debug!("Initiating SqlitePipeline shutdown.");
335 let (tx, rx) = oneshot::channel();
336 self.command_sender
337 .send(SqliteCommand::Shutdown(tx))
338 .await
339 .map_err(|e| PipelineError::Other(format!("Failed to send Shutdown command: {}", e)))?;
340 rx.await.map_err(|e| {
341 PipelineError::Other(format!("Failed to receive shutdown response: {}", e))
342 })?;
343 info!("SqlitePipeline closed successfully.");
344 Ok(())
345 }
346}