Does StackExchange.Redis ConnectionMultiplexer.Dispose() cause behavior similar to UnsubscribeAll()?

8ljdwjyq posted on 2021-06-09 in Redis

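I can't tell for sure from the StackExchange.Redis source code: does disposing a ConnectionMultiplexer instance unsubscribe all subscriptions that were opened through that instance and not manually unsubscribed beforehand?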

3yhwsihp1#

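The answer seems to be no, although I'm still not certain. Why? Because disposing a ConnectionMultiplexer does not clean up its existing subscriptions the way UnsubscribeAll does. At the very least, though, disposing the ConnectionMultiplexer prevents the handlers of subscriptions that were never unsubscribed, and that still exist in the disposed ConnectionMultiplexer, from firing.
For example, the following test is green.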

    [Fact]
    [Trait("Category", TestCategoryCatalogs.IntegrationTest)]
    [Trait("Category", TestCategoryCatalogs.Involve.REDIS)]
    public async Task Dispose_of_redis_connection_cause_unsubscribe_of_existing_subscriptions()
    {
        await _ExecuteAsync(async sp =>
        {
            IRedisClient redisClient1 = sp.GetService<IRedisClient>();
            IRedisClient redisClient2 = sp.GetService<IRedisClient>();

            // Emulate two applications with different connections to the same Redis instance.
            IConnectionMultiplexer connection1 = redisClient1.GetConnection();
            IConnectionMultiplexer connection2 = redisClient2.GetConnection();

            ISubscriber subscriber1 = connection1.GetSubscriber();
            ISubscriber subscriber2 = connection2.GetSubscriber();

            var tcs1 = new TaskCompletionSource<bool>();
            var tcs2 = new TaskCompletionSource<bool>();
            var tcs3 = new TaskCompletionSource<bool>();
            var tcs4 = new TaskCompletionSource<bool>();
            var tcs5 = new TaskCompletionSource<bool>();

            string channel1 = nameof(channel1);

            await subscriber1.SubscribeAsync(channel1,
                (c, v) =>
                {
                    if (tcs1.Task.IsCompleted)
                        tcs3.SetResult(true);
                    else tcs1.SetResult(true);
                });

            await subscriber2.SubscribeAsync(channel1,
                (c, v) =>
                {
                    if (tcs2.Task.IsCompleted)
                        if (tcs4.Task.IsCompleted)
                            tcs5.SetResult(true);
                        else tcs4.SetResult(true);
                    else tcs2.SetResult(true);
                });

            await subscriber2.PublishAsync(channel1, "1");

            await Task.WhenAll(tcs1.Task, tcs2.Task);

            connection1.Dispose();

            await subscriber2.PublishAsync(channel1, "2");

            await tcs4.Task.WithTimeout(TimeSpan.FromMilliseconds(1000));

            try
            {
                await tcs3.Task.WithTimeout(TimeSpan.FromMilliseconds(1000));

                throw new Exception("Test failed: handler on the disposed connection fired");
            }
            catch (OperationCanceledException)
            {
            }

            FieldInfo subscriptionsFI = connection1.GetType().GetField("subscriptions", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
            ICollection subscriptions1 = (ICollection) subscriptionsFI.GetValue(connection1);
            ICollection subscriptions2 = (ICollection)subscriptionsFI.GetValue(connection2);

            // The subscription is still there, but it seems to be inactive.
            subscriptions1.Count.Should().Be(1);
            subscriptions2.Count.Should().Be(1);

            // If we clean them up manually, as we would on a live connection, no subscriptions remain.
            subscriber1.UnsubscribeAll();
            subscriber2.UnsubscribeAll();

            subscriptions1 = (ICollection)subscriptionsFI.GetValue(connection1);
            subscriptions2 = (ICollection)subscriptionsFI.GetValue(connection2);
            subscriptions1.Count.Should().Be(0);
            subscriptions2.Count.Should().Be(0);

            try
            {
                await tcs5.Task.WithTimeout(TimeSpan.FromMilliseconds(1000));

                throw new Exception("Test failed: handler fired after UnsubscribeAll");
            }
            catch (OperationCanceledException)
            {
            }
        }, _GetDefaultServiceProvider, embededTimeoutSeconds: 10);
    }
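
Since the test above suggests that Dispose() leaves the internal subscription collection populated, one option is to unsubscribe explicitly before disposing. The following is only a minimal sketch of that idea, not something taken from the StackExchange.Redis source: the class name, the helper UnsubscribeThenDisposeAsync, and the localhost:6379 endpoint are assumptions made for illustration.

```csharp
using System;
using System.Threading.Tasks;
using StackExchange.Redis;

public static class SubscriptionCleanupSketch
{
    // Hypothetical helper: remove every subscription held by this
    // multiplexer first, then dispose the connection itself.
    public static async Task UnsubscribeThenDisposeAsync(IConnectionMultiplexer connection)
    {
        ISubscriber subscriber = connection.GetSubscriber();
        await subscriber.UnsubscribeAllAsync();
        connection.Dispose();
    }

    public static async Task Main()
    {
        // Assumption: a Redis server is reachable at localhost:6379.
        ConnectionMultiplexer connection =
            await ConnectionMultiplexer.ConnectAsync("localhost:6379");
        ISubscriber subscriber = connection.GetSubscriber();

        var channel = new RedisChannel("channel1", RedisChannel.PatternMode.Literal);
        await subscriber.SubscribeAsync(channel,
            (c, v) => Console.WriteLine($"{c}: {v}"));

        await subscriber.PublishAsync(channel, "1");

        // Give the handler a moment to run before tearing everything down.
        await Task.Delay(100);

        await UnsubscribeThenDisposeAsync(connection);
    }
}
```

With this ordering the cleanup does not depend on what Dispose() does to subscriptions internally, which is the part that remains uncertain here.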
