如何在 Spark Streaming 应用程序中从 Kafka 接收 Java 对象

How to receive a Java Object from Kafka in Spark Streaming Application

我有很多 class Say Test 对象,我想将其写入 Kafka 并使用 Spark Streaming App 处理它们。我想使用 Kryo 序列化。

我的申请在Java

JavaDStream<Test> testData = KafkaUtils
                .createDirectStream(context , keyClass,valueClass ,keyDecoderClass ,valueDecoderClass , props,topics);

我的问题是我应该为 keyClass,valueClass,keyDecoderClass,valueDecoderClass 添加什么?

假设你的主题是 "String " 并且值是 "Test" 那么首先你需要通过实现 kafka.serializer.Encoder 和 [=13 创建 TestEncoder 和 TestDecoder classes =].现在在你的 createDirectStream 方法中你可以有

JavaPairInputDStream<String, Test> testData = KafkaUtils
            .createDirectStream(context, String.class,Test.class ,StringDecoder.class,TestDecoder.class,props,topics);

您可以参考 KafkaKryoEncoder https://www.tomsdev.com/blog/2015/storm-kafka-complex-types/

在您的 Kafka 生产者中,您需要像

一样注册您的自定义编码器 class
Properties properties = new Properties();
properties.put("metadata.broker.list", brokerList);
properties.put("serializer.class", "com.my.TestEncoder");
Producer<String, Test> producer = new Producer<String, Test>(new ProducerConfig(properties));
Test test = new Test();
KeyedMessage<String, Test> data = new KeyedMessage<String, Test>("myTopic", test);
producer.send(data);