Kafka 流连接两个特定的 Avro 对象
Kafka streams joins two specific Avro objects
基本任务
我在 Avro 格式的 Kafka 中有 2 个相同的流。我正在尝试对这两个流进行基本的左连接。
按键
对于这两个主题中的键,我使用四舍五入到毫秒的时间戳,因为两个流都有来自 IoT 设备的数据,该设备每 20 毫秒生成一次测量,并且两个设备现在都同步到 UTC 时间。
到目前为止完成
我已经能够生成 Kafka 流,该流将一个流转换为主题 this tutorial, but unfortunately basic stream-stream join Confluent 开发人员页面上不存在教程。
Avro Java 序列化 classes
我根据 2 个输入和输出生成了 3 SpecificAvroSerde
classes。
尽管输入流是相同的,但我已经创建了单独的 schemas/classes 以防将来流有不同的模式。
Avro Java classes 是在构建时产生的 whiteout 问题。
这是输入、输出和加入流的模式:
{
"namespace": "pmu.serialization.avro",
"name": "RawPMU_214",
"type": "record",
"fields": [
{"name": "pmu_id", "type": "int"},
{"name": "time", "type":"string"},
{"name": "time_rounded", "type":"string"},
{"name": "stream_id","type":"int"},
{"name": "stat", "type":"string"},
{"name": "ph_i1_r","type":"float"},
{"name": "ph_i1_j","type":"float"},
{"name": "ph_i2_r","type":"float"},
{"name": "ph_i2_j","type":"float"},
{"name": "ph_i3_r","type":"float"},
{"name": "ph_i3_j","type":"float"},
{"name": "ph_v4_r","type":"float"},
{"name": "ph_v4_j","type":"float"},
{"name": "ph_v5_r","type":"float"},
{"name": "ph_v5_j","type":"float"},
{"name": "ph_v6_r","type":"float"},
{"name": "ph_v6_j","type":"float"},
{"name": "ph_7_r","type":"float"},
{"name": "ph_7_j","type":"float"},
{"name": "ph_8_r","type":"float"},
{"name": "ph_8_j","type":"float"},
{"name": "analog","type":"string"},
{"name": "digital","type":"string"},
{"name": "frequency","type":"float"},
{"name": "rocof","type":"int"},
{"name": "orderCount","type":"int"}
]
}
代码
关键问题是我不知道如何使用值连接器正确实现这部分:
KStream<String, RawPMU_Joined> joinedPMU = rawPMUs_214.join(rawPMUs_218,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
我尝试了各种答案,但我还没有真正找到 SpecificAvroSerde
的流-流连接的完整 Java 代码示例。
此时完整代码:
package io.confluent.developer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import pmu.serialization.avro.RawPMU_214;
import pmu.serialization.avro.RawPMU_218;
import pmu.serialization.avro.RawPMU_Joined;
import java.time.Duration;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class TransformStream_join {
public Topology buildTopology(Properties allProps) {
final StreamsBuilder builder = new StreamsBuilder();
// Define input PMU topics
final String inputPMU_01 = allProps.getProperty("input.topic.pmu1");
final String inputPMU_02 = allProps.getProperty("input.topic.pmu1");
final String outputTopic = allProps.getProperty("output.topic.name");
KStream<String, RawPMU_214> rawPMUs_214 = builder.stream(inputPMU_01);
KStream<String, RawPMU_218> rawPMUs_218 = builder.stream(inputPMU_02);
KStream<String, RawPMU_Joined> joinedPMU = rawPMUs_214.join(rawPMUs_218,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
JoinWindows.of(Duration.ofMillis(20)),
Joined.with(
Serdes.String(),
raw_pmu214AvroSerde(allProps),
raw_pmu218AvroSerde(allProps))
);
joinedPMU.to(outputTopic, Produced.with(Serdes.String(), raw_outAvroSerde(allProps)));
return builder.build();
}
private SpecificAvroSerde<RawPMU_214> raw_pmu214AvroSerde(Properties allProps) {
SpecificAvroSerde<RawPMU_214> raw_pmu214AvroSerde = new SpecificAvroSerde<>();
raw_pmu214AvroSerde.configure((Map)allProps, false);
return raw_pmu214AvroSerde;
}
private SpecificAvroSerde<RawPMU_218> raw_pmu218AvroSerde(Properties allProps) {
SpecificAvroSerde<RawPMU_218> raw_pmu218AvroSerde = new SpecificAvroSerde<>();
raw_pmu218AvroSerde.configure((Map)allProps, false);
return raw_pmu218AvroSerde;
}
private SpecificAvroSerde<RawPMU_Joined> raw_outAvroSerde(Properties allProps) {
SpecificAvroSerde<RawPMU_Joined> raw_outAvroSerde = new SpecificAvroSerde<>();
raw_outAvroSerde.configure((Map)allProps, false);
return raw_outAvroSerde;
}
public void createTopics(Properties allProps) {
AdminClient client = AdminClient.create(allProps);
List<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(
allProps.getProperty("input.topic.pmu1"),
Integer.parseInt(allProps.getProperty("input.topic.partitions")),
Short.parseShort(allProps.getProperty("input.topic.replication.factor"))));
topics.add(new NewTopic(
allProps.getProperty("input.topic.pmu2"),
Integer.parseInt(allProps.getProperty("input.topic.partitions")),
Short.parseShort(allProps.getProperty("input.topic.replication.factor"))));
topics.add(new NewTopic(
allProps.getProperty("output.topic.name"),
Integer.parseInt(allProps.getProperty("output.topic.partitions")),
Short.parseShort(allProps.getProperty("output.topic.replication.factor"))));
client.createTopics(topics);
client.close();
}
public Properties loadEnvProperties(String fileName) throws IOException {
Properties allProps = new Properties();
FileInputStream input = new FileInputStream(fileName);
allProps.load(input);
input.close();
return allProps;
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
}
TransformStream ts = new TransformStream();
Properties allProps = ts.loadEnvProperties(args[0]);
allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
Topology topology = ts.buildTopology(allProps);
ts.createTopics(allProps);
final KafkaStreams streams = new KafkaStreams(topology, allProps);
final CountDownLatch latch = new CountDownLatch(1);
// Attach shutdown handler to catch Control-C.
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close(Duration.ofSeconds(5));
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
编辑
KStream 加入:
自从创建 joiner class
以来,我简化了加入流代码
KStream<String, RawPMU_Joined> joinedPMU = pmu214Stream.join(pmu218Stream, pmuJoiner,
JoinWindows.of(Duration.ofMillis(20)),
Joined.with(
Serdes.String(),
raw_pmu214AvroSerde(allProps),
raw_pmu218AvroSerde(allProps))
);
PMUJoiner class
package io.confluent.developer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import pmu.serialization.avro.RawPMU_214;
import pmu.serialization.avro.RawPMU_218;
import pmu.serialization.avro.RawPMU_Joined;
public class PMUJoiner implements ValueJoiner<RawPMU_218, RawPMU_214, RawPMU_Joined> {
public RawPMU_Joined apply(RawPMU_218 pmu218Stream, RawPMU_214 pmu214Stream) {
return RawPMU_Joined.newBuilder()
// PMU 218
.setTimeRounded1(pmu218Stream.getTimeRounded())
.setOrderCount1(pmu218Stream.getOrderCount())
.setPhI1R1(pmu218Stream.getPhI1R())
.setPhI1J1(pmu218Stream.getPhI1J())
.setPhI2R1(pmu218Stream.getPhI2R())
.setPhI2J1(pmu218Stream.getPhI2J())
.setPhI3R1(pmu218Stream.getPhI3R())
.setPhI3J1(pmu218Stream.getPhI3J())
.setPhV4R1(pmu218Stream.getPhV4R())
.setPhV4J1(pmu218Stream.getPhV4J())
.setPhV5R1(pmu218Stream.getPhV5R())
.setPhV5J1(pmu218Stream.getPhV5J())
.setPhV6R1(pmu218Stream.getPhV6R())
.setPhV6J1(pmu218Stream.getPhV6J())
.setPh7R1(pmu218Stream.getPh7R())
.setPh7J1(pmu218Stream.getPh7J())
.setPh8R1(pmu218Stream.getPh8R())
.setPh8J1(pmu218Stream.getPh8J())
//PMU 214
.setTimeRounded2(pmu214Stream.getTimeRounded())
.setOrderCount2(pmu214Stream.getOrderCount())
.setPhI1R2(pmu214Stream.getPhI1R())
.setPhI1J2(pmu214Stream.getPhI1J())
.setPhI2R2(pmu214Stream.getPhI2R())
.setPhI2J2(pmu214Stream.getPhI2J())
.setPhI3R2(pmu214Stream.getPhI3R())
.setPhI3J2(pmu214Stream.getPhI3J())
.setPhV4R2(pmu214Stream.getPhV4R())
.setPhV4J2(pmu214Stream.getPhV4J())
.setPhV5R2(pmu214Stream.getPhV5R())
.setPhV5J2(pmu214Stream.getPhV5J())
.setPhV6R2(pmu214Stream.getPhV6R())
.setPhV6J2(pmu214Stream.getPhV6J())
.setPh7R2(pmu214Stream.getPh7R())
.setPh7J2(pmu214Stream.getPh7J())
.setPh8R2(pmu214Stream.getPh8R())
.setPh8J2(pmu214Stream.getPh8J())
.build();
}
}
错误
...pmuStream01/src/main/java/io/confluent/developer/JoinPMUStreams.java:46:
error: no suitable method found for
join(org.apache.kafka.streams.kstream.KStream<java.lang.String,pmu.serialization.avro.RawPMU_218>,io.confluent.developer.PMUJoiner,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,pmu.serialization.avro.RawPMU_218>)
KStream<String, RawPMU_Joined> joinedPMU = pmu214Stream.join(pmu218Stream,
^
method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<?
super pmu.serialization.avro.RawPMU_214,? super VO,? extends
VR>,org.apache.kafka.streams.kstream.JoinWindows) is not applicable
(cannot infer type-variable(s) VO,VR
(actual and formal argument lists differ in length))
method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<?
super pmu.serialization.avro.RawPMU_214,? super VO,? extends
VR>,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,VO>)
is not applicable
(cannot infer type-variable(s) VO,VR
(argument mismatch; io.confluent.developer.PMUJoiner cannot be converted to org.apache.kafka.streams.kstream.ValueJoiner<? super
pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>))
method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<?
super pmu.serialization.avro.RawPMU_214,? super VO,? extends
VR>,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.StreamJoined<java.lang.String,pmu.serialization.avro.RawPMU_214,VO>)
is not applicable
(cannot infer type-variable(s) VO,VR
(argument mismatch; io.confluent.developer.PMUJoiner cannot be converted to org.apache.kafka.streams.kstream.ValueJoiner<? super
pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>))
method org.apache.kafka.streams.kstream.KStream.<VT,VR>join(org.apache.kafka.streams.kstream.KTable<java.lang.String,VT>,org.apache.kafka.streams.kstream.ValueJoiner<?
super pmu.serialization.avro.RawPMU_214,? super VT,? extends VR>) is
not applicable
(cannot infer type-variable(s) VT,VR
(actual and formal argument lists differ in length))
method org.apache.kafka.streams.kstream.KStream.<VT,VR>join(org.apache.kafka.streams.kstream.KTable<java.lang.String,VT>,org.apache.kafka.streams.kstream.ValueJoiner<?
super pmu.serialization.avro.RawPMU_214,? super VT,? extends
VR>,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,VT>)
is not applicable
(cannot infer type-variable(s) VT,VR
(actual and formal argument lists differ in length))
method org.apache.kafka.streams.kstream.KStream.<GK,GV,RV>join(org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>,org.apache.kafka.streams.kstream.KeyValueMapper<?
super java.lang.String,? super pmu.serialization.avro.RawPMU_214,?
extends GK>,org.apache.kafka.streams.kstream.ValueJoiner<? super
pmu.serialization.avro.RawPMU_214,? super GV,? extends RV>) is not
applicable
(cannot infer type-variable(s) GK,GV,RV
(actual and formal argument lists differ in length))
method org.apache.kafka.streams.kstream.KStream.<GK,GV,RV>join(org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>,org.apache.kafka.streams.kstream.KeyValueMapper<?
super java.lang.String,? super pmu.serialization.avro.RawPMU_214,?
extends GK>,org.apache.kafka.streams.kstream.ValueJoiner<? super
pmu.serialization.avro.RawPMU_214,? super GV,? extends
RV>,org.apache.kafka.streams.kstream.Named) is not applicable
(cannot infer type-variable(s) GK,GV,RV
(argument mismatch; org.apache.kafka.streams.kstream.KStream<java.lang.String,pmu.serialization.avro.RawPMU_218>
cannot be converted to
org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>))
不知道为什么会这样,因为我相信我已经正确地为所有参数提供了正确的 return 类型。
我建议从这个开始。您可以遵循通用的 Avro 示例。特定记录不需要一个,因为它只是您要返回的一个不同的对象,它不会是一个字符串。
(leftValue, rightValue) -> {
RawPMU_Joined j = new RawPMU_Joined();
j.set...
return j;
}
基本任务
我在 Avro 格式的 Kafka 中有 2 个相同的流。我正在尝试对这两个流进行基本的左连接。
按键
对于这两个主题中的键,我使用四舍五入到毫秒的时间戳,因为两个流都有来自 IoT 设备的数据,该设备每 20 毫秒生成一次测量,并且两个设备现在都同步到 UTC 时间。
到目前为止完成
我已经能够生成 Kafka 流,该流将一个流转换为主题 this tutorial, but unfortunately basic stream-stream join Confluent 开发人员页面上不存在教程。
Avro Java 序列化 classes
我根据 2 个输入和输出生成了 3 SpecificAvroSerde
classes。
尽管输入流是相同的,但我已经创建了单独的 schemas/classes 以防将来流有不同的模式。
Avro Java classes 是在构建时产生的 whiteout 问题。
这是输入、输出和加入流的模式:
{
"namespace": "pmu.serialization.avro",
"name": "RawPMU_214",
"type": "record",
"fields": [
{"name": "pmu_id", "type": "int"},
{"name": "time", "type":"string"},
{"name": "time_rounded", "type":"string"},
{"name": "stream_id","type":"int"},
{"name": "stat", "type":"string"},
{"name": "ph_i1_r","type":"float"},
{"name": "ph_i1_j","type":"float"},
{"name": "ph_i2_r","type":"float"},
{"name": "ph_i2_j","type":"float"},
{"name": "ph_i3_r","type":"float"},
{"name": "ph_i3_j","type":"float"},
{"name": "ph_v4_r","type":"float"},
{"name": "ph_v4_j","type":"float"},
{"name": "ph_v5_r","type":"float"},
{"name": "ph_v5_j","type":"float"},
{"name": "ph_v6_r","type":"float"},
{"name": "ph_v6_j","type":"float"},
{"name": "ph_7_r","type":"float"},
{"name": "ph_7_j","type":"float"},
{"name": "ph_8_r","type":"float"},
{"name": "ph_8_j","type":"float"},
{"name": "analog","type":"string"},
{"name": "digital","type":"string"},
{"name": "frequency","type":"float"},
{"name": "rocof","type":"int"},
{"name": "orderCount","type":"int"}
]
}
代码
关键问题是我不知道如何使用值连接器正确实现这部分:
KStream<String, RawPMU_Joined> joinedPMU = rawPMUs_214.join(rawPMUs_218,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
我尝试了各种答案,但我还没有真正找到 SpecificAvroSerde
的流-流连接的完整 Java 代码示例。
此时完整代码:
package io.confluent.developer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import pmu.serialization.avro.RawPMU_214;
import pmu.serialization.avro.RawPMU_218;
import pmu.serialization.avro.RawPMU_Joined;
import java.time.Duration;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class TransformStream_join {
public Topology buildTopology(Properties allProps) {
final StreamsBuilder builder = new StreamsBuilder();
// Define input PMU topics
final String inputPMU_01 = allProps.getProperty("input.topic.pmu1");
final String inputPMU_02 = allProps.getProperty("input.topic.pmu1");
final String outputTopic = allProps.getProperty("output.topic.name");
KStream<String, RawPMU_214> rawPMUs_214 = builder.stream(inputPMU_01);
KStream<String, RawPMU_218> rawPMUs_218 = builder.stream(inputPMU_02);
KStream<String, RawPMU_Joined> joinedPMU = rawPMUs_214.join(rawPMUs_218,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
JoinWindows.of(Duration.ofMillis(20)),
Joined.with(
Serdes.String(),
raw_pmu214AvroSerde(allProps),
raw_pmu218AvroSerde(allProps))
);
joinedPMU.to(outputTopic, Produced.with(Serdes.String(), raw_outAvroSerde(allProps)));
return builder.build();
}
private SpecificAvroSerde<RawPMU_214> raw_pmu214AvroSerde(Properties allProps) {
SpecificAvroSerde<RawPMU_214> raw_pmu214AvroSerde = new SpecificAvroSerde<>();
raw_pmu214AvroSerde.configure((Map)allProps, false);
return raw_pmu214AvroSerde;
}
private SpecificAvroSerde<RawPMU_218> raw_pmu218AvroSerde(Properties allProps) {
SpecificAvroSerde<RawPMU_218> raw_pmu218AvroSerde = new SpecificAvroSerde<>();
raw_pmu218AvroSerde.configure((Map)allProps, false);
return raw_pmu218AvroSerde;
}
private SpecificAvroSerde<RawPMU_Joined> raw_outAvroSerde(Properties allProps) {
SpecificAvroSerde<RawPMU_Joined> raw_outAvroSerde = new SpecificAvroSerde<>();
raw_outAvroSerde.configure((Map)allProps, false);
return raw_outAvroSerde;
}
public void createTopics(Properties allProps) {
AdminClient client = AdminClient.create(allProps);
List<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(
allProps.getProperty("input.topic.pmu1"),
Integer.parseInt(allProps.getProperty("input.topic.partitions")),
Short.parseShort(allProps.getProperty("input.topic.replication.factor"))));
topics.add(new NewTopic(
allProps.getProperty("input.topic.pmu2"),
Integer.parseInt(allProps.getProperty("input.topic.partitions")),
Short.parseShort(allProps.getProperty("input.topic.replication.factor"))));
topics.add(new NewTopic(
allProps.getProperty("output.topic.name"),
Integer.parseInt(allProps.getProperty("output.topic.partitions")),
Short.parseShort(allProps.getProperty("output.topic.replication.factor"))));
client.createTopics(topics);
client.close();
}
public Properties loadEnvProperties(String fileName) throws IOException {
Properties allProps = new Properties();
FileInputStream input = new FileInputStream(fileName);
allProps.load(input);
input.close();
return allProps;
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
}
TransformStream ts = new TransformStream();
Properties allProps = ts.loadEnvProperties(args[0]);
allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
Topology topology = ts.buildTopology(allProps);
ts.createTopics(allProps);
final KafkaStreams streams = new KafkaStreams(topology, allProps);
final CountDownLatch latch = new CountDownLatch(1);
// Attach shutdown handler to catch Control-C.
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close(Duration.ofSeconds(5));
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
编辑
KStream 加入: 自从创建 joiner class
以来,我简化了加入流代码KStream<String, RawPMU_Joined> joinedPMU = pmu214Stream.join(pmu218Stream, pmuJoiner,
JoinWindows.of(Duration.ofMillis(20)),
Joined.with(
Serdes.String(),
raw_pmu214AvroSerde(allProps),
raw_pmu218AvroSerde(allProps))
);
PMUJoiner class
package io.confluent.developer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import pmu.serialization.avro.RawPMU_214;
import pmu.serialization.avro.RawPMU_218;
import pmu.serialization.avro.RawPMU_Joined;
public class PMUJoiner implements ValueJoiner<RawPMU_218, RawPMU_214, RawPMU_Joined> {
public RawPMU_Joined apply(RawPMU_218 pmu218Stream, RawPMU_214 pmu214Stream) {
return RawPMU_Joined.newBuilder()
// PMU 218
.setTimeRounded1(pmu218Stream.getTimeRounded())
.setOrderCount1(pmu218Stream.getOrderCount())
.setPhI1R1(pmu218Stream.getPhI1R())
.setPhI1J1(pmu218Stream.getPhI1J())
.setPhI2R1(pmu218Stream.getPhI2R())
.setPhI2J1(pmu218Stream.getPhI2J())
.setPhI3R1(pmu218Stream.getPhI3R())
.setPhI3J1(pmu218Stream.getPhI3J())
.setPhV4R1(pmu218Stream.getPhV4R())
.setPhV4J1(pmu218Stream.getPhV4J())
.setPhV5R1(pmu218Stream.getPhV5R())
.setPhV5J1(pmu218Stream.getPhV5J())
.setPhV6R1(pmu218Stream.getPhV6R())
.setPhV6J1(pmu218Stream.getPhV6J())
.setPh7R1(pmu218Stream.getPh7R())
.setPh7J1(pmu218Stream.getPh7J())
.setPh8R1(pmu218Stream.getPh8R())
.setPh8J1(pmu218Stream.getPh8J())
//PMU 214
.setTimeRounded2(pmu214Stream.getTimeRounded())
.setOrderCount2(pmu214Stream.getOrderCount())
.setPhI1R2(pmu214Stream.getPhI1R())
.setPhI1J2(pmu214Stream.getPhI1J())
.setPhI2R2(pmu214Stream.getPhI2R())
.setPhI2J2(pmu214Stream.getPhI2J())
.setPhI3R2(pmu214Stream.getPhI3R())
.setPhI3J2(pmu214Stream.getPhI3J())
.setPhV4R2(pmu214Stream.getPhV4R())
.setPhV4J2(pmu214Stream.getPhV4J())
.setPhV5R2(pmu214Stream.getPhV5R())
.setPhV5J2(pmu214Stream.getPhV5J())
.setPhV6R2(pmu214Stream.getPhV6R())
.setPhV6J2(pmu214Stream.getPhV6J())
.setPh7R2(pmu214Stream.getPh7R())
.setPh7J2(pmu214Stream.getPh7J())
.setPh8R2(pmu214Stream.getPh8R())
.setPh8J2(pmu214Stream.getPh8J())
.build();
}
}
错误
...pmuStream01/src/main/java/io/confluent/developer/JoinPMUStreams.java:46: error: no suitable method found for join(org.apache.kafka.streams.kstream.KStream<java.lang.String,pmu.serialization.avro.RawPMU_218>,io.confluent.developer.PMUJoiner,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,pmu.serialization.avro.RawPMU_218>) KStream<String, RawPMU_Joined> joinedPMU = pmu214Stream.join(pmu218Stream, ^ method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>,org.apache.kafka.streams.kstream.JoinWindows) is not applicable (cannot infer type-variable(s) VO,VR (actual and formal argument lists differ in length)) method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,VO>) is not applicable (cannot infer type-variable(s) VO,VR (argument mismatch; io.confluent.developer.PMUJoiner cannot be converted to org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>)) method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.StreamJoined<java.lang.String,pmu.serialization.avro.RawPMU_214,VO>) is not applicable (cannot infer type-variable(s) VO,VR (argument mismatch; io.confluent.developer.PMUJoiner cannot be converted to org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>)) method org.apache.kafka.streams.kstream.KStream.<VT,VR>join(org.apache.kafka.streams.kstream.KTable<java.lang.String,VT>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VT,? extends VR>) is not applicable (cannot infer type-variable(s) VT,VR (actual and formal argument lists differ in length)) method org.apache.kafka.streams.kstream.KStream.<VT,VR>join(org.apache.kafka.streams.kstream.KTable<java.lang.String,VT>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VT,? extends VR>,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,VT>) is not applicable (cannot infer type-variable(s) VT,VR (actual and formal argument lists differ in length)) method org.apache.kafka.streams.kstream.KStream.<GK,GV,RV>join(org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>,org.apache.kafka.streams.kstream.KeyValueMapper<? super java.lang.String,? super pmu.serialization.avro.RawPMU_214,? extends GK>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super GV,? extends RV>) is not applicable (cannot infer type-variable(s) GK,GV,RV (actual and formal argument lists differ in length)) method org.apache.kafka.streams.kstream.KStream.<GK,GV,RV>join(org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>,org.apache.kafka.streams.kstream.KeyValueMapper<? super java.lang.String,? super pmu.serialization.avro.RawPMU_214,? extends GK>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super GV,? extends RV>,org.apache.kafka.streams.kstream.Named) is not applicable (cannot infer type-variable(s) GK,GV,RV (argument mismatch; org.apache.kafka.streams.kstream.KStream<java.lang.String,pmu.serialization.avro.RawPMU_218> cannot be converted to org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>))
不知道为什么会这样,因为我相信我已经正确地为所有参数提供了正确的 return 类型。
我建议从这个开始。您可以遵循通用的 Avro 示例。特定记录不需要一个,因为它只是您要返回的一个不同的对象,它不会是一个字符串。
(leftValue, rightValue) -> {
RawPMU_Joined j = new RawPMU_Joined();
j.set...
return j;
}