Azure EventHubTrigger Function App 中的偏移量
Offset in Azure EventHubTrigger Function App
期望:
当新事件进入时,使用带有 EventHubTrigger-CSharp 模板的函数应用程序运行。
实施:
使用 https://azure.microsoft.com/en-us/documentation/articles/functions-bindings-event-hubs/
中的非常基本的示例
结果:
解释方法:
public static void Run(string myEventHubMessage, TraceWriter log)
{
log.Info($"C# Event Hub trigger function processed a message: {myEventHubMessage}");
}
确实传送了事件队列中的 'first' 而不是当前队列。
补充:
- 我添加了一个 consumerGroup(所以不是从 $Default 读取)
- 根据此博客:http://www.yourcloudbuddy.com/2016/04/azure-functions-and-event-hubs.html 'there is currently no way to configure it to start a particular offset'。
问题:
如何在我的函数应用程序的中心接收最新事件?
更多信息:
我需要将新信号从事件中心转发到外部 mqtt api(目前有效)。可能我需要另一种方法?
EventHub 侦听基于 EventProcessorHost class (https://msdn.microsoft.com/en-us/library/azure/microsoft.servicebus.messaging.eventprocessorhost.aspx) 并具有这些默认语义。
你是对的,你无法控制起始偏移量。它只是开始监听和检查点。
我们还在 wiki 上跟踪其他信息:https://github.com/Azure/azure-webjobs-sdk/wiki/EventHub-support
提交功能请求
@Ramin,我尝试重新创建一个类似于您的场景,但无法重现该问题。以下是步骤:
设置事件中心触发的函数(使用截至 2016 年 10 月 27 日的最新版本 0.7)
设置一个简单的控制台应用程序发送器到我的事件中心,发送 10 条带有时间戳和消息 ID 的消息。每条消息发送间隔 6 分钟。这是记录已发送消息的控制台应用程序的输出,
Sent message: Timestamp: 10/27/2016 2:27:06 PM, MessageId: 1
Sent message: Timestamp: 10/27/2016 2:33:07 PM, MessageId: 2
Sent message: Timestamp: 10/27/2016 2:39:07 PM, MessageId: 3
Sent message: Timestamp: 10/27/2016 2:45:07 PM, MessageId: 4
Sent message: Timestamp: 10/27/2016 2:51:08 PM, MessageId: 5
Sent message: Timestamp: 10/27/2016 2:57:08 PM, MessageId: 6
Sent message: Timestamp: 10/27/2016 3:03:08 PM, MessageId: 7
Sent message: Timestamp: 10/27/2016 3:09:08 PM, MessageId: 8
Sent message: Timestamp: 10/27/2016 3:15:08 PM, MessageId: 9
Sent message: Timestamp: 10/27/2016 3:21:08 PM, MessageId: 10
Stopped sending messages.
这是我的函数的日志条目,
2016-10-27T21:27:26.516 Function started (Id=d5ed0e23-2b0e-4e0b-a858-f1e497dcac8c)
2016-10-27T21:27:26.516 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 2:27:06 PM, MessageId: 1
2016-10-27T21:27:26.516 Function completed (Success, Id=d5ed0e23-2b0e-4e0b-a858-f1e497dcac8c)
2016-10-27T21:33:26.667 Function started (Id=9525a046-86fb-4499-9d4f-b0d0fd0d0829)
2016-10-27T21:33:26.667 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 2:33:07 PM, MessageId: 2
2016-10-27T21:33:26.667 Function completed (Success, Id=9525a046-86fb-4499-9d4f-b0d0fd0d0829)
2016-10-27T21:39:42.074 Function started (Id=e2d528c9-f1b9-41aa-9c38-669c57c8182a)
2016-10-27T21:39:42.074 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 2:39:07 PM, MessageId: 3
2016-10-27T21:39:42.074 Function completed (Success, Id=e2d528c9-f1b9-41aa-9c38-669c57c8182a)
2016-10-27T21:45:26.794 Function started (Id=ff5325af-5dab-48fb-95b1-8318fada3c8c)
2016-10-27T21:45:26.794 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 2:45:07 PM, MessageId: 4
2016-10-27T21:45:26.794 Function completed (Success, Id=ff5325af-5dab-48fb-95b1-8318fada3c8c)
2016-10-27T21:51:26.875 Function started (Id=01d3fbf9-b772-4076-8fbf-783dc16677a7)
2016-10-27T21:51:26.875 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 2:51:08 PM, MessageId: 5
2016-10-27T21:51:26.875 Function completed (Success, Id=01d3fbf9-b772-4076-8fbf-783dc16677a7)
2016-10-27T21:57:26.989 Function started (Id=443d96e6-af97-460a-9f70-9dbc2eaf2f37)
2016-10-27T21:57:26.989 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 2:57:08 PM, MessageId: 6
2016-10-27T21:57:26.989 Function completed (Success, Id=443d96e6-af97-460a-9f70-9dbc2eaf2f37)
2016-10-27T22:03:27.088 Function started (Id=9eefe03d-8e78-4119-a453-96655ea01824)
2016-10-27T22:03:27.088 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 3:03:08 PM, MessageId: 7
2016-10-27T22:03:27.088 Function completed (Success, Id=9eefe03d-8e78-4119-a453-96655ea01824)
2016-10-27T22:10:01.872 Function started (Id=34c9b277-059d-4839-9dce-aeb03afb2871)
2016-10-27T22:10:01.872 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 3:09:08 PM, MessageId: 8
2016-10-27T22:10:01.872 Function completed (Success, Id=34c9b277-059d-4839-9dce-aeb03afb2871)
2016-10-27T22:15:27.706 Function started (Id=50eda659-c68f-4e61-a942-32160668fd5c)
2016-10-27T22:15:27.706 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 3:15:08 PM, MessageId: 9
2016-10-27T22:15:27.706 Function completed (Success, Id=50eda659-c68f-4e61-a942-32160668fd5c)
2016-10-27T22:21:52.162 Function started (Id=fa8f8059-013f-42f9-8047-391d4e3fb4a3)
2016-10-27T22:21:52.162 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 3:21:08 PM, MessageId: 10
2016-10-27T22:21:52.162 Function completed (Success, Id=fa8f8059-013f-42f9-8047-391d4e3fb4a3)
如您所见,所有消息都没有重复到达。您能否确认每次调用您的函数(您可以检查您的函数日志),它实际上 运行 成功完成?
如果您看到以下消息,
Function completed (Success, Id=<some Guid>)
对于您的函数的每次调用,该执行的消息都应该被检查点,并且您不应该在下一次调用中再次处理消息。
但是,如果您没有看到消息或看到错误消息,则函数 运行 失败,这将导致消息未被检查点。当 Function 再次被触发时,它将从最后一个检查点获取,如果您的 Function 从未成功处理该消息,则该检查点可能是 first 消息。
现在有办法了。
注意:要使其正常工作,您必须在连接到函数应用程序的存储帐户中删除存储在 azure-webjobs-eventhub
中的使用者组数据。
我们现在可以使用 Microsoft.Azure.Functions.Extensions
nuget 包在 Azure Functions 中使用依赖注入。
这为我们开辟了一种方式来控制我们的 Azure Function App 从何处开始处理我们的事件中心,方法如下:
using System;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Azure.WebJobs.EventHubs;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
[assembly: FunctionsStartup(typeof(EventHubOffsetTest.Startup))]
namespace EventHubOffsetTest
{
public class Startup : FunctionsStartup
{
public override void Configure(IFunctionsHostBuilder builder)
{
builder.Services.PostConfigure<EventHubOptions>(o => o.EventProcessorOptions.InitialOffsetProvider = GetInitialOffset);
}
private static EventPosition GetInitialOffset(string partitionId)
{
return EventPosition.FromEnd();
}
}
}
期望: 当新事件进入时,使用带有 EventHubTrigger-CSharp 模板的函数应用程序运行。
实施: 使用 https://azure.microsoft.com/en-us/documentation/articles/functions-bindings-event-hubs/
中的非常基本的示例结果: 解释方法:
public static void Run(string myEventHubMessage, TraceWriter log)
{
log.Info($"C# Event Hub trigger function processed a message: {myEventHubMessage}");
}
确实传送了事件队列中的 'first' 而不是当前队列。
补充:
- 我添加了一个 consumerGroup(所以不是从 $Default 读取)
- 根据此博客:http://www.yourcloudbuddy.com/2016/04/azure-functions-and-event-hubs.html 'there is currently no way to configure it to start a particular offset'。
问题: 如何在我的函数应用程序的中心接收最新事件?
更多信息: 我需要将新信号从事件中心转发到外部 mqtt api(目前有效)。可能我需要另一种方法?
EventHub 侦听基于 EventProcessorHost class (https://msdn.microsoft.com/en-us/library/azure/microsoft.servicebus.messaging.eventprocessorhost.aspx) 并具有这些默认语义。 你是对的,你无法控制起始偏移量。它只是开始监听和检查点。
我们还在 wiki 上跟踪其他信息:https://github.com/Azure/azure-webjobs-sdk/wiki/EventHub-support
提交功能请求@Ramin,我尝试重新创建一个类似于您的场景,但无法重现该问题。以下是步骤:
设置事件中心触发的函数(使用截至 2016 年 10 月 27 日的最新版本 0.7)
设置一个简单的控制台应用程序发送器到我的事件中心,发送 10 条带有时间戳和消息 ID 的消息。每条消息发送间隔 6 分钟。这是记录已发送消息的控制台应用程序的输出,
Sent message: Timestamp: 10/27/2016 2:27:06 PM, MessageId: 1 Sent message: Timestamp: 10/27/2016 2:33:07 PM, MessageId: 2 Sent message: Timestamp: 10/27/2016 2:39:07 PM, MessageId: 3 Sent message: Timestamp: 10/27/2016 2:45:07 PM, MessageId: 4 Sent message: Timestamp: 10/27/2016 2:51:08 PM, MessageId: 5 Sent message: Timestamp: 10/27/2016 2:57:08 PM, MessageId: 6 Sent message: Timestamp: 10/27/2016 3:03:08 PM, MessageId: 7 Sent message: Timestamp: 10/27/2016 3:09:08 PM, MessageId: 8 Sent message: Timestamp: 10/27/2016 3:15:08 PM, MessageId: 9 Sent message: Timestamp: 10/27/2016 3:21:08 PM, MessageId: 10 Stopped sending messages.
这是我的函数的日志条目,
2016-10-27T21:27:26.516 Function started (Id=d5ed0e23-2b0e-4e0b-a858-f1e497dcac8c)
2016-10-27T21:27:26.516 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 2:27:06 PM, MessageId: 1
2016-10-27T21:27:26.516 Function completed (Success, Id=d5ed0e23-2b0e-4e0b-a858-f1e497dcac8c)
2016-10-27T21:33:26.667 Function started (Id=9525a046-86fb-4499-9d4f-b0d0fd0d0829)
2016-10-27T21:33:26.667 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 2:33:07 PM, MessageId: 2
2016-10-27T21:33:26.667 Function completed (Success, Id=9525a046-86fb-4499-9d4f-b0d0fd0d0829)
2016-10-27T21:39:42.074 Function started (Id=e2d528c9-f1b9-41aa-9c38-669c57c8182a)
2016-10-27T21:39:42.074 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 2:39:07 PM, MessageId: 3
2016-10-27T21:39:42.074 Function completed (Success, Id=e2d528c9-f1b9-41aa-9c38-669c57c8182a)
2016-10-27T21:45:26.794 Function started (Id=ff5325af-5dab-48fb-95b1-8318fada3c8c)
2016-10-27T21:45:26.794 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 2:45:07 PM, MessageId: 4
2016-10-27T21:45:26.794 Function completed (Success, Id=ff5325af-5dab-48fb-95b1-8318fada3c8c)
2016-10-27T21:51:26.875 Function started (Id=01d3fbf9-b772-4076-8fbf-783dc16677a7)
2016-10-27T21:51:26.875 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 2:51:08 PM, MessageId: 5
2016-10-27T21:51:26.875 Function completed (Success, Id=01d3fbf9-b772-4076-8fbf-783dc16677a7)
2016-10-27T21:57:26.989 Function started (Id=443d96e6-af97-460a-9f70-9dbc2eaf2f37)
2016-10-27T21:57:26.989 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 2:57:08 PM, MessageId: 6
2016-10-27T21:57:26.989 Function completed (Success, Id=443d96e6-af97-460a-9f70-9dbc2eaf2f37)
2016-10-27T22:03:27.088 Function started (Id=9eefe03d-8e78-4119-a453-96655ea01824)
2016-10-27T22:03:27.088 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 3:03:08 PM, MessageId: 7
2016-10-27T22:03:27.088 Function completed (Success, Id=9eefe03d-8e78-4119-a453-96655ea01824)
2016-10-27T22:10:01.872 Function started (Id=34c9b277-059d-4839-9dce-aeb03afb2871)
2016-10-27T22:10:01.872 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 3:09:08 PM, MessageId: 8
2016-10-27T22:10:01.872 Function completed (Success, Id=34c9b277-059d-4839-9dce-aeb03afb2871)
2016-10-27T22:15:27.706 Function started (Id=50eda659-c68f-4e61-a942-32160668fd5c)
2016-10-27T22:15:27.706 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 3:15:08 PM, MessageId: 9
2016-10-27T22:15:27.706 Function completed (Success, Id=50eda659-c68f-4e61-a942-32160668fd5c)
2016-10-27T22:21:52.162 Function started (Id=fa8f8059-013f-42f9-8047-391d4e3fb4a3)
2016-10-27T22:21:52.162 C# Event Hub trigger function processed a message: Timestamp: 10/27/2016 3:21:08 PM, MessageId: 10
2016-10-27T22:21:52.162 Function completed (Success, Id=fa8f8059-013f-42f9-8047-391d4e3fb4a3)
如您所见,所有消息都没有重复到达。您能否确认每次调用您的函数(您可以检查您的函数日志),它实际上 运行 成功完成?
如果您看到以下消息,
Function completed (Success, Id=<some Guid>)
对于您的函数的每次调用,该执行的消息都应该被检查点,并且您不应该在下一次调用中再次处理消息。
但是,如果您没有看到消息或看到错误消息,则函数 运行 失败,这将导致消息未被检查点。当 Function 再次被触发时,它将从最后一个检查点获取,如果您的 Function 从未成功处理该消息,则该检查点可能是 first 消息。
现在有办法了。
注意:要使其正常工作,您必须在连接到函数应用程序的存储帐户中删除存储在 azure-webjobs-eventhub
中的使用者组数据。
我们现在可以使用 Microsoft.Azure.Functions.Extensions
nuget 包在 Azure Functions 中使用依赖注入。
这为我们开辟了一种方式来控制我们的 Azure Function App 从何处开始处理我们的事件中心,方法如下:
using System;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Azure.WebJobs.EventHubs;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
[assembly: FunctionsStartup(typeof(EventHubOffsetTest.Startup))]
namespace EventHubOffsetTest
{
public class Startup : FunctionsStartup
{
public override void Configure(IFunctionsHostBuilder builder)
{
builder.Services.PostConfigure<EventHubOptions>(o => o.EventProcessorOptions.InitialOffsetProvider = GetInitialOffset);
}
private static EventPosition GetInitialOffset(string partitionId)
{
return EventPosition.FromEnd();
}
}
}