Engineering

Batch Processing 100K Documents Daily with Node.js Streams

Learn how MisuJob efficiently processes 100K documents daily using Node.js streams for scalable data handling. Optimize your data pipelines now!

· Founder & Engineer · · 10 min read
Diagram illustrating Node.js streams efficiently processing a large batch of documents for MisuJob.

Processing large datasets efficiently is a core challenge for any data-driven platform. At MisuJob, we face this challenge daily, as our systems process 1M+ job listings aggregated from multiple sources to power our AI-powered job matching.

Batch Processing 100K Documents Daily with Node.js Streams

Handling this volume of data requires robust and scalable solutions. One of the key technologies we leverage is Node.js streams. Node.js streams provide an elegant and efficient way to process large files or data streams without loading the entire dataset into memory. This is particularly critical when dealing with hundreds of thousands of documents daily. We’ll share our experiences and best practices for effectively using Node.js streams in a batch processing environment.

The Challenge: Processing Massive Datasets

Our platform processes 1M+ job listings daily, requiring us to perform various operations on each document, including data cleaning, transformation, and indexing. Initially, we attempted to load entire files into memory for processing. This approach quickly proved unsustainable due to memory limitations and performance bottlenecks. For example, processing a single file containing 50,000 job listings could consume several gigabytes of memory, leading to frequent garbage collection pauses and increased latency.

We needed a solution that would allow us to process each document incrementally, minimizing memory footprint and maximizing throughput. Node.js streams provided the answer.

Introduction to Node.js Streams

Node.js streams are an abstraction for handling streaming data. They allow you to read data from a source, process it, and write it to a destination in a non-blocking manner. There are four fundamental types of streams:

  • Readable: Streams that you can read from (e.g., reading data from a file).
  • Writable: Streams that you can write to (e.g., writing data to a file).
  • Duplex: Streams that are both readable and writable.
  • Transform: Duplex streams that modify or transform the data as it passes through.

For our batch processing pipeline, we primarily use readable streams to read documents from storage and transform streams to process and manipulate the data.

Implementing a Streaming Pipeline

Our streaming pipeline consists of several stages, each responsible for a specific task. Let’s illustrate with a simplified example. Imagine we need to read a large JSON file containing job listings, filter listings based on location, and transform the data into a specific format for indexing.

Here’s how we can implement this using Node.js streams:

  1. Reading the JSON file: We use fs.createReadStream to create a readable stream from the JSON file.
  2. Parsing the JSON data: We use a transform stream to parse each chunk of data from the file.
  3. Filtering job listings: We use another transform stream to filter listings based on location.
  4. Transforming the data: We use a final transform stream to transform the data into the desired format.
  5. Writing the transformed data: We use a writable stream to write the transformed data to a destination (e.g., another file or a database).

Here’s a code example showcasing this pipeline:

const fs = require('fs');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');

async function processJobListings(inputFilePath, outputFilePath, targetLocation) {
  const readStream = fs.createReadStream(inputFilePath, { encoding: 'utf8' });
  const writeStream = fs.createWriteStream(outputFilePath, { encoding: 'utf8' });

  const parseJsonTransform = new Transform({
    transform(chunk, encoding, callback) {
      try {
        const data = JSON.parse(chunk);
        // Assuming the JSON file contains an array of job listings
        if (Array.isArray(data)) {
          data.forEach(listing => this.push(JSON.stringify(listing) + '\n')); // Ensure each listing is on a new line
        } else {
          console.warn("Unexpected JSON structure. Expected an array.");
        }
        callback();
      } catch (error) {
        callback(error);
      }
    }
  });

  const filterListingsTransform = new Transform({
    transform(chunk, encoding, callback) {
      const listing = JSON.parse(chunk.toString());
      if (listing.location === targetLocation) {
        this.push(JSON.stringify(listing) + '\n');
      }
      callback();
    }
  });

  const transformDataTransform = new Transform({
    transform(chunk, encoding, callback) {
      const listing = JSON.parse(chunk.toString());
      const transformedListing = {
        title: listing.title,
        company: listing.company,
        location: listing.location,
        // Add other fields as needed
      };
      this.push(JSON.stringify(transformedListing) + '\n');
      callback();
    }
  });

  try {
    await pipeline(
      readStream,
      parseJsonTransform,
      filterListingsTransform,
      transformDataTransform,
      writeStream
    );
    console.log('Processing complete.');
  } catch (err) {
    console.error('Pipeline failed.', err);
  }
}

// Example usage
const inputFilePath = 'job_listings.json';
const outputFilePath = 'filtered_job_listings.json';
const targetLocation = 'Berlin';

processJobListings(inputFilePath, outputFilePath, targetLocation);

This example demonstrates the basic structure of a streaming pipeline. We use pipeline from stream/promises to connect the streams together and handle errors gracefully. This is a more robust approach than manually piping streams together because it handles backpressure and error propagation automatically.

Optimizing Stream Performance

While Node.js streams provide a powerful way to process large datasets, optimizing their performance is crucial for achieving maximum throughput. Here are some techniques we use to optimize our streaming pipelines:

  • Buffering: Buffering data in streams can improve performance by reducing the number of read/write operations. You can control the buffer size using the highWaterMark option when creating streams.
  • Parallel Processing: For CPU-intensive tasks, you can parallelize the processing by using worker threads or child processes. This allows you to leverage multiple CPU cores to speed up the processing. We often use a pool of worker threads to handle data transformation in parallel, significantly reducing the overall processing time.
  • Compression: Compressing data before writing it to storage can reduce storage costs and improve I/O performance. We use Gzip compression to compress large JSON files before storing them, which can reduce their size by up to 80%.
  • Error Handling: Implement robust error handling to prevent pipeline failures. Use try...catch blocks to catch errors in your transform streams and handle them appropriately. We also use error logging and monitoring to track errors and identify potential issues.

Backpressure Management

Backpressure is a critical concept in stream processing. It occurs when a stream is producing data faster than it can be consumed. If not handled properly, backpressure can lead to memory exhaustion and pipeline failures.

Node.js streams provide built-in mechanisms for handling backpressure. When a writable stream is full, it emits a 'drain' event to signal that it is ready to receive more data. Readable streams also respect backpressure by pausing data production when the consumer is not ready. The pipeline function automatically handles backpressure, making it easier to build robust streaming pipelines.

Real-World Example: Salary Data Processing

At MisuJob, we use streams extensively to process salary data from various sources. This data is crucial for providing users with accurate salary insights and helping them make informed career decisions.

Let’s say we have a large CSV file containing salary data for different job titles and locations across Europe. We want to process this data to calculate the average salary for each job title in each location.

First, we read the CSV file using fs.createReadStream and parse it using a CSV parsing library like csv-parser. Then, we use a transform stream to group the salary data by job title and location. Finally, we use another transform stream to calculate the average salary for each group and write the results to a database.

Here’s a simplified example of how we might implement this:

const fs = require('fs');
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const csv = require('csv-parser');

async function processSalaryData(inputFilePath) {
  const readStream = fs.createReadStream(inputFilePath);

  const groupSalaryDataTransform = new Transform({
    transform(chunk, encoding, callback) {
      const row = JSON.parse(chunk.toString());
      const { jobTitle, location, salary } = row;

      if (!this.salaryData) {
        this.salaryData = {};
      }

      const key = `${jobTitle}-${location}`;
      if (!this.salaryData[key]) {
        this.salaryData[key] = {
          totalSalary: 0,
          count: 0,
        };
      }

      this.salaryData[key].totalSalary += parseInt(salary, 10);
      this.salaryData[key].count++;
      callback();
    },
    flush(callback) {
      for (const key in this.salaryData) {
        const { totalSalary, count } = this.salaryData[key];
        const averageSalary = totalSalary / count;
        const [jobTitle, location] = key.split('-');
        const result = {
          jobTitle,
          location,
          averageSalary: averageSalary.toFixed(2),
        };
        this.push(JSON.stringify(result) + '\n');
      }
      callback();
    },
  });

  try {
    const results = [];
    await pipeline(
      readStream.pipe(csv()),
      groupSalaryDataTransform,
      async function* (source) {
        for await (const chunk of source) {
          results.push(JSON.parse(chunk));
        }
      }
    );

    // results now contains the average salaries
    console.log(results);

    // Example usage: Displaying average salaries for Software Engineers
    const softwareEngineerSalaries = results.filter(r => r.jobTitle === "Software Engineer");
    console.log("Average Salaries for Software Engineers:");
    console.table(softwareEngineerSalaries);


    console.log('Processing complete.');
  } catch (err) {
    console.error('Pipeline failed.', err);
  }
}

// Example usage
const inputFilePath = 'salary_data.csv';
processSalaryData(inputFilePath);

This example demonstrates how to use streams to process CSV data and calculate aggregate statistics. The flush method of the transform stream is used to process the remaining data after the stream has finished.

Salary Insights: A European Comparison

The insights gained from this salary data processing are invaluable to our users. Here’s a sample of average salaries for Software Engineers across various European countries, based on our processed data:

CountryAverage Salary (EUR)
Germany75,000
United Kingdom70,000
Netherlands72,000
France65,000
Switzerland90,000
Spain50,000
Sweden78,000

This data allows job seekers to understand salary expectations in different locations and make informed decisions about their career paths. We provide more granular salary breakdowns by experience level, specific technologies, and company size, all powered by our large-scale data processing pipelines. This kind of detailed analysis is only possible because of our efficient batch processing infrastructure.

Monitoring and Observability

Monitoring and observability are essential for ensuring the reliability and performance of our batch processing pipelines. We use a combination of tools and techniques to monitor our pipelines, including:

  • Metrics: We collect metrics such as processing time, memory usage, and error rates. These metrics are used to track the performance of our pipelines and identify potential issues. We use Prometheus to collect and store these metrics and Grafana to visualize them.
  • Logging: We log detailed information about the processing of each document, including timestamps, input data, and output data. This logging data is used to troubleshoot issues and understand the behavior of our pipelines. We use a centralized logging system based on Elasticsearch and Kibana to manage and analyze our logs.
  • Alerting: We set up alerts to notify us when certain metrics exceed predefined thresholds. For example, we have alerts that trigger when the processing time of a pipeline exceeds a certain limit or when the error rate increases significantly. These alerts allow us to respond quickly to potential issues and prevent pipeline failures.

Query Optimizations for Data Analysis

After processing and transforming the data, we often need to perform complex queries to analyze it and generate insights. Optimizing these queries is crucial for ensuring that they execute quickly and efficiently.

For example, let’s say we want to find the average salary for Software Engineers in Berlin with more than 5 years of experience. We might write a query like this:

SELECT AVG(salary)
FROM job_listings
WHERE job_title = 'Software Engineer'
AND location = 'Berlin'
AND experience > 5;

To optimize this query, we can use indexing. Indexing the job_title, location, and experience columns can significantly speed up the query by allowing the database to quickly locate the relevant rows.

Here’s an example of how to create these indexes:

CREATE INDEX idx_job_title ON job_listings (job_title);
CREATE INDEX idx_location ON job_listings (location);
CREATE INDEX idx_experience ON job_listings (experience);

By adding these indexes, we can reduce the query execution time from several seconds to milliseconds. We continuously monitor query performance and optimize our indexes to ensure that our data analysis queries are running as efficiently as possible.

Conclusion

Node.js streams provide a powerful and efficient way to process large datasets in a batch processing environment. By leveraging streams, we can minimize memory footprint, maximize throughput, and build robust and scalable data processing pipelines. At MisuJob, we rely heavily on Node.js streams to process 1M+ job listings daily, powering our AI-powered job matching and providing valuable insights to our users. Remember to optimize stream performance through buffering, parallel processing, compression, and robust error handling. Proper backpressure management is also essential for preventing memory exhaustion and pipeline failures.

Key Takeaways:

  • Node.js streams are essential for efficient batch processing of large datasets.
  • Optimize stream performance using buffering, parallel processing, and compression.
  • Handle backpressure to prevent memory exhaustion and pipeline failures.
  • Implement robust error handling and monitoring for reliable pipelines.
  • Use indexing to optimize data analysis queries.
node.js streams batch processing data processing scalability
Share
P
Pablo Inigo

Founder & Engineer

Building MisuJob - an AI-powered job matching platform processing 1M+ job listings daily.

Engineering updates

Technical deep dives delivered to your inbox.

Find your next role with AI

Upload your CV. Get matched to 50,000+ jobs. Apply to the best fits effortlessly.

Get Started Free

User

Dashboard Profile Subscription