spider_pipeline/
sqlite.rs

1//! SQLite output pipeline.
2//!
3//! [`SqlitePipeline`] writes items to a SQLite table and offloads the blocking
4//! database work to a dedicated blocking task.
5
6use crate::pipeline::Pipeline;
7use crate::schema::{
8    SchemaExportConfig, export_schema_for_item, map_item_for_export, sqlite_type_for_field,
9};
10use async_trait::async_trait;
11use log::{debug, error, info, trace};
12use rusqlite::{Connection, params, params_from_iter};
13use serde_json::Value;
14use spider_util::{error::PipelineError, item::ScrapedItem};
15use std::marker::PhantomData;
16use std::path::Path;
17use tokio::sync::{Mutex, mpsc, oneshot};
18
/// Commands sent from the async pipeline to the blocking SQLite worker.
///
/// Every variant carries a `oneshot` sender on which the worker reports that the
/// command has been handled.
enum SqliteCommand {
    /// Create the target table if it does not exist yet, deriving the columns
    /// from the keys and values of `item_value`.
    CreateSchema {
        /// JSON value (expected to be an object) used to derive the table columns.
        item_value: Value,
        /// Receives the outcome of the table creation.
        responder: oneshot::Sender<Result<(), PipelineError>>,
    },
    /// Insert a single row built from `item_value`.
    InsertItem {
        /// JSON value (expected to be an object) whose entries become the row's columns.
        item_value: Value,
        /// Receives the outcome of the insert.
        responder: oneshot::Sender<Result<(), PipelineError>>,
    },
    /// Ask the worker to stop; it acknowledges on the sender and then leaves its loop.
    Shutdown(oneshot::Sender<()>),
}
30
/// A pipeline that writes scraped items to a SQLite database.
/// All database operations are offloaded to a dedicated blocking thread.
pub struct SqlitePipeline<I: ScrapedItem> {
    /// Channel used to hand commands to the blocking SQLite worker.
    command_sender: mpsc::Sender<SqliteCommand>,
    /// Whether the table-creation command has already succeeded. The lock is held
    /// while the first item creates the schema.
    table_created: Mutex<bool>,
    /// Optional export configuration passed to the schema mapping helpers.
    export_config: Option<SchemaExportConfig>,
    /// Ties the pipeline to its item type `I` without storing a value of it.
    _phantom: PhantomData<I>,
}
39
40impl<I: ScrapedItem> SqlitePipeline<I> {
41    /// Creates a new `SqlitePipeline` that writes items to the specified database path and table.
42    pub fn new(
43        db_path: impl AsRef<Path>,
44        table_name: impl Into<String>,
45    ) -> Result<Self, PipelineError> {
46        spider_util::util::validate_output_dir(&db_path)
47            .map_err(|e: spider_util::error::SpiderError| PipelineError::Other(e.to_string()))?;
48        let path_buf = db_path.as_ref().to_path_buf();
49        let table_name_str = table_name.into();
50        info!(
51            "Initializing SqlitePipeline for DB: {:?}, Table: {}",
52            path_buf, table_name_str
53        );
54
55        let (command_sender, mut command_receiver) = mpsc::channel::<SqliteCommand>(100);
56        let db_path_clone = path_buf.clone();
57        let table_name_clone = table_name_str.clone();
58
59        tokio::task::spawn_blocking(move || {
60            let conn_result = Connection::open(&db_path_clone);
61            let mut conn = match conn_result {
62                Ok(c) => {
63                    debug!("Successfully opened SQLite DB: {:?}", db_path_clone);
64                    c
65                }
66                Err(e) => {
67                    error!("Error opening SQLite DB {:?}: {}", db_path_clone, e);
68                    return;
69                }
70            };
71
72            info!("SQLite blocking task started for DB: {:?}", db_path_clone);
73
74            while let Some(command) = command_receiver.blocking_recv() {
75                match command {
76                    SqliteCommand::CreateSchema {
77                        item_value,
78                        responder,
79                    } => {
80                        trace!("Processing CreateSchema command");
81                        let result = create_table_if_not_exists_sync(
82                            &mut conn,
83                            &table_name_clone,
84                            &item_value,
85                        );
86                        if let Err(e) = responder.send(result) {
87                            error!("Failed to send CreateSchema response: {:?}", e);
88                        } else {
89                            trace!("CreateSchema command completed successfully");
90                        }
91                    }
92                    SqliteCommand::InsertItem {
93                        item_value,
94                        responder,
95                    } => {
96                        trace!("Processing InsertItem command");
97                        let result = insert_item_sync(&mut conn, &table_name_clone, &item_value);
98                        if let Err(e) = responder.send(result) {
99                            error!("Failed to send InsertItem response: {:?}", e);
100                        } else {
101                            trace!("InsertItem command completed successfully");
102                        }
103                    }
104                    SqliteCommand::Shutdown(responder) => {
105                        debug!("SQLite blocking task received shutdown command.");
106                        let _ = responder.send(());
107                        break;
108                    }
109                }
110            }
111
112            if let Err(e) = conn.close() {
113                error!(
114                    "Error closing SQLite connection for {:?}: {:?}",
115                    db_path_clone, e
116                );
117            } else {
118                debug!(
119                    "SQLite connection closed successfully for {:?}",
120                    db_path_clone
121                );
122            }
123            info!("SQLite blocking task for DB: {:?} finished.", db_path_clone);
124        });
125
126        Ok(SqlitePipeline {
127            command_sender,
128            table_created: Mutex::new(false),
129            export_config: None,
130            _phantom: PhantomData,
131        })
132    }
133
134    /// Applies typed export mapping before rows are written.
135    pub fn with_schema_export_config(mut self, config: SchemaExportConfig) -> Self {
136        self.export_config = Some(config);
137        self
138    }
139}
140
141// Synchronous helper function to create table
142fn create_table_if_not_exists_sync(
143    conn: &mut Connection,
144    table_name: &str,
145    item_value: &Value,
146) -> Result<(), PipelineError> {
147    if let Some(map) = item_value.as_object() {
148        let mut columns = Vec::new();
149        for (key, value) in map {
150            let sqlite_type = match value {
151                Value::Null => "TEXT",
152                Value::String(value)
153                    if matches!(value.as_str(), "INTEGER" | "REAL" | "TEXT" | "BLOB") =>
154                {
155                    value.as_str()
156                }
157                Value::Bool(_) => "INTEGER",
158                Value::Number(n) => {
159                    if n.is_f64() {
160                        "REAL"
161                    } else {
162                        "INTEGER"
163                    }
164                }
165                Value::String(_) => "TEXT",
166                Value::Array(_) | Value::Object(_) => "TEXT",
167            };
168            columns.push(format!("\"{}\" {}", key, sqlite_type));
169        }
170        if columns.is_empty() {
171            return Err(PipelineError::ItemError(
172                "Cannot create table from empty item.".to_string(),
173            ));
174        }
175
176        let schema_sql = format!(
177            "CREATE TABLE IF NOT EXISTS \"{}\" ({})",
178            table_name,
179            columns.join(", ")
180        );
181        debug!(
182            "Creating table '{}' with schema: {}",
183            table_name, schema_sql
184        );
185        conn.execute(&schema_sql, params![])?;
186        debug!("Table '{}' created successfully", table_name);
187        Ok(())
188    } else {
189        Err(PipelineError::ItemError(
190            "Item must be a JSON object to infer table schema.".to_string(),
191        ))
192    }
193}
194
195fn schema_fields_to_value(fields: &[spider_util::item::ItemFieldSchema]) -> Value {
196    let mut map = serde_json::Map::new();
197    for field in fields {
198        map.insert(
199            field.name.clone(),
200            Value::String(sqlite_type_for_field(field).to_string()),
201        );
202    }
203    Value::Object(map)
204}
205
206// Synchronous helper function to insert item
207fn insert_item_sync(
208    conn: &mut Connection,
209    table_name: &str,
210    item_value: &Value,
211) -> Result<(), PipelineError> {
212    if let Some(map) = item_value.as_object() {
213        let keys: Vec<&String> = map.keys().collect();
214        let quoted_keys: Vec<String> = keys.iter().map(|k| format!("\"{}\"", k)).collect();
215        let placeholders: Vec<String> = (0..keys.len()).map(|_| "?".to_string()).collect();
216
217        let insert_sql = format!(
218            "INSERT INTO \"{}\" ({}) VALUES ({})",
219            table_name,
220            quoted_keys.join(", "),
221            placeholders.join(", ")
222        );
223        trace!("Preparing SQL statement: {}", insert_sql);
224
225        let mut stmt = conn.prepare(&insert_sql)?;
226
227        let mut rusqlite_params_vec: Vec<rusqlite::types::Value> = Vec::new();
228        for key in keys.iter() {
229            let v = map.get(*key).ok_or_else(|| {
230                PipelineError::ItemError(format!("Missing key '{}' in item map.", key))
231            })?;
232            match v {
233                Value::Null => rusqlite_params_vec.push(rusqlite::types::Value::Null),
234                Value::Bool(b) => rusqlite_params_vec
235                    .push(rusqlite::types::Value::Integer(if *b { 1 } else { 0 })),
236                Value::Number(n) => {
237                    if n.is_f64() {
238                        let float_val = n.as_f64().ok_or_else(|| {
239                            PipelineError::ItemError(format!(
240                                "Invalid floating-point value for key '{}'.",
241                                key
242                            ))
243                        })?;
244                        rusqlite_params_vec.push(rusqlite::types::Value::Real(float_val));
245                    } else {
246                        let int_val = n.as_i64().ok_or_else(|| {
247                            PipelineError::ItemError(format!(
248                                "Invalid integer value for key '{}'.",
249                                key
250                            ))
251                        })?;
252                        rusqlite_params_vec.push(rusqlite::types::Value::Integer(int_val));
253                    }
254                }
255                Value::String(s) => {
256                    rusqlite_params_vec.push(rusqlite::types::Value::Text(s.clone()))
257                }
258                Value::Array(_) | Value::Object(_) => rusqlite_params_vec
259                    .push(rusqlite::types::Value::Text(serde_json::to_string(v)?)),
260            }
261        }
262
263        trace!(
264            "Executing insert statement with {} parameters",
265            rusqlite_params_vec.len()
266        );
267        stmt.execute(params_from_iter(rusqlite_params_vec))?;
268        trace!("Item inserted successfully into table '{}'", table_name);
269        Ok(())
270    } else {
271        Err(PipelineError::ItemError(
272            "Item for insertion must be a JSON object.".to_string(),
273        ))
274    }
275}
276
277#[async_trait]
278impl<I: ScrapedItem> Pipeline<I> for SqlitePipeline<I> {
279    fn name(&self) -> &str {
280        "SqlitePipeline"
281    }
282
283    async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError> {
284        trace!("SqlitePipeline processing item");
285        let item_value = map_item_for_export(&item, self.export_config.as_ref());
286        let typed_schema = export_schema_for_item(&item, self.export_config.as_ref());
287
288        // Check if table created, and create if not
289        let mut table_created_lock = self.table_created.lock().await;
290        if !*table_created_lock {
291            debug!("Creating table schema for first item");
292            let (tx, rx) = oneshot::channel();
293            self.command_sender
294                .send(SqliteCommand::CreateSchema {
295                    item_value: typed_schema
296                        .as_ref()
297                        .map(|fields| schema_fields_to_value(fields))
298                        .unwrap_or_else(|| item_value.clone()),
299                    responder: tx,
300                })
301                .await
302                .map_err(|e| {
303                    PipelineError::Other(format!("Failed to send CreateSchema command: {}", e))
304                })?;
305            rx.await.map_err(|e| {
306                PipelineError::Other(format!("Failed to receive CreateSchema response: {}", e))
307            })??;
308            *table_created_lock = true;
309            debug!("Table schema created successfully");
310        }
311        drop(table_created_lock);
312
313        // Send insert item command
314        trace!("Sending insert item command to SQLite worker");
315        let (tx, rx) = oneshot::channel();
316        self.command_sender
317            .send(SqliteCommand::InsertItem {
318                item_value,
319                responder: tx,
320            })
321            .await
322            .map_err(|e| {
323                PipelineError::Other(format!("Failed to send InsertItem command: {}", e))
324            })?;
325        rx.await.map_err(|e| {
326            PipelineError::Other(format!("Failed to receive InsertItem response: {}", e))
327        })??;
328        trace!("Item inserted successfully");
329
330        Ok(Some(item))
331    }
332
333    async fn close(&self) -> Result<(), PipelineError> {
334        debug!("Initiating SqlitePipeline shutdown.");
335        let (tx, rx) = oneshot::channel();
336        self.command_sender
337            .send(SqliteCommand::Shutdown(tx))
338            .await
339            .map_err(|e| PipelineError::Other(format!("Failed to send Shutdown command: {}", e)))?;
340        rx.await.map_err(|e| {
341            PipelineError::Other(format!("Failed to receive shutdown response: {}", e))
342        })?;
343        info!("SqlitePipeline closed successfully.");
344        Ok(())
345    }
346}