带有 Azure Function App 的 IoT EventHub - 同步多线程

IoT EventHub with Azure Function App - synchronous multi-threaded

我对由 EventHub(来自 IotHub)触发的 Azure FunctionApp 有疑问。当我启动功能并且事件中心中有很多历史消息时,FunctionApp 被异步触发,因此同一时间的功能实例很少。问题是我正在使用 Redis 的内部函数,所以流程是: EventHub Message -> Trigger FunctionApp -> 从 Redis 读取数据 -> Main function -> 将数据保存到 Redis -> 设置输出消息 -> End function。每次触发函数时,它都应该在它的另一个实例之前结束。所以它应该是同步的。我试过设置:

WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT = 1

将 App Scale out 设置为 1(原为 200), 但它不起作用。

这对我们来说至关重要。即使我们停止功能一小段时间然后再次启动它也会以损坏的数据结束。

知道如何解决这个问题吗?

编辑:所以似乎一次只有一个函数实例 - 我用 Executing Instances and Allocated Instances timechart 来检查它,我一直有 1 个。

但是我的日志是这样的:

2021-10-27T13:51:50.222 [Information] Json sent sent: MN5 2 10/27/2021 3:50:14 PM
2021-10-27T13:51:50.222 [Information] {OutputJson}
2021-10-27T13:51:50.223 [Information] Activity: CacheIn: 21, CacheOut: 21
2021-10-27T13:51:50.229 [Information] Executed 'MessageEventFunction' (Succeeded, Id=ef405154-d600-4a54-9e1b-cc9cc7638873, Duration=96ms)
2021-10-27T13:51:50.230 [Information] Executed 'MessageEventFunction' (Succeeded, Id=ff2d5226-91bf-4a6e-a3f8-9660cac1bdf6, Duration=99ms)
2021-10-27T13:51:50.237 [Information] Executed 'MessageEventFunction' (Succeeded, Id=32d30205-b689-40b8-98d0-185318af8000, Duration=103ms)
2021-10-27T13:51:50.238 [Information] Executed 'MessageEventFunction' (Succeeded, Id=238d7703-a352-487a-9a7e-1dff252d6dc9, Duration=103ms)
2021-10-27T13:51:50.268 [Information] Executing 'MessageEventFunction' (Reason='(null)', Id=06e4101d-9cc6-42cc-b3e9-fe4873dabff9)
2021-10-27T13:51:50.268 [Information] Trigger Details: PartionId: 3, Offset: 47273311592, EnqueueTimeUtc: 2021-10-27T13:51:21.8510000Z, SequenceNumber: 220220, Count: 10
2021-10-27T13:51:50.271 [Information] Executing 'MessageEventFunction' (Reason='(null)', Id=5f9b3b11-d756-47d4-9cb2-8ad9993cdb8a)
2021-10-27T13:51:50.271 [Information] Trigger Details: PartionId: 3, Offset: 47273311592, EnqueueTimeUtc: 2021-10-27T13:51:21.8510000Z, SequenceNumber: 220220, Count: 10
2021-10-27T13:51:50.271 [Information] Executing 'MessageEventFunction' (Reason='(null)', Id=8ab7d257-0280-4bbd-afb7-1c8197a43d86)
2021-10-27T13:51:50.272 [Information] Trigger Details: PartionId: 3, Offset: 47273311592, EnqueueTimeUtc: 2021-10-27T13:51:21.8510000Z, SequenceNumber: 220220, Count: 10
2021-10-27T13:51:50.272 [Information] Executing 'MessageEventFunction' (Reason='(null)', Id=cb0b6989-ba36-4be2-9ab4-e848aad341c8)
2021-10-27T13:51:50.272 [Information] Trigger Details: PartionId: 3, Offset: 47273311592, EnqueueTimeUtc: 2021-10-27T13:51:21.8510000Z, SequenceNumber: 220220, Count: 10
2021-10-27T13:51:50.273 [Information] Executing 'MessageEventFunction' (Reason='(null)', Id=00821eeb-762f-4f5e-becd-f69a4514734c)
2021-10-27T13:51:50.273 [Information] Trigger Details: PartionId: 3, Offset: 47273311592, EnqueueTimeUtc: 2021-10-27T13:51:21.8510000Z, SequenceNumber: 220220, Count: 10
2021-10-27T13:51:50.274 [Information] Executing 'MessageEventFunction' (Reason='(null)', Id=7b158eff-2c59-4983-93c0-64fb33a48885)
2021-10-27T13:51:50.274 [Information] Trigger Details: PartionId: 3, Offset: 47273311592, EnqueueTimeUtc: 2021-10-27T13:51:21.8510000Z, SequenceNumber: 220220, Count: 10
2021-10-27T13:51:50.275 [Information] Executing 'MessageEventFunction' (Reason='(null)', Id=1c8688de-e954-466c-95cb-4fcc5a9234ea)
2021-10-27T13:51:50.275 [Information] Trigger Details: PartionId: 3, Offset: 47273311592, EnqueueTimeUtc: 2021-10-27T13:51:21.8510000Z, SequenceNumber: 220220, Count: 10
2021-10-27T13:51:50.275 [Information] Executing 'MessageEventFunction' (Reason='(null)', Id=6c5dbac1-e063-43c6-98c5-c898dc971171)
2021-10-27T13:51:50.276 [Information] Trigger Details: PartionId: 3, Offset: 47273311592, EnqueueTimeUtc: 2021-10-27T13:51:21.8510000Z, SequenceNumber: 220220, Count: 10
2021-10-27T13:51:50.276 [Information] Executing 'MessageEventFunction' (Reason='(null)', Id=d265f8e5-c7a6-480f-b71b-9450deec7fac)
2021-10-27T13:51:50.277 [Information] Trigger Details: PartionId: 3, Offset: 47273311592, EnqueueTimeUtc: 2021-10-27T13:51:21.8510000Z, SequenceNumber: 220220, Count: 10
2021-10-27T13:51:50.277 [Information] Executing 'MessageEventFunction' (Reason='(null)', Id=9b623293-d5bb-4992-b782-dae9f3e3d5bb)
2021-10-27T13:51:50.278 [Information] Trigger Details: PartionId: 3, Offset: 47273311592, EnqueueTimeUtc: 2021-10-27T13:51:21.8510000Z, SequenceNumber: 220220, Count: 10
2021-10-27T13:51:50.278 [Information] IoT Hub trigger function processed a message: {InputJson}
2021-10-27T13:51:50.278 [Information] IoT Hub trigger function processed a message: {InputJson}
2021-10-27T13:51:50.278 [Information] IoT Hub trigger function processed a message: {InputJson}
2021-10-27T13:51:50.279 [Information] IoT Hub trigger function processed a message: {InputJson}
2021-10-27T13:51:50.279 [Information] IoT Hub trigger function processed a message: {InputJson}
2021-10-27T13:51:50.280 [Information] IoT Hub trigger function processed a message: {InputJson}
2021-10-27T13:51:50.280 [Information] IoT Hub trigger function processed a message: {InputJson}
2021-10-27T13:51:50.280 [Information] IoT Hub trigger function processed a message: {InputJson}
2021-10-27T13:51:50.280 [Information] IoT Hub trigger function processed a message: {InputJson}
2021-10-27T13:51:50.281 [Information] IoT Hub trigger function processed a message: {InputJson}
2021-10-27T13:51:50.314 [Information] Activity: CacheIn: 21, CacheOut: 21
2021-10-27T13:51:50.322 [Information] Activity: CacheIn: 21, CacheOut: 21
2021-10-27T13:51:50.332 [Information] Activity: CacheIn: 21, CacheOut: 21
2021-10-27T13:51:50.341 [Information] Activity: CacheIn: 21, CacheOut: 21
2021-10-27T13:51:50.348 [Information] Activity: CacheIn: 21, CacheOut: 21
2021-10-27T13:51:50.357 [Information] Activity: CacheIn: 21, CacheOut: 21
2021-10-27T13:51:50.381 [Information] Activity: CacheIn: 21, CacheOut: 21
2021-10-27T13:51:50.390 [Information] Executed 'MessageEventFunction' (Succeeded, Id=1c8688de-e954-466c-95cb-4fcc5a9234ea, Duration=115ms)
2021-10-27T13:51:50.391 [Information] Executed 'MessageEventFunction' (Succeeded, Id=d265f8e5-c7a6-480f-b71b-9450deec7fac, Duration=115ms)
2021-10-27T13:51:50.392 [Information] Executed 'MessageEventFunction' (Succeeded, Id=9b623293-d5bb-4992-b782-dae9f3e3d5bb, Duration=114ms)
2021-10-27T13:51:50.392 [Information] Executed 'MessageEventFunction' (Succeeded, Id=5f9b3b11-d756-47d4-9cb2-8ad9993cdb8a, Duration=122ms)
2021-10-27T13:51:50.393 [Information] Executed 'MessageEventFunction' (Succeeded, Id=cb0b6989-ba36-4be2-9ab4-e848aad341c8, Duration=120ms)
2021-10-27T13:51:50.393 [Information] Json sent: Rich 2 10/27/2021 3:50:24 PM
2021-10-27T13:51:50.394 [Information] {OutputJson}
2021-10-27T13:51:50.394 [Information] Activity: CacheIn: 21, CacheOut: 21
2021-10-27T13:51:50.399 [Information] Json sent sent: P4 2 10/27/2021 3:50:34 PM
2021-10-27T13:51:50.399 [Information] {OutputJson}
2021-10-27T13:51:50.400 [Information] Activity: CacheIn: 21, CacheOut: 21
2021-10-27T13:51:50.401 [Information] Executed 'MessageEventFunction' (Succeeded, Id=00821eeb-762f-4f5e-becd-f69a4514734c, Duration=127ms)
2021-10-27T13:51:50.401 [Information] Executed 'MessageEventFunction' (Succeeded, Id=6c5dbac1-e063-43c6-98c5-c898dc971171, Duration=126ms)
2021-10-27T13:51:50.402 [Information] Json sent sent: MN7 2 10/27/2021 3:50:54 PM
2021-10-27T13:51:50.402 [Information] {OutputJson}
2021-10-27T13:51:50.402 [Information] Activity: CacheIn: 21, CacheOut: 21
2021-10-27T13:51:50.412 [Information] Executed 'MessageEventFunction' (Succeeded, Id=06e4101d-9cc6-42cc-b3e9-fe4873dabff9, Duration=143ms)
2021-10-27T13:51:50.414 [Information] Executed 'MessageEventFunction' (Succeeded, Id=8ab7d257-0280-4bbd-afb7-1c8197a43d86, Duration=142ms)
2021-10-27T13:51:50.425 [Information] Executed 'MessageEventFunction' (Succeeded, Id=7b158eff-2c59-4983-93c0-64fb33a48885, Duration=151ms)
2021-10-27T13:51:51.895 [Information] Executing 'MessageEventFunction' (Reason='(null)', Id=726a0425-6959-4f3f-97a4-f722ee78d380)
2021-10-27T13:51:51.896 [Information] Trigger Details: PartionId: 3, Offset: 47273322136, EnqueueTimeUtc: 2021-10-27T13:51:51.8620000Z, SequenceNumber: 220230, Count: 1
2021-10-27T13:51:51.896 [Information] IoT Hub trigger function processed a message: {InputJson}
2021-10-27T13:51:51.903 [Information] Activity: CacheIn: 21, CacheOut: 21
2021-10-27T13:51:51.919 [Information] Executed 'MessageEventFunction' (Succeeded, Id=726a0425-6959-4f3f-97a4-f722ee78d380, Duration=24ms)
2021-10-27T13:51:54.968 [Information] Executing 'MessageEventFunction' (Reason='(null)', Id=25c647aa-44a3-43f1-b8bf-02ddaae03057)
2021-10-27T13:51:54.968 [Information] Trigger Details: PartionId: 3, Offset: 47273322576, EnqueueTimeUtc: 2021-10-27T13:51:54.9410000Z, SequenceNumber: 220231, Count: 1
2021-10-27T13:51:54.969 [Information] IoT Hub trigger function processed a message: {InputJson}
2021-10-27T13:51:54.997 [Information] JsonSent sent: P4 2 10/27/2021 3:51:39 PM
2021-10-27T13:51:54.997 [Information] {OutputJson}
2021-10-27T13:51:54.997 [Information] Activity: CacheIn: 21, CacheOut: 21

所以有时函数工作正常,当 SequenceNumber 是唯一的并且 Count = 1 时。其他时间则不是。我的问题是除了 FunctionApp 实例之外的其他东西?

Activity: CacheIn: 21, CacheOut: 21 表示已读取 21 个对象的列表,然后将其写入 Redis 缓存

所以每次都应该是这样(发送 OutputJson 是可选的,取决于 Cache 和 InputJson):

2021-10-27T13:51:54.968 [Information] Executing 'MessageEventFunction' (Reason='(null)', Id=25c647aa-44a3-43f1-b8bf-02ddaae03057)
2021-10-27T13:51:54.968 [Information] Trigger Details: PartionId: 3, Offset: 47273322576, EnqueueTimeUtc: 2021-10-27T13:51:54.9410000Z, SequenceNumber: 220231, Count: 1
2021-10-27T13:51:54.969 [Information] IoT Hub trigger function processed a message: {InputJson}
2021-10-27T13:51:54.997 [Information] JsonSent sent: P4 2 10/27/2021 3:51:39 PM
2021-10-27T13:51:54.997 [Information] {OutputJson}
2021-10-27T13:51:54.997 [Information] Activity: CacheIn: 21, CacheOut: 21
2021-10-27T13:51:55.017 [Information] Executed 'MessageEventFunction' (Succeeded, Id=25c647aa-44a3-43f1-b8bf-02ddaae03057, Duration=49ms)

问题是功能停止了一段时间然后我们将其打开,或者几乎同时收到的消息很少。

要实现时间触发器的背靠背执行,请在应用程序设置配置中将WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT和FUNCTIONS_WORKER_PROCESS_COUNT设置为1。这将确保一次只有 1 个函数执行运行。看到这个 LINK.

好的,所以在 host.json 中你必须添加这个来完成我想要的。

"extensions": {
    "eventHubs": {
      "eventProcessorOptions": {
        "maxBatchSize": 1,
        "prefetchCount": 10
      }
    }
}

服务总线的类似情况:

"serviceBus": {
      "messageHandlerOptions": {
        "maxConcurrentCalls": 1
      }
    }