1use crate::pipeline::Pipeline;
7use crate::schema::{SchemaExportConfig, map_item_for_export};
8use async_trait::async_trait;
9use kanal::bounded_async;
10use log::{debug, error, info};
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use spider_util::error::PipelineError;
14use spider_util::item::ScrapedItem;
15use std::fs::File;
16use std::io::Write;
17use std::marker::PhantomData;
18use std::path::Path;
19
/// Serializable snapshot of the items buffered by the writer task.
///
/// A value of this type is converted to JSON when the task answers a
/// `GetState` command and parsed back when it handles `RestoreState`.
/// No serde rename attribute is applied, so the field name `items` is the
/// key used in the serialized form.
#[derive(Serialize, Deserialize)]
struct JsonState {
    /// The exported items collected so far, in insertion order.
    items: Vec<Value>,
}
24
/// Messages sent from a `JsonPipeline` handle to its background writer task.
enum JsonCommand {
    /// Append one exported item to the in-memory buffer.
    Write(Value),
    /// Request a snapshot of the buffered items. The reply is sent on the
    /// enclosed channel and is `Ok(None)` when nothing is buffered.
    GetState(kanal::AsyncSender<Result<Option<Value>, PipelineError>>),
    /// Replace the buffered items with those decoded from `state`.
    RestoreState {
        /// Snapshot value to decode into the item buffer.
        state: Value,
        /// Channel on which the outcome of the restore is reported.
        responder: kanal::AsyncSender<Result<(), PipelineError>>,
    },
    /// Write all buffered items to the output file, report the outcome on
    /// the enclosed channel, and stop the task.
    Shutdown(kanal::AsyncSender<Result<(), PipelineError>>),
}
34
/// Pipeline that buffers exported items in a background task and writes them
/// to a file as one pretty-printed JSON array when the pipeline is closed.
pub struct JsonPipeline<I: ScrapedItem> {
    /// Sending half of the command channel consumed by the writer task.
    command_sender: kanal::AsyncSender<JsonCommand>,
    /// Optional export configuration handed to `map_item_for_export` for
    /// every processed item; `None` until `with_schema_export_config` is used.
    export_config: Option<SchemaExportConfig>,
    /// Ties the pipeline to its item type `I` without storing an item.
    _phantom: PhantomData<I>,
}
42
43impl<I: ScrapedItem> JsonPipeline<I> {
44 const COMMAND_CHANNEL_CAPACITY: usize = 1024;
45
46 pub fn new(file_path: impl AsRef<Path>) -> Result<Self, PipelineError> {
52 spider_util::util::validate_output_dir(&file_path)
53 .map_err(|e: spider_util::error::SpiderError| PipelineError::Other(e.to_string()))?;
54 let (command_sender, command_receiver) =
55 bounded_async::<JsonCommand>(Self::COMMAND_CHANNEL_CAPACITY);
56 let path_buf = file_path.as_ref().to_path_buf();
57
58 tokio::task::spawn(async move {
59 let mut items: Vec<Value> = Vec::new();
60 info!("JsonPipeline async task started for file: {:?}", path_buf);
61
62 while let Ok(command) = command_receiver.recv().await {
63 match command {
64 JsonCommand::Write(value) => {
65 items.push(value);
66 }
67 JsonCommand::GetState(responder) => {
68 let result = (|| {
69 if items.is_empty() {
70 return Ok(None);
71 }
72 let state = JsonState {
73 items: items.clone(),
74 };
75 let value = serde_json::to_value(state)?;
76 Ok(Some(value))
77 })();
78 if responder.send(result).await.is_err() {
79 error!("Failed to send GetState response.");
80 }
81 }
82 JsonCommand::RestoreState { state, responder } => {
83 let result = (|| {
84 let state: JsonState = serde_json::from_value(state)?;
85 items = state.items;
86 info!("JsonPipeline state restored with {} items.", items.len());
87 Ok(())
88 })();
89 if responder.send(result).await.is_err() {
90 error!("Failed to send RestoreState response.");
91 }
92 }
93 JsonCommand::Shutdown(responder) => {
94 info!("JsonPipeline writing {} items to file.", items.len());
95 let result = (|| {
96 let mut file = File::create(&path_buf)?;
97 let json_array = Value::Array(items);
98 let json_string = serde_json::to_string_pretty(&json_array)?;
99 file.write_all(json_string.as_bytes())?;
100 Ok(())
101 })();
102 if responder.send(result).await.is_err() {
103 error!("Failed to send JsonPipeline shutdown response.");
104 }
105 break;
106 }
107 }
108 }
109 info!("JsonPipeline async task for file: {:?} finished.", path_buf);
110 });
111
112 Ok(JsonPipeline {
113 command_sender,
114 export_config: None,
115 _phantom: PhantomData,
116 })
117 }
118
119 pub fn with_schema_export_config(mut self, config: SchemaExportConfig) -> Self {
121 self.export_config = Some(config);
122 self
123 }
124}
125
126#[async_trait]
127impl<I: ScrapedItem> Pipeline<I> for JsonPipeline<I> {
128 fn name(&self) -> &str {
129 "JsonPipeline"
130 }
131
132 async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError> {
133 debug!("JsonPipeline processing item.");
134 let json_value = map_item_for_export(&item, self.export_config.as_ref());
135 self.command_sender
136 .send(JsonCommand::Write(json_value))
137 .await
138 .map_err(|e| PipelineError::Other(format!("Failed to send Write command: {}", e)))?;
139 Ok(Some(item))
140 }
141
142 async fn close(&self) -> Result<(), PipelineError> {
143 info!("Closing JsonPipeline.");
144 let (tx, rx) = kanal::bounded_async(1);
145 self.command_sender
146 .send(JsonCommand::Shutdown(tx))
147 .await
148 .map_err(|e| PipelineError::Other(format!("Failed to send Shutdown command: {}", e)))?;
149 rx.recv().await.map_err(|e| {
150 PipelineError::Other(format!("Failed to receive shutdown response: {}", e))
151 })?
152 }
153
154 async fn get_state(&self) -> Result<Option<Value>, PipelineError> {
155 let (tx, rx) = kanal::bounded_async(1);
156 self.command_sender
157 .send(JsonCommand::GetState(tx))
158 .await
159 .map_err(|e| PipelineError::Other(format!("Failed to send GetState command: {}", e)))?;
160 rx.recv().await.map_err(|e| {
161 PipelineError::Other(format!("Failed to receive GetState response: {}", e))
162 })?
163 }
164
165 async fn restore_state(&self, state: Value) -> Result<(), PipelineError> {
166 let (tx, rx) = kanal::bounded_async(1);
167 self.command_sender
168 .send(JsonCommand::RestoreState {
169 state,
170 responder: tx,
171 })
172 .await
173 .map_err(|e| {
174 PipelineError::Other(format!("Failed to send RestoreState command: {}", e))
175 })?;
176 rx.recv().await.map_err(|e| {
177 PipelineError::Other(format!("Failed to receive RestoreState response: {}", e))
178 })?
179 }
180}