Engineering

Real-Time Data Processing with Redis Streams

Discover real-time data processing with Redis Streams. Learn how MisuJob uses it to handle 1M+ job listings, building a scalable, low-latency AI-powered platform.

· Founder & Engineer · · 8 min read
Diagram illustrating the flow of real-time data processing using Redis Streams at MisuJob.

Real-time data streams are the backbone of modern, responsive applications, allowing us to react instantly to changes and provide a seamless user experience. At MisuJob, processing 1M+ job listings requires robust, scalable, and low-latency infrastructure, and Redis Streams has become a critical component of our real-time data pipeline.

Introduction to Real-Time Data Processing with Redis Streams

At MisuJob, we’re building an AI-powered job matching platform to connect professionals across Europe with their dream opportunities. This ambition translates to processing massive amounts of information in real time. Redis Streams is a powerful, append-only data structure that provides a robust and efficient way to handle real-time data ingestion, processing, and distribution. It allows us to build highly scalable and fault-tolerant applications, enabling us to react to new job postings, update salary insights, and refine our AI models with minimal delay. Before Streams, we experimented with other solutions like Kafka and RabbitMQ. While powerful, their operational overhead and configuration complexity proved to be a challenge for our team. Redis Streams offered a simpler, more integrated solution within our existing Redis infrastructure, significantly reducing our operational burden.

What are Redis Streams?

Redis Streams is a data structure that allows you to store a continuous flow of data, similar to a log file. Each entry in the stream is identified by a unique ID and contains one or more key-value pairs. Key features of Redis Streams include:

  • Persistence: Streams are persisted to disk, ensuring data durability.
  • Consumer Groups: Streams support consumer groups, allowing multiple consumers to work together to process the stream in parallel.
  • Blocking Reads: Consumers can block and wait for new messages to arrive in the stream.
  • Range Queries: You can query the stream for messages within a specific range of IDs.
  • Auto-ID generation: Redis automatically creates monotonically increasing IDs for each stream entry.

Why Redis Streams?

We chose Redis Streams for several key reasons:

  • Performance: Redis is known for its speed and low latency. Streams inherits these characteristics, making it ideal for real-time processing.
  • Simplicity: Redis Streams is relatively easy to set up and use compared to more complex messaging systems. This meant a faster development cycle and reduced operational overhead.
  • Integration: We already use Redis extensively for caching and session management. Integrating Streams into our existing infrastructure was straightforward.
  • Scalability: Streams can be scaled horizontally by adding more Redis instances. We can effectively distribute the load as MisuJob grows and processes more job listings.
  • Lower Latency: Our benchmarks showed that Redis Streams consistently offered lower end-to-end latency compared to other message queue solutions, especially under high load. This is critical for our real-time features.

Implementing Real-Time Data Processing with Redis Streams

Let’s walk through a simplified example of how we use Redis Streams at MisuJob. Imagine we want to track the number of job applications received for each job listing in real-time.

Setting Up the Stream

First, we need to create a stream in Redis:

redis-cli XADD job_applications * job_id 12345 user_id 67890 timestamp 1678886400

This command adds a new entry to the job_applications stream. The * tells Redis to automatically generate a unique ID for the entry. The key-value pairs represent the data associated with the application (job ID, user ID, and timestamp).

Consuming from the Stream

Next, we need a consumer to process the stream. We can use consumer groups to allow multiple consumers to work together. First, create a consumer group:

redis-cli XGROUP CREATE job_applications mygroup 0

This command creates a consumer group named mygroup for the job_applications stream. The 0 specifies that the group should start consuming from the beginning of the stream.

Now, we can write a consumer application to read from the stream:

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

while True:
    # Read messages from the 'mygroup' consumer group, waiting for new messages if the stream is empty.
    response = r.xreadgroup(groupname='mygroup', consumername='consumer1', streams={'job_applications': '>'}, count=1, block=1000)

    if response:
        stream_name, messages = response[0]
        for message_id, message_data in messages:
            job_id = message_data[b'job_id'].decode('utf-8')
            user_id = message_data[b'user_id'].decode('utf-8')
            timestamp = message_data[b'timestamp'].decode('utf-8')

            print(f"Received application for job {job_id} from user {user_id} at {timestamp}")

            # Acknowledge the message to remove it from the pending entries list (PEL).
            r.xack(name='job_applications', groupname='mygroup', id=message_id)

This Python code connects to Redis, reads messages from the job_applications stream using the mygroup consumer group, processes the data, and acknowledges the message. Acknowledging the message is crucial. If the consumer fails to acknowledge the message, it will remain in the Pending Entries List (PEL) and will be re-delivered to another consumer in the group.

Aggregating Data

After consuming the stream data, we can aggregate it to get real-time insights. For instance, we can use Redis to store the number of applications per job:

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

while True:
    # Read messages from the 'mygroup' consumer group, waiting for new messages if the stream is empty.
    response = r.xreadgroup(groupname='mygroup', consumername='consumer1', streams={'job_applications': '>'}, count=1, block=1000)

    if response:
        stream_name, messages = response[0]
        for message_id, message_data in messages:
            job_id = message_data[b'job_id'].decode('utf-8')

            # Increment the application count for the job ID.
            r.incr(f"job:{job_id}:applications")

            print(f"Incremented application count for job {job_id}")

            # Acknowledge the message to remove it from the pending entries list (PEL).
            r.xack(name='job_applications', groupname='mygroup', id=message_id)

This code increments a Redis counter for each job application received. We can then query these counters to get real-time application counts.

Scaling and Performance Optimization

Sharding Streams

To handle the massive volume of job listings that MisuJob processes, we shard our streams across multiple Redis instances. We use a consistent hashing algorithm to ensure that messages for the same job listing are always routed to the same shard. This approach allows us to distribute the load evenly and scale horizontally as our data volume grows.

Optimizing Consumer Performance

We’ve learned a few crucial tips for optimizing consumer performance.

  • Batch Processing: Instead of processing messages one at a time, we fetch and process them in batches. This reduces the overhead of network round trips and improves throughput.
  • Parallel Processing: We use multiple threads or processes to consume from the stream in parallel. This allows us to fully utilize our CPU cores and further increase throughput.
  • Connection Pooling: We use connection pooling to reuse Redis connections, reducing the overhead of creating new connections for each request.
  • Non-Blocking Operations: Where possible, we use non-blocking Redis operations to avoid blocking the main thread of our consumer application.

Monitoring and Alerting

We use Prometheus and Grafana to monitor the performance of our Redis Streams infrastructure. We track key metrics such as:

  • Stream Length: The number of messages in the stream.
  • Consumer Lag: The difference between the last message produced and the last message consumed.
  • Message Processing Time: The time it takes to process a single message.
  • Redis CPU and Memory Usage: Overall resource utilization of the Redis instances.

We set up alerts to notify us of any anomalies or performance issues. For example, we have alerts that trigger if the consumer lag exceeds a certain threshold or if the message processing time increases significantly.

Real-World Applications at MisuJob

Here are a few specific examples of how we use Redis Streams at MisuJob:

  • Real-Time Salary Updates: We continuously update salary insights based on the latest job postings. When a new job listing with salary information is processed, we push it to a stream. Consumers process this data and update our salary database in real-time. This ensures that our users always have access to the most up-to-date salary information.
  • Personalized Job Recommendations: We use Redis Streams to power our real-time job recommendation engine. When a user performs a search or updates their profile, we push this data to a stream. Consumers process this data and update the user’s recommendations in real-time. This allows us to provide highly personalized and relevant job recommendations.
  • Fraud Detection: We use Redis Streams to detect and prevent fraudulent activity on our platform. For example, we track the number of applications submitted from a single IP address. If the number exceeds a certain threshold, we flag the IP address for further investigation.
  • Aggregated Job Market Insights: We aggregate job market data in real-time, providing users with insights into the most in-demand skills, industries, and locations. This data is used to power our career advice features, helping job seekers make informed decisions.

Salary Insights Example

Our real-time salary processing allows us to provide granular salary data across Europe. For example, a Software Engineer in various European cities can expect the following (approximate, based on MisuJob data):

CityJunior (EUR)Mid-Level (EUR)Senior (EUR)
Berlin50,00075,000100,000
Amsterdam55,00080,000110,000
Paris48,00072,00095,000
London60,00090,000120,000
Stockholm52,00078,000105,000

This data is continuously updated via Redis Streams, ensuring accuracy and relevance.

Challenges and Lessons Learned

Implementing real-time data processing with Redis Streams wasn’t without its challenges.

  • Consumer Lag: Ensuring that consumers can keep up with the rate of message production is crucial. We’ve had to fine-tune our consumer applications and scale our Redis infrastructure to address this issue.
  • Message Ordering: Redis Streams guarantees message ordering within a partition, but not across partitions. If message ordering is critical for your application, you need to ensure that related messages are routed to the same partition.
  • Error Handling: Implementing robust error handling is essential. We use dead-letter queues to handle messages that cannot be processed successfully. We also have monitoring and alerting in place to notify us of any errors.
  • Data Serialization: Choosing the right data serialization format is important for performance. We experimented with different formats and found that Protocol Buffers (protobuf) offered the best balance of performance and efficiency.

Conclusion

Redis Streams has proven to be a valuable tool for building real-time data pipelines at MisuJob. Its simplicity, performance, and scalability make it an excellent choice for many use cases. By understanding the key concepts and best practices, you can leverage Redis Streams to build responsive, scalable, and fault-tolerant applications.

Key Takeaways

  • Redis Streams is a powerful and efficient way to handle real-time data.
  • Consumer groups allow you to process streams in parallel.
  • Monitoring and alerting are essential for maintaining the health of your Redis Streams infrastructure.
  • Consider sharding your streams to handle large volumes of data.
  • Optimize consumer performance with batch processing, parallel processing, and connection pooling.
  • Continuously monitor consumer lag and adjust resources as needed.
redis streams real-time data data processing misuJob ai
Share
P
Pablo Inigo

Founder & Engineer

Building MisuJob - an AI-powered job matching platform processing 1M+ job listings daily.

Engineering updates

Technical deep dives delivered to your inbox.

Find your next role with AI

Upload your CV. Get matched to 50,000+ jobs. Apply to the best fits effortlessly.

Get Started Free

User

Dashboard Profile Subscription