Unit test Azure Event Hub Trigger (Azure Function)

I have an Azure Function with an Event Hub trigger. It processes events in batches (EventData[]).

[FunctionName("EventHubTriggerCSharp")]
public async Task Run([EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] EventData[] eventHubMessages, ILogger log)
{
    foreach (var message in eventHubMessages)
    {
        log.LogInformation($"C# function triggered to process a message: {Encoding.UTF8.GetString(message.Body)}");
        log.LogInformation($"EnqueuedTimeUtc={message.SystemProperties.EnqueuedTimeUtc}");
        await service.CallAsync(message);
    }
}

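I want to unit test the method above, but I am not sure how to pass the EventData[] parameter to it. I searched for quite a while without finding a way before deciding to ask.

Is it possible to unit test this method? Perhaps at least to verify that the service method is being called?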

  • Yes, you can unit test Azure Functions; the samples below show how.

  • Here is a sample input payload for an Azure Functions unit test:

    {  
        "id": 1,  
        "CustomerName": "Test User",  
        "Country": "India",  
        "JoiningDate": "28-09-2020",  
        "PrimeUser": false  
    }  

  • Here is a sample test class for an Azure Event Hubs unit test (Python):

    import unittest
    from unittest.mock import MagicMock, patch

    from azure.eventhub import EventData

    # receive_events is the function under test; import it from your own module.


    class TestStringMethods(unittest.TestCase):
        # Decorators apply bottom-up, so the BlobCheckpointStore mock is the first argument.
        @patch('azure.eventhub.EventHubConsumerClient.from_connection_string')
        @patch('azure.eventhub.extensions.checkpointstoreblob.BlobCheckpointStore.from_connection_string')
        def test_eventhub_receive_events(self, mock_blob_checkpoint_store, mock_event_hub_consumer_client):
            mock_event_hub_consumer_client.return_value = MagicMock()
            mock_blob_checkpoint_store.return_value = MagicMock()

            # Fake receive(): deliver a single event to the callback synchronously.
            def receive(on_event, **kwargs):
                partition_context = MagicMock()
                partition_context.update_checkpoint = MagicMock()
                on_event(partition_context, EventData("test for Mock"))

            mock_event_hub_consumer_client.receive = receive

            # The code under test is expected to log through the 'myprogram' logger.
            with self.assertLogs('myprogram', level='INFO') as assert_execute:
                receive_events(mock_event_hub_consumer_client)
            self.assertIn("Accepting the event", assert_execute.output[0])


    if __name__ == '__main__':
        unittest.main()
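
The test above calls `receive_events` and asserts on a logger named `myprogram`, neither of which is shown in the answer. Below is a minimal self-contained sketch of what the code under test might look like, with the same test written against a plain `MagicMock` client so that it runs without the Azure SDK installed. The bodies of `receive_events` and `on_event` and the exact log message are assumptions inferred from the test, not the answer's actual implementation, and a plain string stands in for `EventData`.

```python
import logging
import unittest
from unittest.mock import MagicMock

# Hypothetical code under test; the logger name matches the test above.
logger = logging.getLogger("myprogram")


def on_event(partition_context, event):
    """Log the event, then checkpoint it."""
    logger.info("Accepting the event: %s", event)
    partition_context.update_checkpoint(event)


def receive_events(client):
    """Ask the consumer client to deliver events to on_event."""
    client.receive(on_event=on_event)


class ReceiveEventsTest(unittest.TestCase):
    def test_event_is_logged_and_checkpointed(self):
        partition_context = MagicMock()
        client = MagicMock()
        # Fake receive(): hand one event to the callback synchronously.
        client.receive.side_effect = lambda on_event, **kwargs: on_event(
            partition_context, "test for Mock"
        )

        with self.assertLogs("myprogram", level="INFO") as captured:
            receive_events(client)

        self.assertIn("Accepting the event", captured.output[0])
        partition_context.update_checkpoint.assert_called_once_with("test for Mock")
```

Saved as a test module, this can be run with `python -m unittest`. Passing the client in as a parameter, rather than building it inside `receive_events`, is what lets the test avoid patching `from_connection_string` at all.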

  • See the following Microsoft documentation on unit testing Azure Functions for more information:
  1. Azure Function Unit Test
  2. Azure Eventhub Trigger Unit Test
  3. Add Unit Tests to Azure Functions

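You should be able to construct an array of EventData and pass it to the Run method together with a mock logger instance.

I used MSTest; the same logic should apply with NUnit.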

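// Assumes the Microsoft.Azure.EventHubs, Moq and MSTest packages are referenced.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Extensions.Logging;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

[TestClass]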
public class EventHubTriggerCSharpTests
{
    private readonly Mock<IService> _mockService = new Mock<IService>();
    private readonly Mock<ILogger> _mockLogger = new Mock<ILogger>();

    private EventHubTriggerCSharp _eventHubTriggerCSharp;

    [TestInitialize]
    public void Initialize()
    {
        _eventHubTriggerCSharp = new EventHubTriggerCSharp(_mockService.Object);
    }

    [TestMethod]
    public async Task WhenEventHubTriggersFunction_ThenCallAsyncWithEventHubMessage()
    {
        // arrange
        var data = "some data";

        var eventHubMessages = new List<EventData>
        {
            new EventData(Encoding.UTF8.GetBytes(data))
            {
                // EnqueuedTimeUtc is a UTC timestamp, so use UtcNow rather than local time.
                SystemProperties = new EventData.SystemPropertiesCollection(1, DateTime.UtcNow, "offset", "partitionKey")
            }
        };

        // act
        await _eventHubTriggerCSharp.Run(eventHubMessages.ToArray(), _mockLogger.Object);

        // assert
        // Verify the service was called exactly once with the message that was passed in.
        _mockService.Verify(x => x.CallAsync(eventHubMessages[0]), Times.Once());
    }
}

Note: because of a problem with my Visual Studio I did not actually run this test, but hopefully you get the idea.
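
For readers following the Python answer earlier in this thread, the same arrange/act/assert idea (build a batch of events, call the handler, verify the service was awaited once per event) can be sketched with the standard library alone. The handler `run`, the `service.call_async` method and the `get_body()` accessor on the stand-in events are hypothetical names chosen for illustration; they are not taken from the question's C# code or from a specific SDK version.

```python
import logging
import unittest
from unittest.mock import AsyncMock, MagicMock, call


async def run(events, service, log):
    """Hypothetical batch handler: log each event body, then forward the event."""
    for event in events:
        log.info("Processing message: %s", event.get_body().decode("utf-8"))
        await service.call_async(event)


class RunTest(unittest.IsolatedAsyncioTestCase):
    async def test_service_called_once_per_event(self):
        # arrange: stand-in events that only expose get_body()
        events = [MagicMock(), MagicMock()]
        events[0].get_body.return_value = b"first"
        events[1].get_body.return_value = b"second"
        service = MagicMock()
        service.call_async = AsyncMock()
        log = MagicMock(spec=logging.Logger)

        # act
        await run(events, service, log)

        # assert: one awaited call per event, in batch order
        self.assertEqual(
            service.call_async.await_args_list,
            [call(events[0]), call(events[1])],
        )
        self.assertEqual(log.info.call_count, 2)
```

As in the C# test, the handler receives its service and logger as arguments, so no trigger binding or real Event Hub is needed to exercise it. `IsolatedAsyncioTestCase` and `AsyncMock` require Python 3.8 or later.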