spider_pipeline/
stream_json.rs

1//! Streaming JSON output pipeline.
2//!
3//! [`StreamJsonPipeline`] writes items to a JSON array incrementally instead of
4//! holding the full result set in memory.
5
6use crate::pipeline::Pipeline;
7use crate::schema::{SchemaExportConfig, map_item_for_export};
8use async_trait::async_trait;
9use kanal::bounded_async;
10use log::{debug, error, info};
11use serde_json::Value;
12use spider_util::error::PipelineError;
13use spider_util::item::ScrapedItem;
14use std::marker::PhantomData;
15use std::path::Path;
16use tokio::fs::OpenOptions;
17use tokio::io::{AsyncWriteExt, BufWriter};
18
/// Number of buffered items that triggers a flush to disk.
const DEFAULT_BATCH_SIZE: usize = 100;
20
/// Messages accepted by the background writer task.
enum StreamJsonCommand {
    /// A JSON value to append to the output array.
    Write(Value),
    /// Flush remaining items, write the closing bracket, and stop; the final
    /// outcome is sent back through the enclosed responder channel.
    Shutdown(kanal::AsyncSender<Result<(), PipelineError>>),
}
25
/// A pipeline that streams items directly to a JSON file without accumulating them in memory.
pub struct StreamJsonPipeline<I: ScrapedItem> {
    /// Channel to the spawned task that owns the file writer.
    command_sender: kanal::AsyncSender<StreamJsonCommand>,
    /// Optional typed export mapping applied to each item before writing.
    export_config: Option<SchemaExportConfig>,
    /// Ties the pipeline to its item type without storing one.
    _phantom: PhantomData<I>,
}
32
33impl<I: ScrapedItem> StreamJsonPipeline<I> {
34    const COMMAND_CHANNEL_CAPACITY: usize = 1024;
35
36    /// Creates a new `StreamJsonPipeline` with default batch size.
37    ///
38    /// # Errors
39    ///
40    /// Returns an error when the output directory cannot be created.
41    pub fn new(file_path: impl AsRef<Path>) -> Result<Self, PipelineError> {
42        Self::with_batch_size(file_path, DEFAULT_BATCH_SIZE)
43    }
44
45    /// Creates a new `StreamJsonPipeline` with a specified batch size.
46    ///
47    /// # Errors
48    ///
49    /// Returns an error when the output directory cannot be created.
50    pub fn with_batch_size(
51        file_path: impl AsRef<Path>,
52        batch_size: usize,
53    ) -> Result<Self, PipelineError> {
54        spider_util::util::validate_output_dir(&file_path)
55            .map_err(|e: spider_util::error::SpiderError| PipelineError::Other(e.to_string()))?;
56        let path_buf = file_path.as_ref().to_path_buf();
57        info!("Initializing StreamJsonPipeline for file: {:?}", path_buf);
58
59        let (command_sender, command_receiver) =
60            bounded_async::<StreamJsonCommand>(Self::COMMAND_CHANNEL_CAPACITY);
61
62        tokio::task::spawn(async move {
63            let file = OpenOptions::new()
64                .create(true)
65                .write(true)
66                .truncate(true)
67                .open(&path_buf)
68                .await
69                .map_err(|e| {
70                    error!("Failed to create/open file {:?}: {}", path_buf, e);
71                })
72                .ok();
73
74            if let Some(file) = file {
75                let mut writer = BufWriter::new(file);
76                let mut items_buffer = Vec::with_capacity(batch_size);
77                let mut first_item = true;
78
79                if writer.write_all(b"[\n").await.is_err() {
80                    error!("Failed to write opening bracket to file: {:?}", path_buf);
81                }
82
83                info!(
84                    "StreamJsonPipeline async task started for file: {:?}",
85                    path_buf
86                );
87
88                while let Ok(command) = command_receiver.recv().await {
89                    match command {
90                        StreamJsonCommand::Write(value) => {
91                            items_buffer.push(value);
92
93                            if items_buffer.len() >= batch_size {
94                                flush_items(&mut writer, &mut items_buffer, &mut first_item)
95                                    .await
96                                    .ok();
97                            }
98                        }
99                        StreamJsonCommand::Shutdown(responder) => {
100                            if !items_buffer.is_empty() {
101                                flush_items(&mut writer, &mut items_buffer, &mut first_item)
102                                    .await
103                                    .ok();
104                            }
105
106                            let result = async {
107                                writer
108                                    .write_all(b"\n]")
109                                    .await
110                                    .map_err(|e| PipelineError::IoError(e.to_string()))?;
111                                writer
112                                    .flush()
113                                    .await
114                                    .map_err(|e| PipelineError::IoError(e.to_string()))
115                            }
116                            .await;
117
118                            if responder.send(result).await.is_err() {
119                                error!("Failed to send shutdown response.");
120                            }
121                            break;
122                        }
123                    }
124                }
125
126                info!(
127                    "StreamJsonPipeline async task for file: {:?} finished.",
128                    path_buf
129                );
130            }
131        });
132
133        Ok(StreamJsonPipeline {
134            command_sender,
135            export_config: None,
136            _phantom: PhantomData,
137        })
138    }
139
140    /// Applies typed export mapping before values are written.
141    pub fn with_schema_export_config(mut self, config: SchemaExportConfig) -> Self {
142        self.export_config = Some(config);
143        self
144    }
145}
146
147async fn flush_items(
148    writer: &mut BufWriter<tokio::fs::File>,
149    items_buffer: &mut Vec<Value>,
150    first_item: &mut bool,
151) -> Result<(), PipelineError> {
152    for item in items_buffer.drain(..) {
153        let item_str = serde_json::to_string(&item)
154            .map_err(|e| PipelineError::SerializationError(e.to_string()))?;
155
156        if *first_item {
157            *first_item = false;
158        } else {
159            writer
160                .write_all(b",\n")
161                .await
162                .map_err(|e| PipelineError::IoError(e.to_string()))?;
163        }
164
165        writer
166            .write_all(b"  ")
167            .await
168            .map_err(|e| PipelineError::IoError(e.to_string()))?;
169        writer
170            .write_all(item_str.as_bytes())
171            .await
172            .map_err(|e| PipelineError::IoError(e.to_string()))?;
173    }
174
175    Ok(())
176}
177
178#[async_trait]
179impl<I: ScrapedItem> Pipeline<I> for StreamJsonPipeline<I> {
180    fn name(&self) -> &str {
181        "StreamJsonPipeline"
182    }
183
184    async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError> {
185        debug!("StreamJsonPipeline processing item.");
186        let json_value = map_item_for_export(&item, self.export_config.as_ref());
187
188        self.command_sender
189            .send(StreamJsonCommand::Write(json_value))
190            .await
191            .map_err(|e| PipelineError::Other(format!("Failed to send Write command: {}", e)))?;
192
193        Ok(Some(item))
194    }
195
196    async fn close(&self) -> Result<(), PipelineError> {
197        info!("Closing StreamJsonPipeline.");
198        let (tx, rx) = kanal::bounded_async(1);
199        self.command_sender
200            .send(StreamJsonCommand::Shutdown(tx))
201            .await
202            .map_err(|e| PipelineError::Other(format!("Failed to send Shutdown command: {}", e)))?;
203
204        rx.recv().await.map_err(|e| {
205            PipelineError::Other(format!("Failed to receive shutdown response: {}", e))
206        })?
207    }
208}