Kafka Streams:流线程无法锁定状态目录
Kafka Streams : Stream Thread failed to lock State Directory
我正在尝试测试我的 Kafka Streams 应用程序。我构建了一个简单的拓扑,我从输入主题中读取并将相同的数据存储在状态存储中。
我尝试使用 TopologyTestDriver 为这个拓扑编写单元测试。当我 运行 测试时,我遇到了以下错误。
org.apache.kafka.streams.errors.LockException: stream-thread [main] task [0_0] Failed to lock the state directory for task 0_0
at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:197)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:275)
at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:403)
at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:257)
at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:228)
at streams.checkStreams.checkStreamsTest.setup(checkStreamsTest.java:99)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access[=10=]0(ParentRunner.java:58)
at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
....
我可以看到状态存储在 /tmp/kafka-streams 中本地创建,但不知何故流线程无法锁定它。我搜索了一下,发现这个错误可能是因为有两个流线程试图访问它,一个有锁,所以另一个必须等待。但是我没有看到在我的代码中创建了两个流线程。我是这个 kafka 流及其测试的新手,我在这里遗漏了什么吗?
TopologyTestDriver
不会创建任何后台线程,因此多线程(来自 KafkaStreams 本身)应该不是问题——但是,正如@BartoszWardziński 指出的那样,如果您的测试框架并行执行测试,并且您在不同的测试中使用相同的 application.id
,这可能会导致锁定问题。
测试的建议是,生成随机 application.id
以避免此问题。
如果您的测试不是 运行 并行进行,解决方案可能是在 TopologyTestDriver
上调用 close()
方法。这将清理资源并移除锁。无论如何,这可能是一次性物品的最佳做法。
如果 运行ning 并行测试,您可以设置随机 application.id
。这样做的问题是,如果您使用模式注册表并连接到测试注册表,这可能会创建数千个方案(每个测试一个)。
您的两个选择是:
- 每个测试有一个唯一的
application.id
但它是硬编码的(即名称
测试)而不是随机的。
- 不要 运行 并行测试并调用
close()
TopologyTestDriver
我正在尝试测试我的 Kafka Streams 应用程序。我构建了一个简单的拓扑,我从输入主题中读取并将相同的数据存储在状态存储中。
我尝试使用 TopologyTestDriver 为这个拓扑编写单元测试。当我 运行 测试时,我遇到了以下错误。
org.apache.kafka.streams.errors.LockException: stream-thread [main] task [0_0] Failed to lock the state directory for task 0_0
at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:197)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:275)
at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:403)
at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:257)
at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:228)
at streams.checkStreams.checkStreamsTest.setup(checkStreamsTest.java:99)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access[=10=]0(ParentRunner.java:58)
at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
....
我可以看到状态存储在 /tmp/kafka-streams 中本地创建,但不知何故流线程无法锁定它。我搜索了一下,发现这个错误可能是因为有两个流线程试图访问它,一个有锁,所以另一个必须等待。但是我没有看到在我的代码中创建了两个流线程。我是这个 kafka 流及其测试的新手,我在这里遗漏了什么吗?
TopologyTestDriver
不会创建任何后台线程,因此多线程(来自 KafkaStreams 本身)应该不是问题——但是,正如@BartoszWardziński 指出的那样,如果您的测试框架并行执行测试,并且您在不同的测试中使用相同的 application.id
,这可能会导致锁定问题。
测试的建议是,生成随机 application.id
以避免此问题。
如果您的测试不是 运行 并行进行,解决方案可能是在 TopologyTestDriver
上调用 close()
方法。这将清理资源并移除锁。无论如何,这可能是一次性物品的最佳做法。
如果 运行ning 并行测试,您可以设置随机 application.id
。这样做的问题是,如果您使用模式注册表并连接到测试注册表,这可能会创建数千个方案(每个测试一个)。
您的两个选择是:
- 每个测试有一个唯一的
application.id
但它是硬编码的(即名称 测试)而不是随机的。 - 不要 运行 并行测试并调用
close()
TopologyTestDriver