EmbeddedKafka 自 Spring Boot 2.6.X 失败:AccessDeniedException: ..\AppData\Local\Temp\spring.kafka*

EmbeddedKafka failing since Spring Boot 2.6.X : AccessDeniedException: ..\AppData\Local\Temp\spring.kafka*

e:这已通过 Spring Boot 2.6.5 修复(参见 https://github.com/spring-projects/spring-boot/issues/30243

自从升级到 Spring Boot 2.6.X(在我的例子中:2.6.1),我有多个项目现在在 Windows 上的单元测试失败,无法开始 EmbeddedKafka,用 Linux

做 运行

有多个错误,但这是第一个抛出的错误

...
  .   ____          _            __ _ _
 /\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.1)

2021-12-09 16:15:00.300  INFO 13864 --- [           main] k.utils.Log4jControllerRegistration$     : Registered kafka:type=kafka.Log4jController MBean
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : 
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :   ______                  _                                          
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :  |___  /                 | |                                         
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :     / /    ___     ___   | | __   ___    ___   _ __     ___   _ __   
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :    / /    / _ \   / _ \  | |/ /  / _ \  / _ \ | '_ \   / _ \ | '__|
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :   / /__  | (_) | | (_) | |   <  |  __/ |  __/ | |_) | |  __/ | |    
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :  /_____|  \___/   \___/  |_|\_\  \___|  \___| | .__/   \___| |_|
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :                                               | |                     
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :                                               |_|                     
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : 
2021-12-09 16:15:00.422  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:zookeeper.version=3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT
2021-12-09 16:15:00.422  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:host.name=host.docker.internal
2021-12-09 16:15:00.422  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:java.version=11.0.11
2021-12-09 16:15:00.422  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:java.vendor=AdoptOpenJDK
...
2021-12-09 16:15:01.015  INFO 13864 --- [nelReaper-Fetch] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-Fetch]: Starting
2021-12-09 16:15:01.015  INFO 13864 --- [lReaper-Produce] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-Produce]: Starting
2021-12-09 16:15:01.016  INFO 13864 --- [lReaper-Request] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-Request]: Starting
2021-12-09 16:15:01.017  INFO 13864 --- [trollerMutation] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-ControllerMutation]: Starting
2021-12-09 16:15:01.037  INFO 13864 --- [           main] kafka.log.LogManager                     : Loading logs from log dirs ArraySeq(C:\Users\ddrop\AppData\Local\Temp\spring.kafka.bf8e2b62-a1f2-4092-b292-a15e35bd31ad18378079390566696446)
2021-12-09 16:15:01.040  INFO 13864 --- [           main] kafka.log.LogManager                     : Attempting recovery for all logs in C:\Users\ddrop\AppData\Local\Temp\spring.kafka.bf8e2b62-a1f2-4092-b292-a15e35bd31ad18378079390566696446 since no clean shutdown file was found
2021-12-09 16:15:01.043  INFO 13864 --- [           main] kafka.log.LogManager                     : Loaded 0 logs in 6ms.
2021-12-09 16:15:01.043  INFO 13864 --- [           main] kafka.log.LogManager                     : Starting log cleanup with a period of 300000 ms.
2021-12-09 16:15:01.045  INFO 13864 --- [           main] kafka.log.LogManager                     : Starting log flusher with a default period of 9223372036854775807 ms.
2021-12-09 16:15:01.052  INFO 13864 --- [           main] kafka.log.LogCleaner                     : Starting the log cleaner
2021-12-09 16:15:01.059  INFO 13864 --- [leaner-thread-0] kafka.log.LogCleaner                     : [kafka-log-cleaner-thread-0]: Starting
2021-12-09 16:15:01.224  INFO 13864 --- [name=forwarding] k.s.BrokerToControllerRequestThread      : [BrokerToControllerChannelManager broker=0 name=forwarding]: Starting
2021-12-09 16:15:01.325  INFO 13864 --- [           main] kafka.network.ConnectionQuotas           : Updated connection-accept-rate max connection creation rate to 2147483647
2021-12-09 16:15:01.327  INFO 13864 --- [           main] kafka.network.Acceptor                   : Awaiting socket connections on localhost:63919.
2021-12-09 16:15:01.345  INFO 13864 --- [           main] kafka.network.SocketServer               : [SocketServer listenerType=ZK_BROKER, nodeId=0] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT)
2021-12-09 16:15:01.350  INFO 13864 --- [0 name=alterIsr] k.s.BrokerToControllerRequestThread      : [BrokerToControllerChannelManager broker=0 name=alterIsr]: Starting
2021-12-09 16:15:01.364  INFO 13864 --- [eaper-0-Produce] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-Produce]: Starting
2021-12-09 16:15:01.364  INFO 13864 --- [nReaper-0-Fetch] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-Fetch]: Starting
2021-12-09 16:15:01.365  INFO 13864 --- [0-DeleteRecords] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-DeleteRecords]: Starting
2021-12-09 16:15:01.365  INFO 13864 --- [r-0-ElectLeader] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-ElectLeader]: Starting
2021-12-09 16:15:01.374  INFO 13864 --- [rFailureHandler] k.s.ReplicaManager$LogDirFailureHandler  : [LogDirFailureHandler]: Starting
2021-12-09 16:15:01.390  INFO 13864 --- [           main] kafka.zk.KafkaZkClient                   : Creating /brokers/ids/0 (is it secure? false)
2021-12-09 16:15:01.400  INFO 13864 --- [           main] kafka.zk.KafkaZkClient                   : Stat of the created znode at /brokers/ids/0 is: 25,25,1639062901396,1639062901396,1,0,0,72059919267528704,204,0,25

2021-12-09 16:15:01.400  INFO 13864 --- [           main] kafka.zk.KafkaZkClient                   : Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT://localhost:63919, czxid (broker epoch): 25
2021-12-09 16:15:01.410 ERROR 13864 --- [           main] kafka.server.BrokerMetadataCheckpoint    : Failed to write meta.properties due to

java.nio.file.AccessDeniedException: C:\Users\ddrop\AppData\Local\Temp\spring.kafka.bf8e2b62-a1f2-4092-b292-a15e35bd31ad18378079390566696446
    at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:89) ~[na:na]
    at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) ~[na:na]
    at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108) ~[na:na]

可通过 spring Initializr + 添加“Spring Kafka”重现:https://start.spring.io/#!type=maven-project&language=java&platformVersion=2.6.1&packaging=jar&jvmVersion=11&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project%20for%20Spring%20Boot&packageName=com.example.demo&dependencies=kafka

然后执行以下测试-class:

package com.example.demo;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka
class ApplicationTest {

    @Test
    void run() {
        int i = 1 + 1; // just a line of code to set a debug-point
    }
}

在 pom.xml 的属性中将 kafka.version 固定到 2.8.1 时,我没有出现此错误。

似乎是 Kafka 本身的原因,但我很难确定是 spring-kafka 错误地通过 EmbeddedKafka 初始化了 Kafka,还是 Kafka 本身就是罪魁祸首。

有人有想法吗?我是否缺少要设置的测试参数?

Apache Kafka 端的已知错误。从 Spring 的角度来看无事可做。 在此处查看更多信息:https://github.com/spring-projects/spring-kafka/discussions/2027。 在这里:https://issues.apache.org/jira/browse/KAFKA-13391

您需要等到 Apache Kafka 3.0.1 或不使用嵌入式 Kafka 而仅依赖于 Testcontainer,例如,或完全外部的 Apache Kafka 代理。

虽然我会等到 kafka 3.0.1 发布,对于那些只想切换到 Testcontainers 但不熟悉如何设置它们的人:

基于此 initlzr 的示例:https://start.spring.io/#!type=maven-project&language=java&platformVersion=2.6.1&packaging=jar&jvmVersion=11&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project%20for%20Spring%20Boot&packageName=com.example.demo&dependencies=kafka,testcontainers

可运行的应用程序

package com.example.demo;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

import java.time.LocalDateTime;
import java.util.stream.IntStream;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @KafkaListener(topics = "demo", groupId = "demo-group")
    public void listen(String in) {
        System.out.println("Processing: " + in);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("demo", 5, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> {
                        String event = "foo" + i;
                        System.out.println("Sending " + event);
                        template.send("demo", i + "", event);

                    }
            );

        };
    }
}

带有测试容器的测试代码,其中 Kafka 将在 Docker

中启动
package com.example.demo;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@SpringBootTest
class DemoApplicationTest {

    @Autowired
    ApplicationRunner applicationRunner;

    @Container
    public static KafkaContainer kafkaContainer =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

    @BeforeAll
    static void setUp() {
        kafkaContainer.start();
    }

    @DynamicPropertySource
    static void addDynamicProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
    }


    @Test
    void run() throws Exception {
        applicationRunner.run(null);
    }
}

您的 pom.xml

的必要补充
    <dependencies>
...
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <scope>test</scope>
        </dependency>
...
    </dependencies>


    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>testcontainers-bom</artifactId>
                <version>1.16.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

为 windows 环境确定 kafka 2.8.1 的另一种方法。

这假设生成用于生产用途的 jar 的构建环境不是 windows 框

加入pom.xml

  <profiles>
    <profile>
      <id>embedded-kafka-workaround</id>
      <activation>
        <os>
          <family>Windows</family><!-- super hacky workaround for  . "if os = windows" condition until kafka 3.0.1 or 3.1.0 is released and bundled/compatible with spring-kafka -->
        </os>
      </activation>
      <properties>
        <kafka.version>2.8.1</kafka.version><!-- only locally and when in windows, kafka 3.0.0 fails to start embedded kafka -->
      </properties>
    </profile>
  </profiles>

作为解决方法,将修补后的 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 添加到您的项目测试源(在同一包下),直到 Kafka 3.0.1 附带 Spring Boot。 - 当然,当发生这种情况时,请删除这个临时 class。

另一种方法是改用 TestContainers Kafka。这至少会给你一个孤立的 Kafka 实例,它比 @EmbeddedKafka

更接近你在生产环境中拥有的实例
<dependencyManagement>
  <dependencies>  
     <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers-bom</artifactId>
        <version>1.16.2</version>
        <type>pom</type>
        <scope>import</scope>
     </dependency>
  </dependencies>  
</dependencyManagement>

<dependencies>  
    <dependency>
      <groupId>org.testcontainers</groupId>
      <artifactId>testcontainers</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.testcontainers</groupId>
      <artifactId>kafka</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.testcontainers</groupId>
      <artifactId>junit-jupiter</artifactId>
      <scope>test</scope>
    </dependency>
</dependencies>  

在你的代码中

@Testcontainers
class MyTest {

    @Container
    private static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("docker-proxy.devhaus.com/confluentinc/cp-kafka:5.4.3").asCompatibleSubstituteFor("confluentinc/cp-kafka"))
        .withReuse(true);

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);

    }
...

我可以通过在 pom 文件中将 kafka.version 属性 添加到 3.1.0 来解决问题

<properties>
    <kafka.version>3.1.0</kafka.version>
</properties>

一旦 spring-boot-starter-parent:2.6.5 可用,如果该版本可能使用 kafka-client 3.1.0

,您可以删除它

for spring boot version 2.6.X 添加依赖项(gradle):

implementation 'org.apache.kafka:kafka-clients:3.0.1'

一旦spring boot 升级了spring boot package 中的库

就删除它