Stream Processing with RedisKit
Introduction to Redis Streams
Redis Streams provide an append-only log data structure with support for consumer groups, well suited to building event-driven architectures and real-time data processing pipelines.
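The examples in this guide share a few small event types. They are not part of RedisKit; the sketch below shows the shapes the snippets assume (WorkItem, SensorData, MetricEvent, and the other payload types follow the same pattern):
public record UserEvent
{
    public string Type { get; init; } = string.Empty;   // e.g. "user.created"
    public string UserId { get; init; } = string.Empty;
}
public record OrderEvent
{
    public string OrderId { get; init; } = string.Empty;
}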
Basic Stream Operations
Adding to Streams
public class EventPublisher
{
    private readonly IRedisStreamService _streams;

    public EventPublisher(IRedisStreamService streams)
    {
        _streams = streams;
    }

    public async Task<string> PublishEventAsync(UserEvent userEvent)
    {
        // Add to the stream with an auto-generated ID
        var messageId = await _streams.AddAsync("events:stream", userEvent);
        Console.WriteLine($"Published event with ID: {messageId}");
        return messageId;
    }

    public async Task<string> PublishWithCustomIdAsync(UserEvent userEvent, string customId)
    {
        // Add with a custom ID (must be greater than the last ID in the stream)
        return await _streams.AddAsync("events:stream", userEvent, customId);
    }
}
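The returned message ID has the form <milliseconds>-<sequence> (for example, 1526919030474-55), so auto-generated IDs sort by arrival time.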
Reading from Streams
public class StreamReader
{
    private readonly IRedisStreamService _streams;

    public async Task ReadFromBeginningAsync(int count = 10)
    {
        // Read up to N messages from the start of the stream
        var messages = await _streams.ReadAsync<UserEvent>(
            "events:stream",
            "-", // "-" is the smallest possible ID, i.e. the start of the stream
            count);

        foreach (var (id, evt) in messages)
        {
            Console.WriteLine($"ID: {id}, Event: {evt.Type}");
        }
    }

    public async Task ReadRangeAsync(string startId, string endId)
    {
        // Read a specific range of IDs
        var messages = await _streams.RangeAsync<UserEvent>(
            "events:stream",
            startId,
            endId);
        ProcessMessages(messages);
    }
}
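The special IDs "-" and "+" denote the smallest and largest IDs in the stream, so RangeAsync("events:stream", "-", "+") reads the entire stream.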
Consumer Groups
Creating Consumer Groups
public class StreamConsumerSetup
{
    private readonly IRedisStreamService _streams;

    public async Task SetupConsumerGroupAsync()
    {
        // Create a consumer group that sees only messages added after creation
        await _streams.CreateConsumerGroupAsync(
            "events:stream",
            "processing-group",
            "$"); // "$" = start from new messages only

        // Or create a group that processes the stream from the beginning
        await _streams.CreateConsumerGroupAsync(
            "events:stream",
            "analytics-group",
            "0"); // "0" = process all existing messages
    }
}
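Creating a group that already exists raises a Redis BUSYGROUP error. How RedisKit surfaces that error is not documented here, so the idempotent wrapper below is a sketch that assumes the Redis error text is preserved in the exception message:
public async Task EnsureConsumerGroupAsync(
    string streamKey, string groupName, string startId = "$")
{
    try
    {
        await _streams.CreateConsumerGroupAsync(streamKey, groupName, startId);
    }
    catch (Exception ex) when (ex.Message.Contains("BUSYGROUP"))
    {
        // Group already exists; ignoring the error makes setup idempotent
    }
}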
Consuming with Groups
public class StreamConsumer
{
    private readonly IRedisStreamService _streams;
    private readonly string _consumerName;

    public StreamConsumer(IRedisStreamService streams, string consumerName)
    {
        _streams = streams;
        _consumerName = consumerName;
    }

    public async Task StartConsumingAsync()
    {
        await _streams.ConsumeAsync<OrderEvent>(
            streamKey: "orders:stream",
            groupName: "order-processors",
            consumerName: _consumerName,
            handler: async (order, streamEntry) =>
            {
                try
                {
                    Console.WriteLine($"Processing order: {order.OrderId}");
                    await ProcessOrder(order);

                    // Acknowledge only after successful processing
                    await _streams.AcknowledgeAsync(
                        "orders:stream",
                        "order-processors",
                        streamEntry.Id);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Failed to process: {ex.Message}");
                    // Message stays pending and will be redelivered
                }
            });
    }
}
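In an ASP.NET Core application the consuming loop typically runs inside a hosted service. The wiring below is not part of RedisKit, just a minimal sketch of how StartConsumingAsync might be hosted:
public class OrderConsumerHost : BackgroundService
{
    private readonly StreamConsumer _consumer;

    public OrderConsumerHost(StreamConsumer consumer)
    {
        _consumer = consumer;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Runs for the lifetime of the application
        return _consumer.StartConsumingAsync();
    }
}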
Advanced Patterns
Competing Consumers
Distribute work across multiple consumers:
public class WorkerPool
{
    private readonly IRedisStreamService _streams;
    private readonly List<Task> _workers = new();

    public async Task StartWorkersAsync(int workerCount, CancellationToken ct)
    {
        for (int i = 0; i < workerCount; i++)
        {
            var workerId = $"worker-{i}";
            _workers.Add(StartWorkerAsync(workerId, ct));
        }
        await Task.WhenAll(_workers);
    }

    private async Task StartWorkerAsync(string workerId, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            try
            {
                var messages = await _streams.ReadGroupAsync<WorkItem>(
                    streamKey: "tasks:stream",
                    groupName: "workers",
                    consumerName: workerId,
                    count: 10,
                    blockMilliseconds: 1000);

                foreach (var (id, item, entry) in messages)
                {
                    await ProcessWorkItem(item);
                    await _streams.AcknowledgeAsync(
                        "tasks:stream",
                        "workers",
                        id);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Worker {workerId} error: {ex.Message}");
                await Task.Delay(5000, ct); // Back off on error
            }
        }
    }
}
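Each read blocks for up to one second (blockMilliseconds: 1000) when the stream is empty, which keeps CPU usage low while still letting the loop check its cancellation token roughly once per second.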
Claim Pending Messages
Handle failed consumers and stuck messages:
public class MessageReclaimer
{
    private readonly IRedisStreamService _streams;

    public async Task ReclaimStuckMessagesAsync()
    {
        // Get pending messages idle for more than 5 minutes
        var pending = await _streams.GetPendingMessagesAsync(
            "events:stream",
            "processing-group",
            TimeSpan.FromMinutes(5));

        foreach (var message in pending)
        {
            if (message.DeliveryCount > 3)
            {
                // Repeatedly failing message: move to a dead letter stream
                await _streams.AddAsync("events:dlq", message.Data);
                await _streams.AcknowledgeAsync(
                    "events:stream",
                    "processing-group",
                    message.Id);
            }
            else
            {
                // Reclaim for another processing attempt
                await _streams.ClaimAsync(
                    "events:stream",
                    "processing-group",
                    "reclaimer",
                    message.Id);
            }
        }
    }
}
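Claiming a message (Redis XCLAIM) transfers ownership to the new consumer and, by default, increments its delivery counter, which is what makes the DeliveryCount threshold above a workable poison-message signal.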
Streaming API (Memory-Efficient Processing)
Process large streams without loading everything into memory:
public class StreamingProcessor
{
    private readonly IRedisStreamService _streams;

    public async Task ProcessLargeStreamAsync(CancellationToken ct)
    {
        // Memory-efficient streaming via IAsyncEnumerable:
        // entries are fetched in batches and yielded one at a time
        await foreach (var (id, data) in _streams.ReadStreamingAsync<OrderEvent>(
            "orders:stream",
            start: "-", // from the beginning
            batchSize: 100,
            ct))
        {
            if (data != null)
            {
                Console.WriteLine($"Processing order {id}: {data.OrderId}");
                await ProcessOrderAsync(data);
            }
        }
    }

    public async Task ProcessWithAckCallbackAsync(CancellationToken ct)
    {
        // Streaming with per-message acknowledgment via a callback
        await foreach (var (id, data, ackFunc) in _streams.ReadGroupStreamingAsync<OrderEvent>(
            "orders:stream",
            "processing-group",
            "worker-1",
            batchSize: 10,
            ct))
        {
            if (data != null)
            {
                try
                {
                    await ProcessOrderAsync(data);
                    await ackFunc(); // Acknowledge on success
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Failed to process {id}: {ex.Message}");
                    // Message remains unacknowledged and can be retried
                }
            }
        }
    }
}
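Messages read through a consumer group but never acknowledged remain in that consumer's pending entries list (PEL); the reclaim pattern from the previous section is what eventually reprocesses or dead-letters them.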
Stream Aggregation
Aggregate data from multiple streams:
public class StreamAggregator
{
    private readonly IRedisStreamService _streams;

    public async Task AggregateStreamsAsync()
    {
        var streams = new[]
        {
            "sensors:temperature",
            "sensors:humidity",
            "sensors:pressure"
        };

        var tasks = streams.Select(stream =>
            _streams.ReadAsync<SensorData>(stream, "-", 100));
        var results = await Task.WhenAll(tasks);

        var aggregated = new AggregatedSensorData
        {
            Timestamp = DateTime.UtcNow,
            Temperature = ExtractLatest(results[0]),
            Humidity = ExtractLatest(results[1]),
            Pressure = ExtractLatest(results[2])
        };

        await _streams.AddAsync("sensors:aggregated", aggregated);
    }
}
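Redis's XREAD command can read several streams in a single round trip; if RedisKit exposes a multi-stream overload it would replace the Task.WhenAll fan-out above, but the parallel single-stream calls shown work with the API used throughout this guide.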
Windowed Processing
Process streams in time windows:
public class WindowedProcessor
{
    private readonly IRedisStreamService _streams;
    private readonly IRedisCacheService _cache;

    public async Task ProcessWindowAsync(TimeSpan windowSize)
    {
        var windowEnd = DateTimeOffset.UtcNow;
        var windowStart = windowEnd - windowSize;

        // Stream IDs embed a Unix-epoch millisecond timestamp,
        // so window bounds must be converted accordingly (not DateTime.Ticks)
        var messages = await _streams.RangeAsync<MetricEvent>(
            "metrics:stream",
            $"{windowStart.ToUnixTimeMilliseconds()}-0",
            $"{windowEnd.ToUnixTimeMilliseconds()}-0");

        if (messages.Count == 0)
            return; // Nothing arrived in this window

        var summary = new WindowSummary
        {
            WindowStart = windowStart,
            WindowEnd = windowEnd,
            Count = messages.Count,
            Average = messages.Average(m => m.Item2.Value),
            Max = messages.Max(m => m.Item2.Value),
            Min = messages.Min(m => m.Item2.Value)
        };

        await _cache.SetAsync(
            $"window:{windowStart:yyyyMMddHHmmss}",
            summary,
            TimeSpan.FromHours(24));
    }
}
Stream Trimming
Manage stream size:
public class StreamMaintenance
{
    private readonly IRedisStreamService _streams;

    public async Task TrimByCountAsync(string streamKey, int maxLength)
    {
        // Keep only the last N messages
        await _streams.TrimAsync(streamKey, maxLength);
    }

    public async Task TrimByAgeAsync(string streamKey, TimeSpan maxAge)
    {
        // Stream IDs begin with a Unix-epoch millisecond timestamp,
        // so the cutoff must be expressed in those units (not DateTime.Ticks)
        var cutoff = DateTimeOffset.UtcNow - maxAge;
        var cutoffId = $"{cutoff.ToUnixTimeMilliseconds()}-0";

        // Remove messages older than the cutoff
        await _streams.TrimByMinIdAsync(streamKey, cutoffId);
    }

    public async Task ApproximateTrimAsync(string streamKey, int approximateMax)
    {
        // Approximate trimming (the ~ modifier) is more efficient
        await _streams.TrimApproximateAsync(streamKey, approximateMax);
    }
}
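Approximate trimming corresponds to XTRIM MAXLEN ~, which lets Redis trim at macro-node boundaries in the underlying radix tree instead of deleting entries one by one, making it considerably cheaper than exact trimming.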
Monitoring and Health
Stream Health Monitoring
public class StreamHealthMonitor
{
    private readonly IRedisStreamService _streams;

    public async Task<StreamHealthInfo> GetHealthAsync(string streamKey)
    {
        var info = await _streams.GetStreamInfoAsync(streamKey);
        var groups = await _streams.GetConsumerGroupsAsync(streamKey);

        var health = new StreamHealthInfo
        {
            StreamKey = streamKey,
            Length = info.Length,
            FirstEntry = info.FirstEntry,
            LastEntry = info.LastEntry,
            ConsumerGroups = groups.Select(g => new ConsumerGroupHealth
            {
                Name = g.Name,
                Pending = g.PendingCount,
                LastDeliveredId = g.LastDeliveredId,
                Consumers = g.Consumers
            }).ToList()
        };

        // Check for common operational issues
        health.Issues = DetectIssues(health);
        return health;
    }

    private List<string> DetectIssues(StreamHealthInfo health)
    {
        var issues = new List<string>();

        if (health.Length > 1_000_000)
            issues.Add("Stream size exceeds 1M entries");

        foreach (var group in health.ConsumerGroups)
        {
            if (group.Pending > 1000)
                issues.Add($"Group {group.Name} has {group.Pending} pending messages");
            if (group.Consumers == 0)
                issues.Add($"Group {group.Name} has no active consumers");
        }

        return issues;
    }
}
Consumer Monitoring
public class ConsumerMonitor
{
    private readonly IRedisStreamService _streams;
    private readonly ConcurrentDictionary<string, ConsumerMetrics> _metrics = new();

    public async Task MonitorConsumerAsync(
        string streamKey,
        string groupName,
        string consumerName)
    {
        var key = $"{streamKey}:{groupName}:{consumerName}";

        // ConcurrentDictionary keeps this safe when several
        // consumers are monitored concurrently
        var metrics = _metrics.GetOrAdd(key, _ => new ConsumerMetrics
        {
            StreamKey = streamKey,
            GroupName = groupName,
            ConsumerName = consumerName
        });

        await _streams.ConsumeAsync<object>(
            streamKey,
            groupName,
            consumerName,
            async (message, entry) =>
            {
                var startTime = DateTime.UtcNow;
                try
                {
                    // Process message
                    await ProcessMessage(message);
                    metrics.ProcessedCount++;
                    metrics.TotalProcessingTime += (DateTime.UtcNow - startTime).TotalMilliseconds;
                }
                catch (Exception ex)
                {
                    metrics.ErrorCount++;
                    metrics.LastError = ex.Message;
                }

                metrics.LastProcessedTime = DateTime.UtcNow;
                if (metrics.ProcessedCount > 0)
                    metrics.AverageProcessingTime = metrics.TotalProcessingTime / metrics.ProcessedCount;
            });
    }
}
Best Practices
- Use consumer groups for scalability - Distribute work across multiple consumers
- Implement proper acknowledgment - Acknowledge only after successful processing
- Handle pending messages - Implement reclaim logic for stuck messages
- Monitor stream size - Trim old messages to control memory usage
- Use appropriate IDs - Let Redis generate IDs unless you need custom ordering
- Batch operations - Read multiple messages at once for efficiency
- Implement dead letter queues - Handle permanently failed messages
- Monitor consumer health - Track pending messages and consumer activity
- Use blocking reads wisely - Balance between latency and CPU usage (see the sketch after this list)
- Consider retention policies - Implement automatic trimming based on age or size
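For the blocking-reads point above, a short sketch of the trade-off, reusing the ReadGroupAsync call from the worker-pool example (the parameter values are illustrative, not recommendations):
// Long block: low CPU and low latency once a message arrives,
// but the loop reacts to shutdown at most once per 5 seconds.
var slow = await _streams.ReadGroupAsync<WorkItem>(
    streamKey: "tasks:stream",
    groupName: "workers",
    consumerName: "worker-0",
    count: 10,
    blockMilliseconds: 5000);

// Short block: tighter cancellation/shutdown loop,
// at the cost of more idle round trips to Redis.
var fast = await _streams.ReadGroupAsync<WorkItem>(
    streamKey: "tasks:stream",
    groupName: "workers",
    consumerName: "worker-0",
    count: 10,
    blockMilliseconds: 250);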