Table of Contents

Sharded Pub/Sub Guide

Sharded Pub/Sub is a new feature introduced in Redis 7.0 that provides better scalability for pub/sub operations in Redis Cluster environments by distributing messages across shards.

What is Sharded Pub/Sub?

Traditional Redis Pub/Sub broadcasts messages to all nodes in a cluster, which can create significant network overhead. Sharded Pub/Sub solves this by:

  • Routing messages only to the shard that owns the channel
  • Distributing channels across shards using the same hash slot mechanism as keys
  • Reducing inter-node communication in clusters
  • Providing better horizontal scalability

Key Differences from Regular Pub/Sub

Feature Regular Pub/Sub Sharded Pub/Sub
Message Distribution All nodes Single shard
Pattern Subscriptions Supported Not Supported
Cluster Scalability Limited Excellent
Network Usage High (broadcast) Low (targeted)
Commands PUBLISH/SUBSCRIBE SPUBLISH/SSUBSCRIBE

Using Sharded Pub/Sub with RedisKit

Checking Support

Sharded Pub/Sub requires Redis 7.0 or later:

var shardedPubSub = services.GetRequiredService<IRedisShardedPubSub>();

if (await shardedPubSub.IsSupportedAsync())
{
    // Redis 7.0+ with sharded pub/sub support
}
else
{
    // Fall back to regular pub/sub
    var regularPubSub = services.GetRequiredService<IRedisPubSubService>();
}

Basic Publishing

// Publish a message to a sharded channel
var subscribers = await shardedPubSub.PublishAsync(
    "orders:new",
    new OrderMessage 
    { 
        OrderId = 12345,
        CustomerId = "CUST-001",
        Amount = 99.99m,
        Items = new[] { "Item1", "Item2" }
    });

Console.WriteLine($"Message delivered to {subscribers} subscribers");

Basic Subscription

// Subscribe to a sharded channel
var subscriptionToken = await shardedPubSub.SubscribeAsync<OrderMessage>(
    "orders:new",
    async (message, cancellationToken) =>
    {
        Console.WriteLine($"Order received on shard: {message.ShardId}");
        Console.WriteLine($"Order ID: {message.Data.OrderId}");
        Console.WriteLine($"Amount: {message.Data.Amount}");
        
        await ProcessOrder(message.Data);
    });

// Keep the subscription token to unsubscribe later
// The subscription will remain active until explicitly unsubscribed

Unsubscribing

// Method 1: Using the subscription token
await shardedPubSub.UnsubscribeAsync(subscriptionToken);

// Method 2: Unsubscribe from channel directly
await shardedPubSub.UnsubscribeAsync("orders:new");

Important Limitation: No Pattern Support

Sharded Pub/Sub does NOT support pattern subscriptions. This is a Redis limitation, not a library limitation.

// This will throw NotSupportedException
try
{
    await shardedPubSub.SubscribePatternAsync<Message>(
        "orders:*",  // Pattern not supported!
        handler);
}
catch (NotSupportedException ex)
{
    Console.WriteLine("Pattern subscriptions are not supported in sharded pub/sub");
    // Use regular pub/sub for patterns instead
}

Workaround for Pattern-like Behavior

If you need pattern-like behavior with sharded pub/sub, subscribe to multiple specific channels:

// Instead of pattern "user:*:notifications"
// Subscribe to specific channels
var channels = new[] 
{ 
    "user:123:notifications",
    "user:456:notifications",
    "user:789:notifications"
};

var subscriptions = new List<SubscriptionToken>();
foreach (var channel in channels)
{
    var token = await shardedPubSub.SubscribeAsync<Notification>(
        channel,
        async (msg, ct) => await HandleNotification(msg));
    subscriptions.Add(token);
}

Statistics and Monitoring

// Get sharded pub/sub statistics
var stats = await shardedPubSub.GetStatsAsync();
Console.WriteLine($"Total Channels: {stats.TotalChannels}");
Console.WriteLine($"Total Subscribers: {stats.TotalSubscribers}");
Console.WriteLine($"Collected At: {stats.CollectedAt}");

// Get subscriber count for a specific channel
var count = await shardedPubSub.GetSubscriberCountAsync("orders:new");
Console.WriteLine($"Subscribers for 'orders:new': {count}");

Best Practices

1. Channel Naming for Optimal Sharding

Design channel names to distribute evenly across shards:

// Good: Includes variable component for distribution
$"notifications:user:{userId}"
$"events:session:{sessionId}"
$"updates:product:{productId}"

// Bad: Static names that always go to same shard
"notifications:all"
"events:global"
"updates:system"

2. Use Sharded for High-Volume Channels

// High-volume: Use sharded pub/sub
await shardedPubSub.PublishAsync($"telemetry:device:{deviceId}", data);

// Low-volume or needs patterns: Use regular pub/sub
await regularPubSub.PublishAsync("system:alerts", alert);

3. Message Size Considerations

Sharded pub/sub is ideal for frequent, smaller messages:

// Good: Frequent small updates
public class LocationUpdate
{
    public string DeviceId { get; set; }
    public double Latitude { get; set; }
    public double Longitude { get; set; }
    public DateTime Timestamp { get; set; }
}

// Consider alternatives for large payloads
// Maybe store in Redis and send notification
public class LargeDataNotification
{
    public string DataKey { get; set; }  // Key to fetch actual data
    public string DataType { get; set; }
    public long Size { get; set; }
}

4. Error Handling

var subscription = await shardedPubSub.SubscribeAsync<OrderMessage>(
    "orders:new",
    async (message, cancellationToken) =>
    {
        try
        {
            await ProcessOrder(message.Data);
        }
        catch (Exception ex)
        {
            // Log error but don't throw
            // Throwing would affect other subscribers
            logger.LogError(ex, "Error processing order {OrderId}", 
                message.Data.OrderId);
            
            // Consider dead letter queue
            await StoreFailedMessage(message);
        }
    });

5. Graceful Shutdown

public class OrderProcessingService : IHostedService
{
    private readonly List<SubscriptionToken> _subscriptions = new();
    private readonly IRedisShardedPubSub _shardedPubSub;
    
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var token = await _shardedPubSub.SubscribeAsync<OrderMessage>(
            "orders:new",
            ProcessOrder);
        _subscriptions.Add(token);
    }
    
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        // Unsubscribe all on shutdown
        foreach (var token in _subscriptions)
        {
            await _shardedPubSub.UnsubscribeAsync(token, cancellationToken);
        }
    }
}

When to Use Sharded vs Regular Pub/Sub

Use Sharded Pub/Sub When:

  • Running Redis Cluster (or planning to)
  • High message volume per channel
  • Need horizontal scalability
  • Can work without pattern subscriptions
  • Channel names are well-distributed

Use Regular Pub/Sub When:

  • Need pattern subscriptions
  • Running standalone Redis
  • Broadcasting to many nodes is desired
  • Low message volume
  • Need Redis < 7.0 compatibility

Hybrid Approach

You can use both in the same application:

public class MessagingService
{
    private readonly IRedisShardedPubSub _shardedPubSub;
    private readonly IRedisPubSubService _regularPubSub;
    
    public async Task PublishUserEventAsync(string userId, UserEvent evt)
    {
        // Use sharded for user-specific high-volume events
        await _shardedPubSub.PublishAsync($"user:{userId}:events", evt);
    }
    
    public async Task PublishSystemAlertAsync(SystemAlert alert)
    {
        // Use regular for system-wide broadcasts
        await _regularPubSub.PublishAsync("system:alerts:*", alert);
    }
    
    public async Task SubscribeToUserEventsAsync(string userId)
    {
        // Sharded subscription for specific user
        await _shardedPubSub.SubscribeAsync<UserEvent>(
            $"user:{userId}:events",
            HandleUserEvent);
    }
    
    public async Task SubscribeToAllAlertsAsync()
    {
        // Regular pattern subscription for all alerts
        await _regularPubSub.SubscribePatternAsync<SystemAlert>(
            "system:alerts:*",
            HandleSystemAlert);
    }
}

Performance Considerations

Throughput Comparison

In a 3-node Redis Cluster:

Regular Pub/Sub:
- 1 publish = 3 network hops (broadcast to all)
- 10,000 msg/sec = 30,000 network operations

Sharded Pub/Sub:
- 1 publish = 1 network hop (targeted shard)
- 10,000 msg/sec = 10,000 network operations

Latency Benefits

// Benchmark results (example)
public class PubSubBenchmark
{
    // Regular: ~2.5ms average (cluster broadcast)
    // Sharded: ~0.8ms average (single shard)
    
    [Benchmark]
    public async Task RegularPublish()
    {
        await regularPubSub.PublishAsync("test", message);
    }
    
    [Benchmark]
    public async Task ShardedPublish()
    {
        await shardedPubSub.PublishAsync("test", message);
    }
}

Troubleshooting

Issue: NotSupportedException

// Problem: Pattern subscription attempted
await shardedPubSub.SubscribePatternAsync<T>("pattern:*", handler);
// Error: NotSupportedException

// Solution: Use regular pub/sub for patterns
await regularPubSub.SubscribePatternAsync<T>("pattern:*", handler);

Issue: Messages Not Received

// Check if actually subscribed
var stats = await shardedPubSub.GetStatsAsync();
if (stats.TotalChannels == 0)
{
    Console.WriteLine("No active subscriptions");
}

// Verify Redis version supports sharded pub/sub
if (!await shardedPubSub.IsSupportedAsync())
{
    Console.WriteLine("Redis 7.0+ required for sharded pub/sub");
}

Issue: Uneven Shard Distribution

// Problem: All messages going to same shard
// Bad channel naming
for (int i = 0; i < 1000; i++)
{
    await shardedPubSub.PublishAsync("channel", message);
}

// Solution: Include variable in channel name
for (int i = 0; i < 1000; i++)
{
    await shardedPubSub.PublishAsync($"channel:{i}", message);
}

Migration Guide

From Regular to Sharded Pub/Sub

// Before: Regular pub/sub
public class OldMessaging
{
    private readonly IRedisPubSubService _pubSub;
    
    public async Task PublishAsync(string channel, Message msg)
    {
        await _pubSub.PublishAsync(channel, msg);
    }
    
    public async Task SubscribeAsync(string pattern)
    {
        // Pattern subscription
        await _pubSub.SubscribePatternAsync<Message>(
            pattern, 
            HandleMessage);
    }
}

// After: Sharded pub/sub with fallback
public class NewMessaging
{
    private readonly IRedisShardedPubSub _shardedPubSub;
    private readonly IRedisPubSubService _regularPubSub;
    
    public async Task PublishAsync(string channel, Message msg)
    {
        // Use sharded for better performance
        if (await _shardedPubSub.IsSupportedAsync())
        {
            await _shardedPubSub.PublishAsync(channel, msg);
        }
        else
        {
            // Fallback to regular
            await _regularPubSub.PublishAsync(channel, msg);
        }
    }
    
    public async Task SubscribeAsync(string channelOrPattern)
    {
        if (channelOrPattern.Contains("*") || channelOrPattern.Contains("?"))
        {
            // Pattern: must use regular
            await _regularPubSub.SubscribePatternAsync<Message>(
                channelOrPattern, 
                HandleMessage);
        }
        else if (await _shardedPubSub.IsSupportedAsync())
        {
            // Specific channel: use sharded
            await _shardedPubSub.SubscribeAsync<Message>(
                channelOrPattern,
                async (msg, ct) => await HandleMessage(msg.Data, ct));
        }
        else
        {
            // Fallback to regular
            await _regularPubSub.SubscribeAsync<Message>(
                channelOrPattern,
                HandleMessage);
        }
    }
}

Next Steps