Does StackExchange.Redis.ConnectionMultiplexer Dispose() cause behavior similar to UnsubscribeAll()?
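I can't tell from the StackExchange.Redis source code. Does disposing a ConnectionMultiplexer instance unsubscribe all subscriptions that were opened through that instance and were not unsubscribed manually beforehand?
The answer seems to be no, but I'm still not sure. Why? Because disposing the ConnectionMultiplexer does not clear the existing subscriptions the way UnsubscribeAll does. But at least disposing the ConnectionMultiplexer prevents the handlers that were never unsubscribed, and that still exist on the disposed ConnectionMultiplexer, from firing.
For example, the following test passes.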
[Fact]
[Trait("Category", TestCategoryCatalogs.IntegrationTest)]
[Trait("Category", TestCategoryCatalogs.Involve.REDIS)]
public async Task Dispose_of_redis_connection_cause_unsubscribe_of_existing_subscriptions()
{
    await _ExecuteAsync(async sp =>
    {
        IRedisClient redisClient1 = sp.GetService<IRedisClient>();
        IRedisClient redisClient2 = sp.GetService<IRedisClient>();

        // Emulate two applications with different connections to the same Redis instance.
        IConnectionMultiplexer connection1 = redisClient1.GetConnection();
        IConnectionMultiplexer connection2 = redisClient2.GetConnection();
        ISubscriber subscriber1 = connection1.GetSubscriber();
        ISubscriber subscriber2 = connection2.GetSubscriber();

        // tcs1/tcs3: first and second message seen by the handler on connection1.
        // tcs2/tcs4/tcs5: first, second and third message seen by the handler on connection2.
        var tcs1 = new TaskCompletionSource<bool>();
        var tcs2 = new TaskCompletionSource<bool>();
        var tcs3 = new TaskCompletionSource<bool>();
        var tcs4 = new TaskCompletionSource<bool>();
        var tcs5 = new TaskCompletionSource<bool>();
        string channel1 = nameof(channel1);

        await subscriber1.SubscribeAsync(channel1,
            (c, v) =>
            {
                if (tcs1.Task.IsCompleted)
                    tcs3.SetResult(true);
                else
                    tcs1.SetResult(true);
            });
        await subscriber2.SubscribeAsync(channel1,
            (c, v) =>
            {
                if (tcs2.Task.IsCompleted)
                {
                    if (tcs4.Task.IsCompleted)
                        tcs5.SetResult(true);
                    else
                        tcs4.SetResult(true);
                }
                else
                {
                    tcs2.SetResult(true);
                }
            });

        // Both handlers receive the first message.
        await subscriber2.PublishAsync(channel1, "1");
        await Task.WhenAll(tcs1.Task, tcs2.Task);

        connection1.Dispose();

        // After the dispose, only the handler on connection2 should receive the second message.
        await subscriber2.PublishAsync(channel1, "2");
        await tcs4.Task.WithTimeout(TimeSpan.FromMilliseconds(1000));
        try
        {
            await tcs3.Task.WithTimeout(TimeSpan.FromMilliseconds(1000));
            throw new Exception("Test failed: the handler on the disposed connection was invoked");
        }
        catch (OperationCanceledException)
        {
            // Expected: the handler on the disposed connection never fired.
        }

        // Inspect the private "subscriptions" field of the multiplexer via reflection.
        FieldInfo subscriptionsFI = connection1.GetType().GetField("subscriptions", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
        ICollection subscriptions1 = (ICollection)subscriptionsFI.GetValue(connection1);
        ICollection subscriptions2 = (ICollection)subscriptionsFI.GetValue(connection2);

        // The subscription is still registered on the disposed connection, but it appears to be inactive.
        subscriptions1.Count.Should().Be(1);
        subscriptions2.Count.Should().Be(1);

        // If we clear them manually with UnsubscribeAll, no subscriptions remain.
        subscriber1.UnsubscribeAll();
        subscriber2.UnsubscribeAll();
        subscriptions1 = (ICollection)subscriptionsFI.GetValue(connection1);
        subscriptions2 = (ICollection)subscriptionsFI.GetValue(connection2);
        subscriptions1.Count.Should().Be(0);
        subscriptions2.Count.Should().Be(0);
        try
        {
            await tcs5.Task.WithTimeout(TimeSpan.FromMilliseconds(1000));
            throw new Exception("Test failed: a handler was invoked after UnsubscribeAll");
        }
        catch (OperationCanceledException)
        {
            // Expected: no further handler invocation.
        }
    }, _GetDefaultServiceProvider, embededTimeoutSeconds: 10);
}
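
Until the behavior of Dispose() is confirmed, one conservative option is to unsubscribe explicitly before disposing instead of relying on Dispose() to do it. Below is a minimal sketch of that pattern, not a statement of how the library cleans up internally. It assumes a Redis instance reachable at localhost:6379; the class name SubscriberCleanupSketch, the method RunAsync, and the channel name are hypothetical and only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using StackExchange.Redis;

public static class SubscriberCleanupSketch
{
    // Hypothetical helper: the endpoint and channel name are placeholders.
    public static async Task RunAsync(string configuration = "localhost:6379")
    {
        ConnectionMultiplexer connection = await ConnectionMultiplexer.ConnectAsync(configuration);
        try
        {
            ISubscriber subscriber = connection.GetSubscriber();
            var channel = new RedisChannel("channel1", RedisChannel.PatternMode.Literal);

            await subscriber.SubscribeAsync(channel,
                (c, v) => Console.WriteLine($"{c}: {v}"));

            // ... application work that relies on the subscription ...
        }
        finally
        {
            // Remove every handler registered through this multiplexer
            // while the connection is still alive.
            await connection.GetSubscriber().UnsubscribeAllAsync();

            // Let pending commands complete, then release the connection.
            await connection.CloseAsync();
            connection.Dispose();
        }
    }
}
```

With this ordering, the cleanup does not depend on whatever Dispose() happens to do with subscriptions that are still registered.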