spider_pipeline/
json.rs

1//! JSON array output pipeline.
2//!
3//! [`JsonPipeline`] keeps items in memory and writes a pretty-printed JSON
4//! array when the pipeline is closed.
5
6use crate::pipeline::Pipeline;
7use crate::schema::{SchemaExportConfig, map_item_for_export};
8use async_trait::async_trait;
9use kanal::bounded_async;
10use log::{debug, error, info};
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use spider_util::error::PipelineError;
14use spider_util::item::ScrapedItem;
15use std::fs::File;
16use std::io::Write;
17use std::marker::PhantomData;
18use std::path::Path;
19
20#[derive(Serialize, Deserialize)]
21struct JsonState {
22    items: Vec<Value>,
23}
24
25enum JsonCommand {
26    Write(Value),
27    GetState(kanal::AsyncSender<Result<Option<Value>, PipelineError>>),
28    RestoreState {
29        state: Value,
30        responder: kanal::AsyncSender<Result<(), PipelineError>>,
31    },
32    Shutdown(kanal::AsyncSender<Result<(), PipelineError>>),
33}
34
35/// A pipeline that writes all scraped items to a single JSON file as a JSON array.
36/// Items are collected in a blocking task and written to disk when the pipeline is closed.
37pub struct JsonPipeline<I: ScrapedItem> {
38    command_sender: kanal::AsyncSender<JsonCommand>,
39    export_config: Option<SchemaExportConfig>,
40    _phantom: PhantomData<I>,
41}
42
43impl<I: ScrapedItem> JsonPipeline<I> {
44    const COMMAND_CHANNEL_CAPACITY: usize = 1024;
45
46    /// Creates a new `JsonPipeline`.
47    ///
48    /// # Errors
49    ///
50    /// Returns an error when the output directory cannot be created.
51    pub fn new(file_path: impl AsRef<Path>) -> Result<Self, PipelineError> {
52        spider_util::util::validate_output_dir(&file_path)
53            .map_err(|e: spider_util::error::SpiderError| PipelineError::Other(e.to_string()))?;
54        let (command_sender, command_receiver) =
55            bounded_async::<JsonCommand>(Self::COMMAND_CHANNEL_CAPACITY);
56        let path_buf = file_path.as_ref().to_path_buf();
57
58        tokio::task::spawn(async move {
59            let mut items: Vec<Value> = Vec::new();
60            info!("JsonPipeline async task started for file: {:?}", path_buf);
61
62            while let Ok(command) = command_receiver.recv().await {
63                match command {
64                    JsonCommand::Write(value) => {
65                        items.push(value);
66                    }
67                    JsonCommand::GetState(responder) => {
68                        let result = (|| {
69                            if items.is_empty() {
70                                return Ok(None);
71                            }
72                            let state = JsonState {
73                                items: items.clone(),
74                            };
75                            let value = serde_json::to_value(state)?;
76                            Ok(Some(value))
77                        })();
78                        if responder.send(result).await.is_err() {
79                            error!("Failed to send GetState response.");
80                        }
81                    }
82                    JsonCommand::RestoreState { state, responder } => {
83                        let result = (|| {
84                            let state: JsonState = serde_json::from_value(state)?;
85                            items = state.items;
86                            info!("JsonPipeline state restored with {} items.", items.len());
87                            Ok(())
88                        })();
89                        if responder.send(result).await.is_err() {
90                            error!("Failed to send RestoreState response.");
91                        }
92                    }
93                    JsonCommand::Shutdown(responder) => {
94                        info!("JsonPipeline writing {} items to file.", items.len());
95                        let result = (|| {
96                            let mut file = File::create(&path_buf)?;
97                            let json_array = Value::Array(items);
98                            let json_string = serde_json::to_string_pretty(&json_array)?;
99                            file.write_all(json_string.as_bytes())?;
100                            Ok(())
101                        })();
102                        if responder.send(result).await.is_err() {
103                            error!("Failed to send JsonPipeline shutdown response.");
104                        }
105                        break;
106                    }
107                }
108            }
109            info!("JsonPipeline async task for file: {:?} finished.", path_buf);
110        });
111
112        Ok(JsonPipeline {
113            command_sender,
114            export_config: None,
115            _phantom: PhantomData,
116        })
117    }
118
119    /// Applies typed export mapping before values are written.
120    pub fn with_schema_export_config(mut self, config: SchemaExportConfig) -> Self {
121        self.export_config = Some(config);
122        self
123    }
124}
125
126#[async_trait]
127impl<I: ScrapedItem> Pipeline<I> for JsonPipeline<I> {
128    fn name(&self) -> &str {
129        "JsonPipeline"
130    }
131
132    async fn process_item(&self, item: I) -> Result<Option<I>, PipelineError> {
133        debug!("JsonPipeline processing item.");
134        let json_value = map_item_for_export(&item, self.export_config.as_ref());
135        self.command_sender
136            .send(JsonCommand::Write(json_value))
137            .await
138            .map_err(|e| PipelineError::Other(format!("Failed to send Write command: {}", e)))?;
139        Ok(Some(item))
140    }
141
142    async fn close(&self) -> Result<(), PipelineError> {
143        info!("Closing JsonPipeline.");
144        let (tx, rx) = kanal::bounded_async(1);
145        self.command_sender
146            .send(JsonCommand::Shutdown(tx))
147            .await
148            .map_err(|e| PipelineError::Other(format!("Failed to send Shutdown command: {}", e)))?;
149        rx.recv().await.map_err(|e| {
150            PipelineError::Other(format!("Failed to receive shutdown response: {}", e))
151        })?
152    }
153
154    async fn get_state(&self) -> Result<Option<Value>, PipelineError> {
155        let (tx, rx) = kanal::bounded_async(1);
156        self.command_sender
157            .send(JsonCommand::GetState(tx))
158            .await
159            .map_err(|e| PipelineError::Other(format!("Failed to send GetState command: {}", e)))?;
160        rx.recv().await.map_err(|e| {
161            PipelineError::Other(format!("Failed to receive GetState response: {}", e))
162        })?
163    }
164
165    async fn restore_state(&self, state: Value) -> Result<(), PipelineError> {
166        let (tx, rx) = kanal::bounded_async(1);
167        self.command_sender
168            .send(JsonCommand::RestoreState {
169                state,
170                responder: tx,
171            })
172            .await
173            .map_err(|e| {
174                PipelineError::Other(format!("Failed to send RestoreState command: {}", e))
175            })?;
176        rx.recv().await.map_err(|e| {
177            PipelineError::Other(format!("Failed to receive RestoreState response: {}", e))
178        })?
179    }
180}