1use crate::pipeline::Pipeline;
7use crate::schema::{SchemaExportConfig, map_item_for_export};
8use async_trait::async_trait;
9use log::{debug, info};
10use spider_util::{error::PipelineError, item::ScrapedItem};
11use std::fs::OpenOptions;
12use std::io::Write;
13use std::marker::PhantomData;
14use std::path::Path;
15use tokio::sync::{mpsc, oneshot};
16
/// Commands sent from the async pipeline methods to the blocking writer task.
enum JsonlCommand {
    /// Append one pre-serialized JSON line to the output file; the write
    /// result is reported back through `responder`.
    Write {
        serialized_item: String,
        responder: oneshot::Sender<Result<(), PipelineError>>,
    },
    /// Flush the file and stop the writer task; the flush result is reported
    /// back through the sender.
    Shutdown(oneshot::Sender<Result<(), PipelineError>>),
}
24
/// Pipeline stage that appends each scraped item as one JSON line (JSONL)
/// to an output file. All file I/O is delegated to a dedicated blocking
/// task (spawned in [`JsonlPipeline::new`]) that this struct talks to over
/// `command_sender`, so async callers never block on disk writes.
pub struct JsonlPipeline<I: ScrapedItem> {
    // Channel to the blocking writer task.
    command_sender: mpsc::Sender<JsonlCommand>,
    // Optional schema mapping applied to items before serialization;
    // `None` means items are exported as-is.
    export_config: Option<SchemaExportConfig>,
    // Marks the item type handled by this pipeline without storing one.
    _phantom: PhantomData<I>,
}
32
33impl<I: ScrapedItem> JsonlPipeline<I> {
34 const COMMAND_CHANNEL_CAPACITY: usize = 1024;
35 const FLUSH_EVERY_WRITES: usize = 100;
36
37 pub fn new(file_path: impl AsRef<Path>) -> Result<Self, PipelineError> {
44 spider_util::util::validate_output_dir(&file_path)
45 .map_err(|e: spider_util::error::SpiderError| PipelineError::Other(e.to_string()))?;
46 let path_buf = file_path.as_ref().to_path_buf();
47 info!("Initializing JsonlPipeline for file: {:?}", path_buf);
48
49 let (command_sender, mut command_receiver) =
50 mpsc::channel::<JsonlCommand>(Self::COMMAND_CHANNEL_CAPACITY);
51
52 tokio::task::spawn_blocking(move || {
53 let file_result = OpenOptions::new().create(true).append(true).open(&path_buf);
54 let mut file = match file_result {
55 Ok(file) => file,
56 Err(e) => {
57 while let Some(command) = command_receiver.blocking_recv() {
58 match command {
59 JsonlCommand::Write { responder, .. } => {
60 let _ = responder.send(Err(PipelineError::IoError(e.to_string())));
61 }
62 JsonlCommand::Shutdown(responder) => {
63 let _ = responder.send(Err(PipelineError::IoError(e.to_string())));
64 break;
65 }
66 }
67 }
68 return;
69 }
70 };
71
72 let mut pending_writes = 0usize;
73 while let Some(command) = command_receiver.blocking_recv() {
74 match command {
75 JsonlCommand::Write {
76 serialized_item,
77 responder,
78 } => {
79 let result = (|| -> Result<(), PipelineError> {
80 file.write_all(serialized_item.as_bytes())?;
81 file.write_all(b"\n")?;
82 pending_writes += 1;
83 if pending_writes >= Self::FLUSH_EVERY_WRITES {
84 file.flush()?;
85 pending_writes = 0;
86 }
87 Ok(())
88 })();
89 let _ = responder.send(result);
90 }
91 JsonlCommand::Shutdown(responder) => {
92 let result = file.flush().map_err(PipelineError::from);
93 let _ = responder.send(result);
94 break;
95 }
96 }
97 }
98 });
99
100 Ok(JsonlPipeline {
101 command_sender,
102 export_config: None,
103 _phantom: PhantomData,
104 })
105 }
106
107 pub fn with_schema_export_config(mut self, config: SchemaExportConfig) -> Self {
109 self.export_config = Some(config);
110 self
111 }
112}
113
114#[async_trait]
115impl<I: ScrapedItem> Pipeline<I> for JsonlPipeline<I> {
116 fn name(&self) -> &str {
117 "JsonlPipeline"
118 }
119
120 async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError> {
121 debug!("JsonlPipeline processing item.");
122 let json_value = map_item_for_export(&item, self.export_config.as_ref());
123 let serialized_item = serde_json::to_string(&json_value)?;
124
125 let (tx, rx) = oneshot::channel();
126 self.command_sender
127 .send(JsonlCommand::Write {
128 serialized_item,
129 responder: tx,
130 })
131 .await
132 .map_err(|e| PipelineError::Other(format!("Failed to send Write command: {}", e)))?;
133 rx.await.map_err(|e| {
134 PipelineError::Other(format!("Failed to receive Write response: {}", e))
135 })??;
136
137 Ok(Some(item))
138 }
139
140 async fn close(&self) -> Result<(), PipelineError> {
141 let (tx, rx) = oneshot::channel();
142 self.command_sender
143 .send(JsonlCommand::Shutdown(tx))
144 .await
145 .map_err(|e| PipelineError::Other(format!("Failed to send Shutdown command: {}", e)))?;
146 rx.await.map_err(|e| {
147 PipelineError::Other(format!("Failed to receive shutdown response: {}", e))
148 })?
149 }
150}