单元测试 Azure 事件中心触发器(Azure 函数)
Unit test Azure Event Hub Trigger (Azure Function)
我有一个 Azure 函数,它是一个 EventHub 触发器。正在批量处理事件 (EventData[])。
[FunctionName("EventHubTriggerCSharp")]
public async Task Run([EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] EventData[] eventHubMessages, ILogger log)
{
foreach (var message in eventHubMessages)
{
log.LogInformation($"C# function triggered to process a message: {Encoding.UTF8.GetString(message.Body)}");
log.LogInformation($"EnqueuedTimeUtc={message.SystemProperties.EnqueuedTimeUtc}");
await service.CallAsync(message);
}
}
我想对上述方法进行单元测试。但不确定如何将 EventData[] 参数传递给此方法。找了半天没找到方法才决定问这个问题。
是否可以对上述方法进行单元测试?也许至少能够验证正在调用服务方法?
是的,您可以使用下面的示例对 Azure Functions 进行单元测试。
这里是Azure函数单元测试的示例测试class。
{
"id": 1,
"CustomerName": "Test User",
"Country": "India",
"JoiningDate": "28-09-2020",
"PrimeUser": false
}
- 这是 Azure 事件中心单元测试的示例测试 class
from unittest.mock import MagicMock
from unittest.mock import patch
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.eventhub import PartitionContext, EventData
import unittest
class TestStringMethods(unittest.TestCase):
@patch('azure.eventhub.EventHubConsumerClient.from_connection_string')
@patch('azure.eventhub.extensions.checkpointstoreblob.BlobCheckpointStore.from_connection_string')
def test_eventhub_receive_events(self, mock_blob_checkpoint_store, mock_event_hub_consumer_client):
mock_event_hub_consumer_client.return_value = MagicMock()
mock_blob_checkpoint_store.return_value = MagicMock()
def receive(on_event, **kwargs):
partition_context = MagicMock()
partition_context.update_checkpoint = MagicMock()
on_event(partition_context, EventData("test for Mock"))
mock_event_hub_consumer_client.receive = receive
with self.assertLogs('myprogram', level='INFO') as assert_execute:
receive_events(mock_event_hub_consumer_client)
self.assertIn("Accepting the event", assert_execute.output[0])
if __name__ == '__main__':
unittest.main()
- 以下是有关 Azure 函数单元测试的 Microsoft 文档,以获取更多信息。
您应该能够构造 EventData
的数组并将其与模拟记录器实例一起传递给 Run
方法。
我用过MsTest,和NUnit应该是一样的逻辑。
[TestClass]
public class EventHubTriggerCSharpTests
{
private readonly Mock<IService> _mockService = new Mock<IService>();
private readonly Mock<ILogger> _mockLogger = new Mock<ILogger>();
private EventHubTriggerCSharp _eventHubTriggerCSharp;
[TestInitialize]
public void Initialize()
{
_eventHubTriggerCSharp = new EventHubTriggerCSharp(_mockService.Object);
}
[TestMethod]
public async Task WhenEventHubTriggersFunction_ThenCallAsyncWithEventHubMessage()
{
// arrange
var data = "some data";
var eventHubMessages = new List<EventData>
{
new EventData(Encoding.UTF8.GetBytes(data))
{
SystemProperties = new EventData.SystemPropertiesCollection(1, DateTime.Now, "offset", "partitionKey")
}
};
// act
await _eventHubTriggerCSharp.Run(eventHubMessages.ToArray(), _mockLogger.Object);
// assert
_mockService.Verify(x => x.CallAsync(eventHubMessages[0]));
}
}
注意:由于我 Visual Studio 的问题,我实际上 运行 没有进行测试,但希望你明白了。
我有一个 Azure 函数,它是一个 EventHub 触发器。正在批量处理事件 (EventData[])。
[FunctionName("EventHubTriggerCSharp")]
public async Task Run([EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] EventData[] eventHubMessages, ILogger log)
{
foreach (var message in eventHubMessages)
{
log.LogInformation($"C# function triggered to process a message: {Encoding.UTF8.GetString(message.Body)}");
log.LogInformation($"EnqueuedTimeUtc={message.SystemProperties.EnqueuedTimeUtc}");
await service.CallAsync(message);
}
}
我想对上述方法进行单元测试。但不确定如何将 EventData[] 参数传递给此方法。找了半天没找到方法才决定问这个问题。
是否可以对上述方法进行单元测试?也许至少能够验证正在调用服务方法?
是的,您可以使用下面的示例对 Azure Functions 进行单元测试。
这里是Azure函数单元测试的示例测试class。
{
"id": 1,
"CustomerName": "Test User",
"Country": "India",
"JoiningDate": "28-09-2020",
"PrimeUser": false
}
- 这是 Azure 事件中心单元测试的示例测试 class
from unittest.mock import MagicMock
from unittest.mock import patch
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.eventhub import PartitionContext, EventData
import unittest
class TestStringMethods(unittest.TestCase):
@patch('azure.eventhub.EventHubConsumerClient.from_connection_string')
@patch('azure.eventhub.extensions.checkpointstoreblob.BlobCheckpointStore.from_connection_string')
def test_eventhub_receive_events(self, mock_blob_checkpoint_store, mock_event_hub_consumer_client):
mock_event_hub_consumer_client.return_value = MagicMock()
mock_blob_checkpoint_store.return_value = MagicMock()
def receive(on_event, **kwargs):
partition_context = MagicMock()
partition_context.update_checkpoint = MagicMock()
on_event(partition_context, EventData("test for Mock"))
mock_event_hub_consumer_client.receive = receive
with self.assertLogs('myprogram', level='INFO') as assert_execute:
receive_events(mock_event_hub_consumer_client)
self.assertIn("Accepting the event", assert_execute.output[0])
if __name__ == '__main__':
unittest.main()
- 以下是有关 Azure 函数单元测试的 Microsoft 文档,以获取更多信息。
您应该能够构造 EventData
的数组并将其与模拟记录器实例一起传递给 Run
方法。
我用过MsTest,和NUnit应该是一样的逻辑。
[TestClass]
public class EventHubTriggerCSharpTests
{
private readonly Mock<IService> _mockService = new Mock<IService>();
private readonly Mock<ILogger> _mockLogger = new Mock<ILogger>();
private EventHubTriggerCSharp _eventHubTriggerCSharp;
[TestInitialize]
public void Initialize()
{
_eventHubTriggerCSharp = new EventHubTriggerCSharp(_mockService.Object);
}
[TestMethod]
public async Task WhenEventHubTriggersFunction_ThenCallAsyncWithEventHubMessage()
{
// arrange
var data = "some data";
var eventHubMessages = new List<EventData>
{
new EventData(Encoding.UTF8.GetBytes(data))
{
SystemProperties = new EventData.SystemPropertiesCollection(1, DateTime.Now, "offset", "partitionKey")
}
};
// act
await _eventHubTriggerCSharp.Run(eventHubMessages.ToArray(), _mockLogger.Object);
// assert
_mockService.Verify(x => x.CallAsync(eventHubMessages[0]));
}
}
注意:由于我 Visual Studio 的问题,我实际上 运行 没有进行测试,但希望你明白了。