Kafka Streams exception: Could not find a public no-argument constructor for org.apache.kafka.common.serialization.Serdes$WrapperSerde

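I get the following stack trace when using Kafka Streams.

UPDATE: Following @matthias-j-sax's suggestion, I implemented my own Serdes with a default constructor for WrapperSerde, but I still get the following exception: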

org.apache.kafka.streams.errors.StreamsException: stream-thread [streams-request-count-4c239508-6abe-4901-bd56-d53987494770-StreamThread-1] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:836)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class myapps.serializer.Serdes$WrapperSerde
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde (StreamsConfig.java:972)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init> (AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init> (ProcessorContextImpl.java:42)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init> (StreamTask.java:136)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:405)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:369)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks (StreamThread.java:354)
    at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks (TaskManager.java:148)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks (TaskManager.java:107)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned (StreamThread.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete (ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded (AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup (AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java:290)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce (KafkaConsumer.java:1149)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1115)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:827)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)
Caused by: java.lang.NullPointerException
    at myapps.serializer.Serdes$WrapperSerde.configure (Serdes.java:30)
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde (StreamsConfig.java:968)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init> (AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init> (ProcessorContextImpl.java:42)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init> (StreamTask.java:136)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:405)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:369)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks (StreamThread.java:354)
    at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks (TaskManager.java:148)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks (TaskManager.java:107)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned (StreamThread.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete (ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded (AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup (AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java:290)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce (KafkaConsumer.java:1149)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1115)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:827)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)

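Here is my use case:

I receive JSON responses as the input to the stream, and I want to count the requests whose status code is not 200. I first went through the Kafka Streams documentation in the official docs and on Confluent, then implemented WordCountDemo, which worked fine. Then I tried writing this code and ran into this exception. I am very new to Kafka Streams; I went through the stack trace but could not understand the context, so I came here for help!

Here is my code: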

LogCount.java

package myapps;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import myapps.serializer.JsonDeserializer;
import myapps.serializer.JsonSerializer;
import myapps.Request;


public class LogCount {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-request-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        JsonSerializer<Request> requestJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<Request> requestJsonDeserializer = new JsonDeserializer<>(Request.class);
        Serde<Request> requestSerde = Serdes.serdeFrom(requestJsonSerializer, requestJsonDeserializer);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, requestSerde.getClass().getName());
        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Request> source = builder.stream("streams-requests-input");
        source.filter((k, v) -> v.getHttpStatusCode() != 200)
                .groupByKey()
                .count()
                .toStream()
                .to("streams-requests-output", Produced.with(Serdes.String(), Serdes.Long()));
        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        System.out.println(topology.describe());
        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.cleanUp();
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

JsonDeserializer.java

package myapps.serializer;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;

public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    public JsonDeserializer() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        if(deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
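        if (bytes == null) {
            return null;
        }

        // Decode as UTF-8 to match JsonSerializer, instead of relying on the platform default charset.
        return gson.fromJson(new String(bytes, java.nio.charset.StandardCharsets.UTF_8), deserializedClass);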

    }

    @Override
    public void close() {

    }
}

JsonSerializer.java

package myapps.serializer;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.Charset;
import java.util.Map;

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String topic, T t) {
        return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {

    }
}

As I mentioned, I will receive JSON as input, with a structure like this:

{
"RequestID":"1f6b2409", "Protocol":"http", "Host":"abc.com", "Method":"GET", "HTTPStatusCode":"200", "User-Agent":"curl%2f7.54.0", }

The corresponding Request.java file looks like this:

package myapps;

public final class Request {
    private String requestID;
    private String protocol;
    private String host;
    private String method;
    private int httpStatusCode;
    private String userAgent;

    public String getRequestID() {
        return requestID;
    }
    public void setRequestID(String requestID) {
        this.requestID = requestID;
    }
    public String getProtocol() {
        return protocol;
    }
    public void setProtocol(String protocol) {
        this.protocol = protocol;
    }
    public String getHost() {
        return host;
    }
    public void setHost(String host) {
        this.host = host;
    }
    public String getMethod() {
        return method;
    }
    public void setMethod(String method) {
        this.method = method;
    }
    public int getHttpStatusCode() {
        return httpStatusCode;
    }
    public void setHttpStatusCode(int httpStatusCode) {
        this.httpStatusCode = httpStatusCode;
    }
    public String getUserAgent() {
        return userAgent;
    }
    public void setUserAgent(String userAgent) {
        this.userAgent = userAgent;
    }
}

EDIT: When I exit kafka-console-consumer.sh, it shows Processed a total of 0 messages.

As the error indicates, the class Serdes$WrapperSerde has no public no-argument default constructor:

Could not find a public no-argument constructor 

The problem is this construct:

Serde<Request> requestSerde = Serdes.serdeFrom(requestJsonSerializer, requestJsonDeserializer);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, requestSerde.getClass().getName());

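Serdes.serdeFrom returns a WrapperSerde, which does not have an empty default constructor. Therefore, you cannot pass it into StreamsConfig. You can only use Serdes generated this way if you pass the object into the corresponding API calls (i.e., to override the default Serde for certain operators).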
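
To illustrate why the class name alone is not enough, here is a minimal, self-contained sketch in plain Java (no Kafka dependency). It assumes the framework creates the configured class reflectively from its name, which is a simplification of what Kafka Streams does. The names PairWrapper and SelfContained are hypothetical stand-ins, not Kafka types.

```java
import java.lang.reflect.Constructor;

public class NoArgConstructorDemo {

    /** Hypothetical stand-in for a wrapper that can only be built from its two parts. */
    public static class PairWrapper {
        private final Object first;
        private final Object second;

        public PairWrapper(Object first, Object second) {
            this.first = first;
            this.second = second;
        }
    }

    /** Hypothetical stand-in for a class that can be created from its name alone. */
    public static class SelfContained {
        public SelfContained() {
        }
    }

    /** Tries to create an instance from the class name only, as a config-driven framework would. */
    public static String instantiate(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            // getConstructor() with no arguments finds only a public no-argument constructor.
            Constructor<?> ctor = clazz.getConstructor();
            ctor.newInstance();
            return "ok";
        } catch (NoSuchMethodException e) {
            return "no public no-argument constructor";
        } catch (ReflectiveOperationException e) {
            return "failed: " + e;
        }
    }

    public static void main(String[] args) {
        // prints "no public no-argument constructor"
        System.out.println(instantiate(PairWrapper.class.getName()));
        // prints "ok"
        System.out.println(instantiate(SelfContained.class.getName()));
    }
}
```

A class name carries no constructor arguments, so a wrapper that needs a serializer and a deserializer passed in cannot be rebuilt from the name stored in the properties.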

To make it work (i.e., to be able to set the Serde in the config), you need to implement a proper class that implements the Serde interface.

requestSerde.getClass().getName() did not work for me. I needed to provide my own WrapperSerde implementation in an inner class. You probably need to do something similar:

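// A public no-argument constructor lets Kafka Streams instantiate the class from its name in the config.
public class MySerde extends WrapperSerde<Request> {
    public MySerde() {
        // Build the serializer and deserializer here; nothing can be passed in from outside.
        super(new JsonSerializer<>(), new JsonDeserializer<>(Request.class));
    }
}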
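
The NullPointerException in the question's update is consistent with a no-argument constructor that leaves the wrapped serializer and deserializer unset, so that configure() dereferences null. That is a guess from the stack trace, since the asker's Serdes.java is not shown. A minimal plain-Java sketch of the difference, using hypothetical stand-in types (MiniSerializer, MiniWrapper) rather than the Kafka classes:

```java
import java.util.HashMap;
import java.util.Map;

public class WrapperConfigureDemo {

    /** Hypothetical stand-in for a serializer; not the Kafka interface. */
    public interface MiniSerializer<T> {
        void configure(Map<String, ?> configs);
    }

    /** Hypothetical wrapper that forwards configure() to the serializer it holds. */
    public static class MiniWrapper<T> {
        private final MiniSerializer<T> serializer;

        public MiniWrapper(MiniSerializer<T> serializer) {
            this.serializer = serializer;
        }

        public void configure(Map<String, ?> configs) {
            // Throws NullPointerException when no serializer was supplied.
            serializer.configure(configs);
        }
    }

    /** A serializer with nothing to configure. */
    public static class NoOpSerializer implements MiniSerializer<String> {
        @Override
        public void configure(Map<String, ?> configs) {
        }
    }

    /** Broken: the no-arg constructor leaves the wrapped serializer null. */
    public static class BrokenWrapper extends MiniWrapper<String> {
        public BrokenWrapper() {
            super(null);
        }
    }

    /** Working: the no-arg constructor supplies a concrete serializer. */
    public static class WorkingWrapper extends MiniWrapper<String> {
        public WorkingWrapper() {
            super(new NoOpSerializer());
        }
    }

    /** Returns true if configure() completes, false if it hits a null serializer. */
    public static boolean canConfigure(MiniWrapper<?> wrapper) {
        try {
            wrapper.configure(new HashMap<String, Object>());
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canConfigure(new BrokenWrapper()));  // prints "false"
        System.out.println(canConfigure(new WorkingWrapper())); // prints "true"
    }
}
```

The working variant mirrors MySerde above: the no-argument constructor exists, and it hands concrete instances to the superclass.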

Instead of specifying it in the properties, pass the custom serde when creating the stream:

KStream<String, Request> source = builder.stream("streams-requests-input", Consumed.with(Serdes.String(), requestSerde));