如何在 Spark Streaming 应用程序中从 Kafka 接收 Java 对象
How to receive a Java Object from Kafka in Spark Streaming Application
我有很多 class Say Test 对象,我想将其写入 Kafka 并使用 Spark Streaming App 处理它们。我想使用 Kryo 序列化。
我的申请在Java
JavaDStream<Test> testData = KafkaUtils
.createDirectStream(context , keyClass,valueClass ,keyDecoderClass ,valueDecoderClass , props,topics);
我的问题是我应该为 keyClass,valueClass,keyDecoderClass,valueDecoderClass 添加什么?
假设你的主题是 "String " 并且值是 "Test" 那么首先你需要通过实现 kafka.serializer.Encoder
和 [=13 创建 TestEncoder 和 TestDecoder classes =].现在在你的 createDirectStream 方法中你可以有
JavaPairInputDStream<String, Test> testData = KafkaUtils
.createDirectStream(context, String.class,Test.class ,StringDecoder.class,TestDecoder.class,props,topics);
您可以参考 KafkaKryoEncoder
https://www.tomsdev.com/blog/2015/storm-kafka-complex-types/
在您的 Kafka 生产者中,您需要像
一样注册您的自定义编码器 class
Properties properties = new Properties();
properties.put("metadata.broker.list", brokerList);
properties.put("serializer.class", "com.my.TestEncoder");
Producer<String, Test> producer = new Producer<String, Test>(new ProducerConfig(properties));
Test test = new Test();
KeyedMessage<String, Test> data = new KeyedMessage<String, Test>("myTopic", test);
producer.send(data);
我有很多 class Say Test 对象,我想将其写入 Kafka 并使用 Spark Streaming App 处理它们。我想使用 Kryo 序列化。
我的申请在Java
JavaDStream<Test> testData = KafkaUtils
.createDirectStream(context , keyClass,valueClass ,keyDecoderClass ,valueDecoderClass , props,topics);
我的问题是我应该为 keyClass,valueClass,keyDecoderClass,valueDecoderClass 添加什么?
假设你的主题是 "String " 并且值是 "Test" 那么首先你需要通过实现 kafka.serializer.Encoder
和 [=13 创建 TestEncoder 和 TestDecoder classes =].现在在你的 createDirectStream 方法中你可以有
JavaPairInputDStream<String, Test> testData = KafkaUtils
.createDirectStream(context, String.class,Test.class ,StringDecoder.class,TestDecoder.class,props,topics);
您可以参考 KafkaKryoEncoder
https://www.tomsdev.com/blog/2015/storm-kafka-complex-types/
在您的 Kafka 生产者中,您需要像
一样注册您的自定义编码器 classProperties properties = new Properties();
properties.put("metadata.broker.list", brokerList);
properties.put("serializer.class", "com.my.TestEncoder");
Producer<String, Test> producer = new Producer<String, Test>(new ProducerConfig(properties));
Test test = new Test();
KeyedMessage<String, Test> data = new KeyedMessage<String, Test>("myTopic", test);
producer.send(data);