1use crate::pipeline::Pipeline;
7use crate::schema::{SchemaExportConfig, map_item_for_export};
8use async_trait::async_trait;
9use csv::Writer;
10use kanal::bounded_async;
11use log::{debug, error, info};
12use serde::{Deserialize, Serialize};
13use serde_json::Value;
14use spider_util::error::PipelineError;
15use spider_util::item::ScrapedItem;
16use std::fs::{File, OpenOptions};
17use std::marker::PhantomData;
18use std::path::Path;
19
/// Serializable snapshot of the CSV writer's bookkeeping.
///
/// Produced by the writer task for `CsvCommand::GetState` and consumed again
/// by `CsvCommand::RestoreState`.
#[derive(Serialize, Deserialize)]
struct CsvState {
    /// Column names, in the order used for every written record.
    headers: Vec<String>,
    /// Whether the header row still has to be written before the next record.
    /// Deserializes to `false` when the field is absent from the stored state.
    #[serde(default)]
    write_header_on_next_write: bool,
}
26
/// Live state held by the writer task once the output file has been opened.
struct CsvWriterState {
    /// CSV writer wrapping the output file handle.
    writer: Writer<File>,
    /// Column names; each record emits its values in this order.
    headers: Vec<String>,
    /// Whether the header row must be emitted before the next record.
    write_header_on_next_write: bool,
}
32
/// Requests sent from `CsvPipeline` to its background writer task.
///
/// Every variant carries a responder channel on which the task reports the
/// outcome of the request.
enum CsvCommand {
    /// Append one exported item as a CSV record.
    Write {
        /// The item, already mapped to its JSON export representation.
        item_value: Value,
        /// Receives the result of the write.
        responder: kanal::AsyncSender<Result<(), PipelineError>>,
    },
    /// Ask for a snapshot of the writer state (`None` if no writer exists yet).
    GetState(kanal::AsyncSender<Result<Option<Value>, PipelineError>>),
    /// Rebuild the writer from a previously captured snapshot.
    RestoreState {
        /// Serialized `CsvState` to restore from.
        state: Value,
        /// Receives the result of the restore.
        responder: kanal::AsyncSender<Result<(), PipelineError>>,
    },
    /// Flush the writer, acknowledge on the given channel and stop the task.
    Shutdown(kanal::AsyncSender<()>),
}
45
/// Pipeline stage that exports items of type `I` to a CSV file.
///
/// The struct itself only holds a channel to a background task; that task
/// performs the actual file writes.
pub struct CsvPipeline<I> {
    /// Sends `CsvCommand`s to the background writer task.
    command_sender: kanal::AsyncSender<CsvCommand>,
    /// Optional export configuration passed to `map_item_for_export`.
    export_config: Option<SchemaExportConfig>,
    /// Ties the pipeline to its item type without storing an item.
    _phantom: PhantomData<I>,
}
53
54impl<I: ScrapedItem> CsvPipeline<I> {
55 const COMMAND_CHANNEL_CAPACITY: usize = 4096;
56 const FLUSH_EVERY_WRITES: usize = 250;
57
58 pub fn new(file_path: impl AsRef<Path>) -> Result<Self, PipelineError> {
64 spider_util::util::validate_output_dir(&file_path)
65 .map_err(|e: spider_util::error::SpiderError| PipelineError::Other(e.to_string()))?;
66 let path_buf = file_path.as_ref().to_path_buf();
67 info!("Initializing CsvPipeline for file: {:?}", path_buf);
68
69 let (command_sender, command_receiver) =
70 bounded_async::<CsvCommand>(Self::COMMAND_CHANNEL_CAPACITY);
71 let path_clone = path_buf.clone();
72
73 tokio::task::spawn(async move {
74 let mut writer_state: Option<CsvWriterState> = None;
75 let mut pending_writes = 0usize;
76
77 info!("CSV async task started for file: {:?}", path_clone);
78
79 while let Ok(command) = command_receiver.recv().await {
80 match command {
81 CsvCommand::Write {
82 item_value,
83 responder,
84 } => {
85 let result = (|| {
86 if writer_state.is_none() {
87 let should_write_header = should_write_header(&path_clone)?;
88
89 let file = OpenOptions::new()
90 .create(true)
91 .append(true)
92 .open(&path_clone)?;
93
94 let writer = Writer::from_writer(file);
95 let headers = if let Some(map) = item_value.as_object() {
96 let mut h: Vec<String> = map.keys().cloned().collect();
97 h.sort();
98 h
99 } else {
100 return Err(PipelineError::ItemError(
101 "First item for CSV must be a JSON object".to_string(),
102 ));
103 };
104
105 writer_state = Some(CsvWriterState {
106 writer,
107 headers,
108 write_header_on_next_write: should_write_header,
109 });
110 }
111
112 let state = match writer_state.as_mut() {
113 Some(state) => state,
114 None => {
115 return Err(PipelineError::Other(
116 "CSV writer state missing unexpectedly".to_string(),
117 ));
118 }
119 };
120 if state.write_header_on_next_write {
121 state.writer.write_record(&state.headers)?;
122 state.write_header_on_next_write = false;
123 }
124 let record = if let Some(map) = item_value.as_object() {
125 state
126 .headers
127 .iter()
128 .map(|h| {
129 map.get(h)
130 .map(|v| {
131 if let Some(s) = v.as_str() {
132 s.to_string()
133 } else {
134 v.to_string()
135 }
136 })
137 .unwrap_or_default()
138 })
139 .collect::<Vec<String>>()
140 } else {
141 return Err(PipelineError::ItemError(
142 "Item for CSV must be a JSON object.".to_string(),
143 ));
144 };
145
146 state.writer.write_record(&record)?;
147 pending_writes += 1;
148 if pending_writes >= Self::FLUSH_EVERY_WRITES {
149 state.writer.flush()?;
150 pending_writes = 0;
151 }
152 Ok(())
153 })();
154
155 if responder.send(result).await.is_err() {
156 error!("Failed to send CSV write response.");
157 }
158 }
159 CsvCommand::GetState(responder) => {
160 let result = (|| {
161 if let Some(state) = &writer_state {
162 let state = CsvState {
163 headers: state.headers.clone(),
164 write_header_on_next_write: state.write_header_on_next_write,
165 };
166 let value = serde_json::to_value(state)?;
167 Ok(Some(value))
168 } else {
169 Ok(None)
170 }
171 })();
172 if responder.send(result).await.is_err() {
173 error!("Failed to send GetState response.");
174 }
175 }
176 CsvCommand::RestoreState { state, responder } => {
177 let result = (|| {
178 let state: CsvState = serde_json::from_value(state)?;
179 let file = OpenOptions::new()
180 .create(true)
181 .append(true)
182 .open(&path_clone)?;
183 let writer = Writer::from_writer(file);
184 writer_state = Some(CsvWriterState {
185 writer,
186 headers: state.headers,
187 write_header_on_next_write: state.write_header_on_next_write
188 || should_write_header(&path_clone)?,
189 });
190 info!("CSV Exporter state restored.");
191 Ok(())
192 })();
193 if responder.send(result).await.is_err() {
194 error!("Failed to send RestoreState response.");
195 }
196 }
197 CsvCommand::Shutdown(responder) => {
198 info!("CSV async task received shutdown command.");
199 if let Some(state) = writer_state.as_mut()
200 && let Err(e) = state.writer.flush()
201 {
202 error!("Failed to flush CSV writer on shutdown: {}", e);
203 }
204 let _ = responder.send(()).await;
205 break;
206 }
207 }
208 }
209 info!("CSV async task for file: {:?} finished.", path_clone);
210 });
211
212 Ok(CsvPipeline {
213 command_sender,
214 export_config: None,
215 _phantom: PhantomData,
216 })
217 }
218
219 pub fn with_schema_export_config(mut self, config: SchemaExportConfig) -> Self {
221 self.export_config = Some(config);
222 self
223 }
224}
225
226fn should_write_header(path: &Path) -> Result<bool, PipelineError> {
227 Ok(!path.exists() || path.metadata()?.len() == 0)
228}
229
230#[async_trait]
231impl<I: ScrapedItem> Pipeline<I> for CsvPipeline<I> {
232 fn name(&self) -> &str {
233 "CsvPipeline"
234 }
235
236 async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError> {
237 debug!("CsvPipeline processing item.");
238 let item_value = map_item_for_export(&item, self.export_config.as_ref());
239
240 let (tx, rx) = kanal::bounded_async(1);
241 self.command_sender
242 .send(CsvCommand::Write {
243 item_value,
244 responder: tx,
245 })
246 .await
247 .map_err(|e| PipelineError::Other(format!("Failed to send Write command: {}", e)))?;
248
249 let result = rx.recv().await.map_err(|e| {
250 PipelineError::Other(format!("Failed to receive Write response: {}", e))
251 })?;
252 result?;
253
254 Ok(Some(item))
255 }
256
257 async fn close(&self) -> Result<(), PipelineError> {
258 info!("Closing CsvPipeline.");
259 let (tx, rx) = kanal::bounded_async(1);
260 self.command_sender
261 .send(CsvCommand::Shutdown(tx))
262 .await
263 .map_err(|e| PipelineError::Other(format!("Failed to send Shutdown command: {}", e)))?;
264 rx.recv().await.map_err(|e| {
265 PipelineError::Other(format!("Failed to receive shutdown response: {}", e))
266 })?;
267 Ok(())
268 }
269
270 async fn get_state(&self) -> Result<Option<Value>, PipelineError> {
271 let (tx, rx) = kanal::bounded_async(1);
272 self.command_sender
273 .send(CsvCommand::GetState(tx))
274 .await
275 .map_err(|e| PipelineError::Other(format!("Failed to send GetState command: {}", e)))?;
276 let result = rx.recv().await.map_err(|e| {
277 PipelineError::Other(format!("Failed to receive GetState response: {}", e))
278 })?;
279 Ok(result?)
280 }
281
282 async fn restore_state(&self, state: Value) -> Result<(), PipelineError> {
283 let (tx, rx) = kanal::bounded_async(1);
284 self.command_sender
285 .send(CsvCommand::RestoreState {
286 state,
287 responder: tx,
288 })
289 .await
290 .map_err(|e| {
291 PipelineError::Other(format!("Failed to send RestoreState command: {}", e))
292 })?;
293 let result = rx.recv().await.map_err(|e| {
294 PipelineError::Other(format!("Failed to receive RestoreState response: {}", e))
295 })?;
296 Ok(result?)
297 }
298}