spider_pipeline/
jsonl.rs

1//! JSON Lines output pipeline.
2//!
3//! [`JsonlPipeline`] appends one serialized item per line, which makes it a
4//! good fit for streaming workflows and shell-based processing.
5
use crate::pipeline::Pipeline;
use crate::schema::{SchemaExportConfig, map_item_for_export};
use async_trait::async_trait;
use log::{debug, info, warn};
use spider_util::{error::PipelineError, item::ScrapedItem};
use std::fs::OpenOptions;
use std::io::Write;
use std::marker::PhantomData;
use std::path::Path;
use tokio::sync::{mpsc, oneshot};
16
17enum JsonlCommand {
18    Write {
19        serialized_item: String,
20        responder: oneshot::Sender<Result<(), PipelineError>>,
21    },
22    Shutdown(oneshot::Sender<Result<(), PipelineError>>),
23}
24
25/// A pipeline that writes each scraped item to a JSON Lines (.jsonl) file.
26/// Each item is written as a JSON object on a new line.
27pub struct JsonlPipeline<I: ScrapedItem> {
28    command_sender: mpsc::Sender<JsonlCommand>,
29    export_config: Option<SchemaExportConfig>,
30    _phantom: PhantomData<I>,
31}
32
33impl<I: ScrapedItem> JsonlPipeline<I> {
34    const COMMAND_CHANNEL_CAPACITY: usize = 1024;
35    const FLUSH_EVERY_WRITES: usize = 100;
36
37    /// Creates a new `JsonlPipeline` that writes to the specified file path.
38    ///
39    /// # Errors
40    ///
41    /// Returns an error when the output file cannot be opened or the parent
42    /// directory cannot be created.
43    pub fn new(file_path: impl AsRef<Path>) -> Result<Self, PipelineError> {
44        spider_util::util::validate_output_dir(&file_path)
45            .map_err(|e: spider_util::error::SpiderError| PipelineError::Other(e.to_string()))?;
46        let path_buf = file_path.as_ref().to_path_buf();
47        info!("Initializing JsonlPipeline for file: {:?}", path_buf);
48
49        let (command_sender, mut command_receiver) =
50            mpsc::channel::<JsonlCommand>(Self::COMMAND_CHANNEL_CAPACITY);
51
52        tokio::task::spawn_blocking(move || {
53            let file_result = OpenOptions::new().create(true).append(true).open(&path_buf);
54            let mut file = match file_result {
55                Ok(file) => file,
56                Err(e) => {
57                    while let Some(command) = command_receiver.blocking_recv() {
58                        match command {
59                            JsonlCommand::Write { responder, .. } => {
60                                let _ = responder.send(Err(PipelineError::IoError(e.to_string())));
61                            }
62                            JsonlCommand::Shutdown(responder) => {
63                                let _ = responder.send(Err(PipelineError::IoError(e.to_string())));
64                                break;
65                            }
66                        }
67                    }
68                    return;
69                }
70            };
71
72            let mut pending_writes = 0usize;
73            while let Some(command) = command_receiver.blocking_recv() {
74                match command {
75                    JsonlCommand::Write {
76                        serialized_item,
77                        responder,
78                    } => {
79                        let result = (|| -> Result<(), PipelineError> {
80                            file.write_all(serialized_item.as_bytes())?;
81                            file.write_all(b"\n")?;
82                            pending_writes += 1;
83                            if pending_writes >= Self::FLUSH_EVERY_WRITES {
84                                file.flush()?;
85                                pending_writes = 0;
86                            }
87                            Ok(())
88                        })();
89                        let _ = responder.send(result);
90                    }
91                    JsonlCommand::Shutdown(responder) => {
92                        let result = file.flush().map_err(PipelineError::from);
93                        let _ = responder.send(result);
94                        break;
95                    }
96                }
97            }
98        });
99
100        Ok(JsonlPipeline {
101            command_sender,
102            export_config: None,
103            _phantom: PhantomData,
104        })
105    }
106
107    /// Applies typed export mapping before values are written.
108    pub fn with_schema_export_config(mut self, config: SchemaExportConfig) -> Self {
109        self.export_config = Some(config);
110        self
111    }
112}
113
114#[async_trait]
115impl<I: ScrapedItem> Pipeline<I> for JsonlPipeline<I> {
116    fn name(&self) -> &str {
117        "JsonlPipeline"
118    }
119
120    async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError> {
121        debug!("JsonlPipeline processing item.");
122        let json_value = map_item_for_export(&item, self.export_config.as_ref());
123        let serialized_item = serde_json::to_string(&json_value)?;
124
125        let (tx, rx) = oneshot::channel();
126        self.command_sender
127            .send(JsonlCommand::Write {
128                serialized_item,
129                responder: tx,
130            })
131            .await
132            .map_err(|e| PipelineError::Other(format!("Failed to send Write command: {}", e)))?;
133        rx.await.map_err(|e| {
134            PipelineError::Other(format!("Failed to receive Write response: {}", e))
135        })??;
136
137        Ok(Some(item))
138    }
139
140    async fn close(&self) -> Result<(), PipelineError> {
141        let (tx, rx) = oneshot::channel();
142        self.command_sender
143            .send(JsonlCommand::Shutdown(tx))
144            .await
145            .map_err(|e| PipelineError::Other(format!("Failed to send Shutdown command: {}", e)))?;
146        rx.await.map_err(|e| {
147            PipelineError::Other(format!("Failed to receive shutdown response: {}", e))
148        })?
149    }
150}