将 IReliableQueue 更改为 IReliableConcurrentQueue 对现有部署有什么影响?

What impact does changing a IReliableQueue to a IReliableConcurrentQueue have in an existing deployment?

我正在使用 IReliableQueue 的 Service Fabric 应用程序中工作。对于该系统的用例,使用 IReliableConcurrentQueue 是有意义的,并且一些本地测试(即基本上只需更改代码以使用 IReliableConcurrentQueue 而不是 IReliableQueue - 队列名称不会更改)显示出巨大的性能改进。但是,我担心在生产系统中更改此设置(即升级)的影响。我找不到关于这些注意事项的任何文档或在线问题(除非我错过了它们)。例如,在这个系统中,现有的 IReliableQueue 几乎总是有项目。那么,当我升级 SF 应用程序时,这些数据会发生什么变化?它是否可以在 IReliableConcurrentQueue 中出列?或者数据会丢失吗?我知道我可以 "just try it" 但想看看是否有人做过同样的事情或者可以提供指向现有资源的指针。谢谢!

抱歉回答晚了(您可能不再需要,但仍然需要)

当我们在 IReliableStateManager 上调用 GetOrAddAsync 方法时,我们并没有检索接口来存储值——我们实际上创建了一个可靠集合的实例。这基本上意味着我们指定的接口类型非常重要。

考虑到这一点,如果我们这样做:

服务 1.0 版

// Somewhere in RunAsync for example
await this.StateManager.GetOrAddAsync<IReliableQueue<long>>("MyCollection")

然后在下一个版本中这样做:

服务 1.1 版

// Somewhere in RunAsync for example
await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<long>>("MyCollection")

会抛出异常:

Returned reliable object of type Microsoft.ServiceFabric.Data.Collections.DistributedQueue`1[System.Int64] cannot be casted to requested type Microsoft.ServiceFabric.Data.Collections.IReliableConcurrentQueue`1[System.Int64]

然后:

System.ExecutionEngineException: 'Exception of type 'System.ExecutionEngineException' was thrown.'

上面的异常看起来像一个错误所以我已经填写了 one.


更新 2019.06.28

事实证明 System.ExecutionEngineException 的出现不是错误,而是 Environment.FailFast 方法结合 Visual Studio 调试器的未记录行为。

以上问题请看我comment


这就是会发生的事情。

有很多方法可以克服这个问题。

这是最明显的一个:

例子

var migrate = false; // This flag indicates whether the migration was already done.
var migrateValues = new List<long>();

var applicationFlags = await this.StateManager
    .GetOrAddAsync<IReliableDictionary<string, bool>>("application-flags");
using (var transaction = this.StateManager.CreateTransaction())
{
    var flag = await applicationFlags
        .TryGetValueAsync(transaction, "queue-to-concurrent-queue-migration");
    if (!flag.HasValue || !flag.Value)
    {
        var queue = await this.StateManager
            .GetOrAddAsync<IReliableQueue<long>>("value-collection");
        for (;;)
        {
            var c = await queue.TryDequeueAsync(transaction);
            if (!c.HasValue)
            {
                break;
            }

            migrateValues.Add(c.Value);
        }
        migrate = true;
    }
}

if (migrate)
{
    await this.StateManager.RemoveAsync("value-collection");

    using (var transaction = this.StateManager.CreateTransaction())
    {
        var concurrentQueue = await this.StateManager
            .GetOrAddAsync<IReliableConcurrentQueue<long>>("value-collection");

        foreach (var i in migrateValues)
        {
            await concurrentQueue.EnqueueAsync(transaction, i);
        }

        await applicationFlags.AddOrUpdateAsync(
            transaction,
            "queue-to-concurrent-queue-migration",
            true,
            (s, b) => true);
    }
    await transaction.CommitAsync();
}

请注意,此代码只是一个说明性示例,应在将其应用到实际应用程序之前进行适当测试。