Python: 如何为单元测试模拟 kafka 主题?
Python: how to mock a kafka topic for unit tests?
我们有一个消息调度程序,它会根据消息属性生成散列键,然后将其放入带有键的 Kafka 主题队列中。
这样做是为了消除重复数据。但是,我不确定如何在不实际设置本地集群并检查其是否按预期执行的情况下测试此重复数据删除。
在线搜索模拟 Kafka 主题队列的工具没有帮助,我担心我可能以错误的方式思考这个问题。
最终,无论用于模拟 Kafka 队列的是什么,都应该以与本地集群相同的方式运行——即通过向主题队列插入 Key 来提供重复数据删除。
有没有这样的工具?
如果您需要验证 Kafka 特定功能或使用 Kafka 特定功能的实现,那么唯一的方法就是使用 Kafka!
Kafka 是否对其重复数据删除逻辑进行了任何测试?如果是这样,以下组合可能足以减轻您的组织的失败风险:
- 散列逻辑的单元测试(确保同一个对象确实生成相同的散列)
- Kafka 主题重复数据删除测试(Kafka 项目内部)
- 飞行前冒烟测试验证您的应用程序与 Kafka 的集成
如果 Kafka 没有围绕其主题重复数据删除进行任何类型的测试,或者您担心破坏性更改,那么对 Kafka 特定功能进行自动检查很重要。这可以通过集成测试来完成。我最近在基于 Docker 的集成测试管道方面取得了很大的成功。在创建 Kafka docker 图像(一个可能已经从社区中获得)的初步工作之后,设置集成测试管道变得微不足道。管道可能如下所示:
- 执行基于应用程序的单元测试(哈希逻辑)
- 一旦通过,您的 CI 服务器将启动 Kafka
- 执行集成测试,验证重复写入是否只向主题发送一条消息。
我认为重要的是确保 Kafka 集成测试最小化,只包括绝对依赖 Kafka 特定功能的测试。即使使用 docker-compose,它们也可能比单元测试慢几个数量级,~1 毫秒对 1 秒?另一件需要考虑的事情是维护集成管道的开销可能值得相信 Kakfa 将提供它声称的主题重复数据删除的风险。
为了在 Python 单元测试下模拟 Kafka with SBT 测试任务我做了如下。 应该安装 Pyspark。
在 build.sbt 中定义应该 运行 的测试任务:
val testPythonTask = TaskKey[Unit]("testPython", "Run python tests.")
val command = "python3 -m unittest app_test.py"
val workingDirectory = new File("./project/src/main/python")
testPythonTask := {
val s: TaskStreams = streams.value
s.log.info("Executing task testPython")
Process(command,
workingDirectory,
// arguments for using org.apache.spark.streaming.kafka.KafkaTestUtils in Python
"PYSPARK_SUBMIT_ARGS" -> "--jars %s pyspark-shell"
// collect all jar paths from project
.format((fullClasspath in Runtime value)
.map(_.data.getCanonicalPath)
.filter(_.contains(".jar"))
.mkString(",")),
"PYSPARK_PYTHON" -> "python3") ! s.log
}
//attach custom test task to default test tasks
test in Test := {
testPythonTask.value
(test in Test).value
}
testOnly in Test := {
testPythonTask.value
(testOnly in Test).value
}
在python测试用例 (app_test.py):
import random
import unittest
from itertools import chain
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.tests import PySparkStreamingTestCase
class KafkaStreamTests(PySparkStreamingTestCase):
timeout = 20 # seconds
duration = 1
def setUp(self):
super(KafkaStreamTests, self).setUp()
kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
.loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
self._kafkaTestUtils.setup()
def tearDown(self):
if self._kafkaTestUtils is not None:
self._kafkaTestUtils.teardown()
self._kafkaTestUtils = None
super(KafkaStreamTests, self).tearDown()
def _randomTopic(self):
return "topic-%d" % random.randint(0, 10000)
def _validateStreamResult(self, sendData, stream):
result = {}
for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
sum(sendData.values()))):
result[i] = result.get(i, 0) + 1
self.assertEqual(sendData, result)
def test_kafka_stream(self):
"""Test the Python Kafka stream API."""
topic = self._randomTopic()
sendData = {"a": 3, "b": 5, "c": 10}
self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
"test-streaming-consumer", {topic: 1},
{"auto.offset.reset": "smallest"})
self._validateStreamResult(sendData, stream)
Flume、Kinesis 和 pyspark.streaming.tests
模块中的其他示例。
下面是 Python 中针对 Kafka 相关功能的自动化测试示例:https://github.com/up9inc/async-ms-demo/blob/main/grayscaler/tests.py
它使用 http://mockintosh.io 项目的“Kafka Mock”功能。
免责声明:我隶属于该项目。
我们有一个消息调度程序,它会根据消息属性生成散列键,然后将其放入带有键的 Kafka 主题队列中。
这样做是为了消除重复数据。但是,我不确定如何在不实际设置本地集群并检查其是否按预期执行的情况下测试此重复数据删除。
在线搜索模拟 Kafka 主题队列的工具没有帮助,我担心我可能以错误的方式思考这个问题。
最终,无论用于模拟 Kafka 队列的是什么,都应该以与本地集群相同的方式运行——即通过向主题队列插入 Key 来提供重复数据删除。
有没有这样的工具?
如果您需要验证 Kafka 特定功能或使用 Kafka 特定功能的实现,那么唯一的方法就是使用 Kafka!
Kafka 是否对其重复数据删除逻辑进行了任何测试?如果是这样,以下组合可能足以减轻您的组织的失败风险:
- 散列逻辑的单元测试(确保同一个对象确实生成相同的散列)
- Kafka 主题重复数据删除测试(Kafka 项目内部)
- 飞行前冒烟测试验证您的应用程序与 Kafka 的集成
如果 Kafka 没有围绕其主题重复数据删除进行任何类型的测试,或者您担心破坏性更改,那么对 Kafka 特定功能进行自动检查很重要。这可以通过集成测试来完成。我最近在基于 Docker 的集成测试管道方面取得了很大的成功。在创建 Kafka docker 图像(一个可能已经从社区中获得)的初步工作之后,设置集成测试管道变得微不足道。管道可能如下所示:
- 执行基于应用程序的单元测试(哈希逻辑)
- 一旦通过,您的 CI 服务器将启动 Kafka
- 执行集成测试,验证重复写入是否只向主题发送一条消息。
我认为重要的是确保 Kafka 集成测试最小化,只包括绝对依赖 Kafka 特定功能的测试。即使使用 docker-compose,它们也可能比单元测试慢几个数量级,~1 毫秒对 1 秒?另一件需要考虑的事情是维护集成管道的开销可能值得相信 Kakfa 将提供它声称的主题重复数据删除的风险。
为了在 Python 单元测试下模拟 Kafka with SBT 测试任务我做了如下。 应该安装 Pyspark。
在 build.sbt 中定义应该 运行 的测试任务:
val testPythonTask = TaskKey[Unit]("testPython", "Run python tests.")
val command = "python3 -m unittest app_test.py"
val workingDirectory = new File("./project/src/main/python")
testPythonTask := {
val s: TaskStreams = streams.value
s.log.info("Executing task testPython")
Process(command,
workingDirectory,
// arguments for using org.apache.spark.streaming.kafka.KafkaTestUtils in Python
"PYSPARK_SUBMIT_ARGS" -> "--jars %s pyspark-shell"
// collect all jar paths from project
.format((fullClasspath in Runtime value)
.map(_.data.getCanonicalPath)
.filter(_.contains(".jar"))
.mkString(",")),
"PYSPARK_PYTHON" -> "python3") ! s.log
}
//attach custom test task to default test tasks
test in Test := {
testPythonTask.value
(test in Test).value
}
testOnly in Test := {
testPythonTask.value
(testOnly in Test).value
}
在python测试用例 (app_test.py):
import random
import unittest
from itertools import chain
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.tests import PySparkStreamingTestCase
class KafkaStreamTests(PySparkStreamingTestCase):
timeout = 20 # seconds
duration = 1
def setUp(self):
super(KafkaStreamTests, self).setUp()
kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
.loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
self._kafkaTestUtils.setup()
def tearDown(self):
if self._kafkaTestUtils is not None:
self._kafkaTestUtils.teardown()
self._kafkaTestUtils = None
super(KafkaStreamTests, self).tearDown()
def _randomTopic(self):
return "topic-%d" % random.randint(0, 10000)
def _validateStreamResult(self, sendData, stream):
result = {}
for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
sum(sendData.values()))):
result[i] = result.get(i, 0) + 1
self.assertEqual(sendData, result)
def test_kafka_stream(self):
"""Test the Python Kafka stream API."""
topic = self._randomTopic()
sendData = {"a": 3, "b": 5, "c": 10}
self._kafkaTestUtils.createTopic(topic)
self._kafkaTestUtils.sendMessages(topic, sendData)
stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
"test-streaming-consumer", {topic: 1},
{"auto.offset.reset": "smallest"})
self._validateStreamResult(sendData, stream)
Flume、Kinesis 和 pyspark.streaming.tests
模块中的其他示例。
下面是 Python 中针对 Kafka 相关功能的自动化测试示例:https://github.com/up9inc/async-ms-demo/blob/main/grayscaler/tests.py
它使用 http://mockintosh.io 项目的“Kafka Mock”功能。
免责声明:我隶属于该项目。