spider_pipeline/
csv.rs

1//! CSV output pipeline.
2//!
3//! [`CsvPipeline`] writes items to a CSV file, inferring headers from the first
4//! item and serializing nested values into JSON strings when needed.
5
6use crate::pipeline::Pipeline;
7use crate::schema::{SchemaExportConfig, map_item_for_export};
8use async_trait::async_trait;
9use csv::Writer;
10use kanal::bounded_async;
11use log::{debug, error, info};
12use serde::{Deserialize, Serialize};
13use serde_json::Value;
14use spider_util::error::PipelineError;
15use spider_util::item::ScrapedItem;
16use std::fs::{File, OpenOptions};
17use std::marker::PhantomData;
18use std::path::Path;
19
/// Serializable snapshot of the writer's progress.
///
/// Built from the live writer state when a `GetState` command is handled and
/// deserialized again when a `RestoreState` command is handled.
#[derive(Serialize, Deserialize)]
struct CsvState {
    /// Column names, in the order they are written to each row.
    headers: Vec<String>,
    /// Whether the header row still has to be written before the next record.
    /// Falls back to `false` when the field is absent from the restored JSON.
    #[serde(default)]
    write_header_on_next_write: bool,
}
26
/// In-memory state held by the background task once the output file is open.
struct CsvWriterState {
    /// CSV writer wrapping the output file opened in append mode.
    writer: Writer<File>,
    /// Column names, in the order they are written to each row.
    headers: Vec<String>,
    /// Whether the header row still has to be written before the next record.
    write_header_on_next_write: bool,
}
32
/// Messages sent from [`CsvPipeline`] to its background writer task.
///
/// Every variant carries a sender on which the task reports the outcome.
enum CsvCommand {
    /// Append one item to the CSV file.
    Write {
        /// The item, already converted to a JSON value.
        item_value: Value,
        /// Receives the result of the write.
        responder: kanal::AsyncSender<Result<(), PipelineError>>,
    },
    /// Request a serialized snapshot of the writer state; `None` is sent back
    /// when no writer has been opened yet.
    GetState(kanal::AsyncSender<Result<Option<Value>, PipelineError>>),
    /// Replace the writer state with a previously captured snapshot.
    RestoreState {
        /// Snapshot in the JSON form produced for `GetState`.
        state: Value,
        /// Receives the result of the restore.
        responder: kanal::AsyncSender<Result<(), PipelineError>>,
    },
    /// Flush the writer and stop the task; `()` is sent back afterwards.
    Shutdown(kanal::AsyncSender<()>),
}
45
/// A pipeline that exports scraped items to a CSV file.
///
/// Headers are the sorted keys of the first item processed (or the headers of
/// a restored state). For every later item, only those columns are written:
/// a key missing from an item yields an empty field, and keys that are not
/// among the headers are not written.
///
/// All file access happens in a background task; this handle only sends
/// commands to it.
pub struct CsvPipeline<I> {
    /// Channel used to send commands to the background writer task.
    command_sender: kanal::AsyncSender<CsvCommand>,
    /// Optional export configuration passed to `map_item_for_export` for each item.
    export_config: Option<SchemaExportConfig>,
    /// Ties the pipeline to its item type without storing an item.
    _phantom: PhantomData<I>,
}
53
54impl<I: ScrapedItem> CsvPipeline<I> {
55    const COMMAND_CHANNEL_CAPACITY: usize = 4096;
56    const FLUSH_EVERY_WRITES: usize = 250;
57
58    /// Creates a new `CsvPipeline`.
59    ///
60    /// # Errors
61    ///
62    /// Returns an error when the output directory cannot be created.
63    pub fn new(file_path: impl AsRef<Path>) -> Result<Self, PipelineError> {
64        spider_util::util::validate_output_dir(&file_path)
65            .map_err(|e: spider_util::error::SpiderError| PipelineError::Other(e.to_string()))?;
66        let path_buf = file_path.as_ref().to_path_buf();
67        info!("Initializing CsvPipeline for file: {:?}", path_buf);
68
69        let (command_sender, command_receiver) =
70            bounded_async::<CsvCommand>(Self::COMMAND_CHANNEL_CAPACITY);
71        let path_clone = path_buf.clone();
72
73        tokio::task::spawn(async move {
74            let mut writer_state: Option<CsvWriterState> = None;
75            let mut pending_writes = 0usize;
76
77            info!("CSV async task started for file: {:?}", path_clone);
78
79            while let Ok(command) = command_receiver.recv().await {
80                match command {
81                    CsvCommand::Write {
82                        item_value,
83                        responder,
84                    } => {
85                        let result = (|| {
86                            if writer_state.is_none() {
87                                let should_write_header = should_write_header(&path_clone)?;
88
89                                let file = OpenOptions::new()
90                                    .create(true)
91                                    .append(true)
92                                    .open(&path_clone)?;
93
94                                let writer = Writer::from_writer(file);
95                                let headers = if let Some(map) = item_value.as_object() {
96                                    let mut h: Vec<String> = map.keys().cloned().collect();
97                                    h.sort();
98                                    h
99                                } else {
100                                    return Err(PipelineError::ItemError(
101                                        "First item for CSV must be a JSON object".to_string(),
102                                    ));
103                                };
104
105                                writer_state = Some(CsvWriterState {
106                                    writer,
107                                    headers,
108                                    write_header_on_next_write: should_write_header,
109                                });
110                            }
111
112                            let state = match writer_state.as_mut() {
113                                Some(state) => state,
114                                None => {
115                                    return Err(PipelineError::Other(
116                                        "CSV writer state missing unexpectedly".to_string(),
117                                    ));
118                                }
119                            };
120                            if state.write_header_on_next_write {
121                                state.writer.write_record(&state.headers)?;
122                                state.write_header_on_next_write = false;
123                            }
124                            let record = if let Some(map) = item_value.as_object() {
125                                state
126                                    .headers
127                                    .iter()
128                                    .map(|h| {
129                                        map.get(h)
130                                            .map(|v| {
131                                                if let Some(s) = v.as_str() {
132                                                    s.to_string()
133                                                } else {
134                                                    v.to_string()
135                                                }
136                                            })
137                                            .unwrap_or_default()
138                                    })
139                                    .collect::<Vec<String>>()
140                            } else {
141                                return Err(PipelineError::ItemError(
142                                    "Item for CSV must be a JSON object.".to_string(),
143                                ));
144                            };
145
146                            state.writer.write_record(&record)?;
147                            pending_writes += 1;
148                            if pending_writes >= Self::FLUSH_EVERY_WRITES {
149                                state.writer.flush()?;
150                                pending_writes = 0;
151                            }
152                            Ok(())
153                        })();
154
155                        if responder.send(result).await.is_err() {
156                            error!("Failed to send CSV write response.");
157                        }
158                    }
159                    CsvCommand::GetState(responder) => {
160                        let result = (|| {
161                            if let Some(state) = &writer_state {
162                                let state = CsvState {
163                                    headers: state.headers.clone(),
164                                    write_header_on_next_write: state.write_header_on_next_write,
165                                };
166                                let value = serde_json::to_value(state)?;
167                                Ok(Some(value))
168                            } else {
169                                Ok(None)
170                            }
171                        })();
172                        if responder.send(result).await.is_err() {
173                            error!("Failed to send GetState response.");
174                        }
175                    }
176                    CsvCommand::RestoreState { state, responder } => {
177                        let result = (|| {
178                            let state: CsvState = serde_json::from_value(state)?;
179                            let file = OpenOptions::new()
180                                .create(true)
181                                .append(true)
182                                .open(&path_clone)?;
183                            let writer = Writer::from_writer(file);
184                            writer_state = Some(CsvWriterState {
185                                writer,
186                                headers: state.headers,
187                                write_header_on_next_write: state.write_header_on_next_write
188                                    || should_write_header(&path_clone)?,
189                            });
190                            info!("CSV Exporter state restored.");
191                            Ok(())
192                        })();
193                        if responder.send(result).await.is_err() {
194                            error!("Failed to send RestoreState response.");
195                        }
196                    }
197                    CsvCommand::Shutdown(responder) => {
198                        info!("CSV async task received shutdown command.");
199                        if let Some(state) = writer_state.as_mut()
200                            && let Err(e) = state.writer.flush()
201                        {
202                            error!("Failed to flush CSV writer on shutdown: {}", e);
203                        }
204                        let _ = responder.send(()).await;
205                        break;
206                    }
207                }
208            }
209            info!("CSV async task for file: {:?} finished.", path_clone);
210        });
211
212        Ok(CsvPipeline {
213            command_sender,
214            export_config: None,
215            _phantom: PhantomData,
216        })
217    }
218
219    /// Applies typed export mapping before values are written.
220    pub fn with_schema_export_config(mut self, config: SchemaExportConfig) -> Self {
221        self.export_config = Some(config);
222        self
223    }
224}
225
226fn should_write_header(path: &Path) -> Result<bool, PipelineError> {
227    Ok(!path.exists() || path.metadata()?.len() == 0)
228}
229
230#[async_trait]
231impl<I: ScrapedItem> Pipeline<I> for CsvPipeline<I> {
232    fn name(&self) -> &str {
233        "CsvPipeline"
234    }
235
236    async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError> {
237        debug!("CsvPipeline processing item.");
238        let item_value = map_item_for_export(&item, self.export_config.as_ref());
239
240        let (tx, rx) = kanal::bounded_async(1);
241        self.command_sender
242            .send(CsvCommand::Write {
243                item_value,
244                responder: tx,
245            })
246            .await
247            .map_err(|e| PipelineError::Other(format!("Failed to send Write command: {}", e)))?;
248
249        let result = rx.recv().await.map_err(|e| {
250            PipelineError::Other(format!("Failed to receive Write response: {}", e))
251        })?;
252        result?;
253
254        Ok(Some(item))
255    }
256
257    async fn close(&self) -> Result<(), PipelineError> {
258        info!("Closing CsvPipeline.");
259        let (tx, rx) = kanal::bounded_async(1);
260        self.command_sender
261            .send(CsvCommand::Shutdown(tx))
262            .await
263            .map_err(|e| PipelineError::Other(format!("Failed to send Shutdown command: {}", e)))?;
264        rx.recv().await.map_err(|e| {
265            PipelineError::Other(format!("Failed to receive shutdown response: {}", e))
266        })?;
267        Ok(())
268    }
269
270    async fn get_state(&self) -> Result<Option<Value>, PipelineError> {
271        let (tx, rx) = kanal::bounded_async(1);
272        self.command_sender
273            .send(CsvCommand::GetState(tx))
274            .await
275            .map_err(|e| PipelineError::Other(format!("Failed to send GetState command: {}", e)))?;
276        let result = rx.recv().await.map_err(|e| {
277            PipelineError::Other(format!("Failed to receive GetState response: {}", e))
278        })?;
279        Ok(result?)
280    }
281
282    async fn restore_state(&self, state: Value) -> Result<(), PipelineError> {
283        let (tx, rx) = kanal::bounded_async(1);
284        self.command_sender
285            .send(CsvCommand::RestoreState {
286                state,
287                responder: tx,
288            })
289            .await
290            .map_err(|e| {
291                PipelineError::Other(format!("Failed to send RestoreState command: {}", e))
292            })?;
293        let result = rx.recv().await.map_err(|e| {
294            PipelineError::Other(format!("Failed to receive RestoreState response: {}", e))
295        })?;
296        Ok(result?)
297    }
298}