EmbeddedKafka failing since Spring Boot 2.6.X : AccessDeniedException: ..\AppData\Local\Temp\spring.kafka*
Edit: this has been fixed with Spring Boot 2.6.5 (see https://github.com/spring-projects/spring-boot/issues/30243)
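For reference, a minimal sketch of picking up the fix, assuming the project inherits from spring-boot-starter-parent (version number taken from the issue above):

<!-- pom.xml: upgrading the Boot parent to 2.6.5 (or later) pulls in a Kafka version containing the fix -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.5</version>
    <relativePath/>
</parent>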
Since upgrading to Spring Boot 2.6.X (in my case: 2.6.1), multiple of my projects now fail their unit tests on Windows because EmbeddedKafka cannot start; on Linux they run fine.
There are multiple errors, but this is the first one thrown:
...
. ____ _ __ _ _
/\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.6.1)
2021-12-09 16:15:00.300 INFO 13864 --- [ main] k.utils.Log4jControllerRegistration$ : Registered kafka:type=kafka.Log4jController MBean
2021-12-09 16:15:00.420 INFO 13864 --- [ main] o.a.zookeeper.server.ZooKeeperServer :
2021-12-09 16:15:00.420 INFO 13864 --- [ main] o.a.zookeeper.server.ZooKeeperServer : ______ _
2021-12-09 16:15:00.420 INFO 13864 --- [ main] o.a.zookeeper.server.ZooKeeperServer : |___ / | |
2021-12-09 16:15:00.420 INFO 13864 --- [ main] o.a.zookeeper.server.ZooKeeperServer : / / ___ ___ | | __ ___ ___ _ __ ___ _ __
2021-12-09 16:15:00.420 INFO 13864 --- [ main] o.a.zookeeper.server.ZooKeeperServer : / / / _ \ / _ \ | |/ / / _ \ / _ \ | '_ \ / _ \ | '__|
2021-12-09 16:15:00.420 INFO 13864 --- [ main] o.a.zookeeper.server.ZooKeeperServer : / /__ | (_) | | (_) | | < | __/ | __/ | |_) | | __/ | |
2021-12-09 16:15:00.420 INFO 13864 --- [ main] o.a.zookeeper.server.ZooKeeperServer : /_____| \___/ \___/ |_|\_\ \___| \___| | .__/ \___| |_|
2021-12-09 16:15:00.420 INFO 13864 --- [ main] o.a.zookeeper.server.ZooKeeperServer : | |
2021-12-09 16:15:00.420 INFO 13864 --- [ main] o.a.zookeeper.server.ZooKeeperServer : |_|
2021-12-09 16:15:00.420 INFO 13864 --- [ main] o.a.zookeeper.server.ZooKeeperServer :
2021-12-09 16:15:00.422 INFO 13864 --- [ main] o.a.zookeeper.server.ZooKeeperServer : Server environment:zookeeper.version=3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT
2021-12-09 16:15:00.422 INFO 13864 --- [ main] o.a.zookeeper.server.ZooKeeperServer : Server environment:host.name=host.docker.internal
2021-12-09 16:15:00.422 INFO 13864 --- [ main] o.a.zookeeper.server.ZooKeeperServer : Server environment:java.version=11.0.11
2021-12-09 16:15:00.422 INFO 13864 --- [ main] o.a.zookeeper.server.ZooKeeperServer : Server environment:java.vendor=AdoptOpenJDK
...
2021-12-09 16:15:01.015 INFO 13864 --- [nelReaper-Fetch] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-Fetch]: Starting
2021-12-09 16:15:01.015 INFO 13864 --- [lReaper-Produce] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-Produce]: Starting
2021-12-09 16:15:01.016 INFO 13864 --- [lReaper-Request] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-Request]: Starting
2021-12-09 16:15:01.017 INFO 13864 --- [trollerMutation] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-ControllerMutation]: Starting
2021-12-09 16:15:01.037 INFO 13864 --- [ main] kafka.log.LogManager : Loading logs from log dirs ArraySeq(C:\Users\ddrop\AppData\Local\Temp\spring.kafka.bf8e2b62-a1f2-4092-b292-a15e35bd31ad18378079390566696446)
2021-12-09 16:15:01.040 INFO 13864 --- [ main] kafka.log.LogManager : Attempting recovery for all logs in C:\Users\ddrop\AppData\Local\Temp\spring.kafka.bf8e2b62-a1f2-4092-b292-a15e35bd31ad18378079390566696446 since no clean shutdown file was found
2021-12-09 16:15:01.043 INFO 13864 --- [ main] kafka.log.LogManager : Loaded 0 logs in 6ms.
2021-12-09 16:15:01.043 INFO 13864 --- [ main] kafka.log.LogManager : Starting log cleanup with a period of 300000 ms.
2021-12-09 16:15:01.045 INFO 13864 --- [ main] kafka.log.LogManager : Starting log flusher with a default period of 9223372036854775807 ms.
2021-12-09 16:15:01.052 INFO 13864 --- [ main] kafka.log.LogCleaner : Starting the log cleaner
2021-12-09 16:15:01.059 INFO 13864 --- [leaner-thread-0] kafka.log.LogCleaner : [kafka-log-cleaner-thread-0]: Starting
2021-12-09 16:15:01.224 INFO 13864 --- [name=forwarding] k.s.BrokerToControllerRequestThread : [BrokerToControllerChannelManager broker=0 name=forwarding]: Starting
2021-12-09 16:15:01.325 INFO 13864 --- [ main] kafka.network.ConnectionQuotas : Updated connection-accept-rate max connection creation rate to 2147483647
2021-12-09 16:15:01.327 INFO 13864 --- [ main] kafka.network.Acceptor : Awaiting socket connections on localhost:63919.
2021-12-09 16:15:01.345 INFO 13864 --- [ main] kafka.network.SocketServer : [SocketServer listenerType=ZK_BROKER, nodeId=0] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT)
2021-12-09 16:15:01.350 INFO 13864 --- [0 name=alterIsr] k.s.BrokerToControllerRequestThread : [BrokerToControllerChannelManager broker=0 name=alterIsr]: Starting
2021-12-09 16:15:01.364 INFO 13864 --- [eaper-0-Produce] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-Produce]: Starting
2021-12-09 16:15:01.364 INFO 13864 --- [nReaper-0-Fetch] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-Fetch]: Starting
2021-12-09 16:15:01.365 INFO 13864 --- [0-DeleteRecords] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-DeleteRecords]: Starting
2021-12-09 16:15:01.365 INFO 13864 --- [r-0-ElectLeader] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-ElectLeader]: Starting
2021-12-09 16:15:01.374 INFO 13864 --- [rFailureHandler] k.s.ReplicaManager$LogDirFailureHandler : [LogDirFailureHandler]: Starting
2021-12-09 16:15:01.390 INFO 13864 --- [ main] kafka.zk.KafkaZkClient : Creating /brokers/ids/0 (is it secure? false)
2021-12-09 16:15:01.400 INFO 13864 --- [ main] kafka.zk.KafkaZkClient : Stat of the created znode at /brokers/ids/0 is: 25,25,1639062901396,1639062901396,1,0,0,72059919267528704,204,0,25
2021-12-09 16:15:01.400 INFO 13864 --- [ main] kafka.zk.KafkaZkClient : Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT://localhost:63919, czxid (broker epoch): 25
2021-12-09 16:15:01.410 ERROR 13864 --- [ main] kafka.server.BrokerMetadataCheckpoint : Failed to write meta.properties due to
java.nio.file.AccessDeniedException: C:\Users\ddrop\AppData\Local\Temp\spring.kafka.bf8e2b62-a1f2-4092-b292-a15e35bd31ad18378079390566696446
at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:89) ~[na:na]
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) ~[na:na]
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108) ~[na:na]
Reproducible via Spring Initializr + adding "Spring Kafka": https://start.spring.io/#!type=maven-project&language=java&platformVersion=2.6.1&packaging=jar&jvmVersion=11&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project%20for%20Spring%20Boot&packageName=com.example.demo&dependencies=kafka
Then execute the following test class:
package com.example.demo;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka
class ApplicationTest {

    @Test
    void run() {
        int i = 1 + 1; // just a line of code to set a debug-point
    }
}
I do not get this error when pinning kafka.version to 2.8.1 in the properties of my pom.xml.
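That pin is just a property override, roughly like this (a minimal sketch; kafka.version is the property name managed by the Spring Boot parent):

<!-- pom.xml: force the Kafka libraries back to 2.8.1 -->
<properties>
    <kafka.version>2.8.1</kafka.version>
</properties>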
It seems to be caused by Kafka itself, but I have a hard time figuring out whether spring-kafka initializes Kafka incorrectly via EmbeddedKafka or whether Kafka itself is the culprit.
Does anybody have an idea? Am I missing a test parameter I should set?
This is a known bug on the Apache Kafka side; there is nothing to do from the Spring perspective.
See more info here: https://github.com/spring-projects/spring-kafka/discussions/2027
and here: https://issues.apache.org/jira/browse/KAFKA-13391
You need to wait until Apache Kafka 3.0.1, or skip embedded Kafka and rely on Testcontainers, for example, or on a fully external Apache Kafka broker.
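For the external-broker route, a minimal sketch (the broker address is an assumption) is to point the test at it via properties instead of using @EmbeddedKafka:

package com.example.demo;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

// Sketch only: assumes a broker is already running at the given address
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=localhost:9092")
class ExternalBrokerApplicationTest {

    @Test
    void contextLoads() {
        // the application now connects to the external broker configured above
    }
}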
While I am going to wait until Kafka 3.0.1 is released, here is a setup for those who just want to switch to Testcontainers but are not familiar with how to set them up.
The runnable application:
package com.example.demo;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.stream.IntStream;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @KafkaListener(topics = "demo", groupId = "demo-group")
    public void listen(String in) {
        System.out.println("Processing: " + in);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("demo", 5, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> {
                String event = "foo" + i;
                System.out.println("Sending " + event);
                template.send("demo", i + "", event);
            });
        };
    }
}
The test code with Testcontainers, where Kafka will be started in Docker:
package com.example.demo;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@SpringBootTest
class DemoApplicationTest {

    @Autowired
    ApplicationRunner applicationRunner;

    @Container
    public static KafkaContainer kafkaContainer =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @BeforeAll
    static void setUp() {
        kafkaContainer.start();
    }

    @DynamicPropertySource
    static void addDynamicProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
    }

    @Test
    void run() throws Exception {
        applicationRunner.run(null);
    }
}
Necessary additions to your pom.xml:
<dependencies>
    ...
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>junit-jupiter</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <scope>test</scope>
    </dependency>
    ...
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers-bom</artifactId>
            <version>1.16.2</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Another way is to pin Kafka 2.8.1 only for Windows environments.
This assumes the build environment that produces the jar for production use is not a Windows box.
Add to pom.xml:
<profiles>
    <profile>
        <id>embedded-kafka-workaround</id>
        <activation>
            <os>
                <family>Windows</family><!-- super hacky workaround for an "if os = windows" condition until kafka 3.0.1 or 3.1.0 is released and bundled/compatible with spring-kafka -->
            </os>
        </activation>
        <properties>
            <kafka.version>2.8.1</kafka.version><!-- only locally and only on Windows; kafka 3.0.0 fails to start embedded kafka -->
        </properties>
    </profile>
</profiles>
As a workaround, add a patched copy of https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/Utils.java to your project's test sources (under the same package) until Kafka 3.0.1 ships with Spring Boot. Of course, remove this temporary class once that happens.
Another approach is to use Testcontainers Kafka instead. That at least gives you an isolated Kafka instance, which is closer to what you have in production than @EmbeddedKafka:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers-bom</artifactId>
            <version>1.16.2</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>junit-jupiter</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
And in your code:
@Testcontainers
class MyTest {

    @Container
    private static final KafkaContainer KAFKA = new KafkaContainer(
            DockerImageName.parse("docker-proxy.devhaus.com/confluentinc/cp-kafka:5.4.3")
                    .asCompatibleSubstituteFor("confluentinc/cp-kafka"))
            .withReuse(true);

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
    }

    ...
I could solve the problem by adding the kafka.version property set to 3.1.0 in my pom file:
<properties>
    <kafka.version>3.1.0</kafka.version>
</properties>
You can remove this once spring-boot-starter-parent:2.6.5 is available, assuming that version uses kafka-clients 3.1.0.
For Spring Boot version 2.6.X, add the dependency (Gradle):
implementation 'org.apache.kafka:kafka-clients:3.0.1'
Remove it once Spring Boot upgrades this library in the Spring Boot package.