// spider_pipeline/stream_json.rs
use crate::pipeline::Pipeline;
7use crate::schema::{SchemaExportConfig, map_item_for_export};
8use async_trait::async_trait;
9use kanal::bounded_async;
10use log::{debug, error, info};
11use serde_json::Value;
12use spider_util::error::PipelineError;
13use spider_util::item::ScrapedItem;
14use std::marker::PhantomData;
15use std::path::Path;
16use tokio::fs::OpenOptions;
17use tokio::io::{AsyncWriteExt, BufWriter};
18
/// Number of items buffered in memory before a batch is written to disk.
const DEFAULT_BATCH_SIZE: usize = 100;
20
/// Messages accepted by the background writer task.
enum StreamJsonCommand {
    // Queue one already-mapped JSON value for output.
    Write(Value),
    // Finalize the JSON array; the embedded sender receives the result of
    // the closing write + flush so `close()` can report it to the caller.
    Shutdown(kanal::AsyncSender<Result<(), PipelineError>>),
}
25
/// Pipeline that streams scraped items into a single JSON-array file,
/// delegating all file I/O to a dedicated background tokio task.
pub struct StreamJsonPipeline<I: ScrapedItem> {
    // Channel handle for sending write/shutdown commands to the writer task.
    command_sender: kanal::AsyncSender<StreamJsonCommand>,
    // Optional schema mapping applied to each item before serialization.
    export_config: Option<SchemaExportConfig>,
    // Ties the pipeline to the item type without storing an item.
    _phantom: PhantomData<I>,
}
32
33impl<I: ScrapedItem> StreamJsonPipeline<I> {
34 const COMMAND_CHANNEL_CAPACITY: usize = 1024;
35
36 pub fn new(file_path: impl AsRef<Path>) -> Result<Self, PipelineError> {
42 Self::with_batch_size(file_path, DEFAULT_BATCH_SIZE)
43 }
44
45 pub fn with_batch_size(
51 file_path: impl AsRef<Path>,
52 batch_size: usize,
53 ) -> Result<Self, PipelineError> {
54 spider_util::util::validate_output_dir(&file_path)
55 .map_err(|e: spider_util::error::SpiderError| PipelineError::Other(e.to_string()))?;
56 let path_buf = file_path.as_ref().to_path_buf();
57 info!("Initializing StreamJsonPipeline for file: {:?}", path_buf);
58
59 let (command_sender, command_receiver) =
60 bounded_async::<StreamJsonCommand>(Self::COMMAND_CHANNEL_CAPACITY);
61
62 tokio::task::spawn(async move {
63 let file = OpenOptions::new()
64 .create(true)
65 .write(true)
66 .truncate(true)
67 .open(&path_buf)
68 .await
69 .map_err(|e| {
70 error!("Failed to create/open file {:?}: {}", path_buf, e);
71 })
72 .ok();
73
74 if let Some(file) = file {
75 let mut writer = BufWriter::new(file);
76 let mut items_buffer = Vec::with_capacity(batch_size);
77 let mut first_item = true;
78
79 if writer.write_all(b"[\n").await.is_err() {
80 error!("Failed to write opening bracket to file: {:?}", path_buf);
81 }
82
83 info!(
84 "StreamJsonPipeline async task started for file: {:?}",
85 path_buf
86 );
87
88 while let Ok(command) = command_receiver.recv().await {
89 match command {
90 StreamJsonCommand::Write(value) => {
91 items_buffer.push(value);
92
93 if items_buffer.len() >= batch_size {
94 flush_items(&mut writer, &mut items_buffer, &mut first_item)
95 .await
96 .ok();
97 }
98 }
99 StreamJsonCommand::Shutdown(responder) => {
100 if !items_buffer.is_empty() {
101 flush_items(&mut writer, &mut items_buffer, &mut first_item)
102 .await
103 .ok();
104 }
105
106 let result = async {
107 writer
108 .write_all(b"\n]")
109 .await
110 .map_err(|e| PipelineError::IoError(e.to_string()))?;
111 writer
112 .flush()
113 .await
114 .map_err(|e| PipelineError::IoError(e.to_string()))
115 }
116 .await;
117
118 if responder.send(result).await.is_err() {
119 error!("Failed to send shutdown response.");
120 }
121 break;
122 }
123 }
124 }
125
126 info!(
127 "StreamJsonPipeline async task for file: {:?} finished.",
128 path_buf
129 );
130 }
131 });
132
133 Ok(StreamJsonPipeline {
134 command_sender,
135 export_config: None,
136 _phantom: PhantomData,
137 })
138 }
139
140 pub fn with_schema_export_config(mut self, config: SchemaExportConfig) -> Self {
142 self.export_config = Some(config);
143 self
144 }
145}
146
147async fn flush_items(
148 writer: &mut BufWriter<tokio::fs::File>,
149 items_buffer: &mut Vec<Value>,
150 first_item: &mut bool,
151) -> Result<(), PipelineError> {
152 for item in items_buffer.drain(..) {
153 let item_str = serde_json::to_string(&item)
154 .map_err(|e| PipelineError::SerializationError(e.to_string()))?;
155
156 if *first_item {
157 *first_item = false;
158 } else {
159 writer
160 .write_all(b",\n")
161 .await
162 .map_err(|e| PipelineError::IoError(e.to_string()))?;
163 }
164
165 writer
166 .write_all(b" ")
167 .await
168 .map_err(|e| PipelineError::IoError(e.to_string()))?;
169 writer
170 .write_all(item_str.as_bytes())
171 .await
172 .map_err(|e| PipelineError::IoError(e.to_string()))?;
173 }
174
175 Ok(())
176}
177
178#[async_trait]
179impl<I: ScrapedItem> Pipeline<I> for StreamJsonPipeline<I> {
180 fn name(&self) -> &str {
181 "StreamJsonPipeline"
182 }
183
184 async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError> {
185 debug!("StreamJsonPipeline processing item.");
186 let json_value = map_item_for_export(&item, self.export_config.as_ref());
187
188 self.command_sender
189 .send(StreamJsonCommand::Write(json_value))
190 .await
191 .map_err(|e| PipelineError::Other(format!("Failed to send Write command: {}", e)))?;
192
193 Ok(Some(item))
194 }
195
196 async fn close(&self) -> Result<(), PipelineError> {
197 info!("Closing StreamJsonPipeline.");
198 let (tx, rx) = kanal::bounded_async(1);
199 self.command_sender
200 .send(StreamJsonCommand::Shutdown(tx))
201 .await
202 .map_err(|e| PipelineError::Other(format!("Failed to send Shutdown command: {}", e)))?;
203
204 rx.recv().await.map_err(|e| {
205 PipelineError::Other(format!("Failed to receive shutdown response: {}", e))
206 })?
207 }
208}