Kafka Streams: joining two specific Avro objects

Basic task

I have 2 identical streams in Kafka, in Avro format. I am trying to do a basic left join of these two streams.

Keys

For the keys in both topics I use a timestamp rounded to the millisecond, because both streams carry data from IoT devices that produce a measurement every 20 ms, and both devices are currently synchronized to UTC time.
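
The question does not show how these keys are produced; purely as an illustration (the helper class and method below are hypothetical, not part of the question's code), such a key could be derived by truncating the device timestamp to whole milliseconds:

import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, not from the question: truncate an ISO-8601 timestamp
// to whole milliseconds so that records produced in the same millisecond
// end up with the same join key.
public final class PmuKeys {

    private PmuKeys() { }

    public static String millisecondKey(String isoTimestamp) {
        return Instant.parse(isoTimestamp)
                .truncatedTo(ChronoUnit.MILLIS)
                .toString();
    }
}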

Done so far

I was already able to build a Kafka Streams application that transforms one stream into an output topic by following this tutorial, but unfortunately there is no tutorial for a basic stream-stream join on the Confluent Developer pages.

Avro Java serialization classes

I generated 3 SpecificAvroSerde classes based on the 2 inputs and the output. Even though the input streams are identical, I created separate schemas/classes in case the streams get different schemas in the future. The Avro Java classes are generated at build time without problems.

This is the schema of one of the input streams (the other input and the joined output follow the same pattern):

{
    "namespace": "pmu.serialization.avro",
    "name": "RawPMU_214",
    "type": "record",
    "fields": [
        {"name": "pmu_id", "type": "int"},
        {"name": "time", "type": "string"},
        {"name": "time_rounded", "type": "string"},
        {"name": "stream_id", "type": "int"},
        {"name": "stat", "type": "string"},
        {"name": "ph_i1_r", "type": "float"},
        {"name": "ph_i1_j", "type": "float"},
        {"name": "ph_i2_r", "type": "float"},
        {"name": "ph_i2_j", "type": "float"},
        {"name": "ph_i3_r", "type": "float"},
        {"name": "ph_i3_j", "type": "float"},
        {"name": "ph_v4_r", "type": "float"},
        {"name": "ph_v4_j", "type": "float"},
        {"name": "ph_v5_r", "type": "float"},
        {"name": "ph_v5_j", "type": "float"},
        {"name": "ph_v6_r", "type": "float"},
        {"name": "ph_v6_j", "type": "float"},
        {"name": "ph_7_r", "type": "float"},
        {"name": "ph_7_j", "type": "float"},
        {"name": "ph_8_r", "type": "float"},
        {"name": "ph_8_j", "type": "float"},
        {"name": "analog", "type": "string"},
        {"name": "digital", "type": "string"},
        {"name": "frequency", "type": "float"},
        {"name": "rocof", "type": "int"},
        {"name": "orderCount", "type": "int"}
    ]
}

Code

The key problem is that I don't know how to implement this part correctly with the ValueJoiner:

    KStream<String, RawPMU_Joined> joinedPMU = rawPMUs_214.join(rawPMUs_218,
            (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */

I have tried various answers, but I haven't really found a complete Java code example of a stream-stream join with SpecificAvroSerde.

The full code at this point:

package io.confluent.developer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import pmu.serialization.avro.RawPMU_214;
import pmu.serialization.avro.RawPMU_218;
import pmu.serialization.avro.RawPMU_Joined;

import java.time.Duration;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class TransformStream_join {

    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Define input PMU topics
        final String inputPMU_01 = allProps.getProperty("input.topic.pmu1");
        final String inputPMU_02 = allProps.getProperty("input.topic.pmu2");
        final String outputTopic = allProps.getProperty("output.topic.name");

        KStream<String, RawPMU_214> rawPMUs_214 = builder.stream(inputPMU_01);
        KStream<String, RawPMU_218> rawPMUs_218 = builder.stream(inputPMU_02);

        KStream<String, RawPMU_Joined> joinedPMU = rawPMUs_214.join(rawPMUs_218,
                (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
                JoinWindows.of(Duration.ofMillis(20)),
                Joined.with(
                        Serdes.String(),
                        raw_pmu214AvroSerde(allProps),
                        raw_pmu218AvroSerde(allProps))
                );

        joinedPMU.to(outputTopic, Produced.with(Serdes.String(), raw_outAvroSerde(allProps)));
        return builder.build();
    }

    private SpecificAvroSerde<RawPMU_214> raw_pmu214AvroSerde(Properties allProps) {
        SpecificAvroSerde<RawPMU_214> raw_pmu214AvroSerde = new SpecificAvroSerde<>();
        raw_pmu214AvroSerde.configure((Map)allProps, false);
        return raw_pmu214AvroSerde;
    }

    private SpecificAvroSerde<RawPMU_218> raw_pmu218AvroSerde(Properties allProps) {
        SpecificAvroSerde<RawPMU_218> raw_pmu218AvroSerde = new SpecificAvroSerde<>();
        raw_pmu218AvroSerde.configure((Map)allProps, false);
        return raw_pmu218AvroSerde;
    }

    private SpecificAvroSerde<RawPMU_Joined> raw_outAvroSerde(Properties allProps) {
        SpecificAvroSerde<RawPMU_Joined> raw_outAvroSerde = new SpecificAvroSerde<>();
        raw_outAvroSerde.configure((Map)allProps, false);
        return raw_outAvroSerde;
    }

    public void createTopics(Properties allProps) {
        AdminClient client = AdminClient.create(allProps);

        List<NewTopic> topics = new ArrayList<>();

        topics.add(new NewTopic(
                allProps.getProperty("input.topic.pmu1"),
                Integer.parseInt(allProps.getProperty("input.topic.partitions")),
                Short.parseShort(allProps.getProperty("input.topic.replication.factor"))));

        topics.add(new NewTopic(
                allProps.getProperty("input.topic.pmu2"),
                Integer.parseInt(allProps.getProperty("input.topic.partitions")),
                Short.parseShort(allProps.getProperty("input.topic.replication.factor"))));

        topics.add(new NewTopic(
                allProps.getProperty("output.topic.name"),
                Integer.parseInt(allProps.getProperty("output.topic.partitions")),
                Short.parseShort(allProps.getProperty("output.topic.replication.factor"))));

        client.createTopics(topics);
        client.close();
    }

    public Properties loadEnvProperties(String fileName) throws IOException {
        Properties allProps = new Properties();
        FileInputStream input = new FileInputStream(fileName);
        allProps.load(input);
        input.close();

        return allProps;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
        }

        TransformStream_join ts = new TransformStream_join();
        Properties allProps = ts.loadEnvProperties(args[0]);
        allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        Topology topology = ts.buildTopology(allProps);

        ts.createTopics(allProps);

        final KafkaStreams streams = new KafkaStreams(topology, allProps);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach shutdown handler to catch Control-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close(Duration.ofSeconds(5));
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

EDIT

KStream join

Since creating a joiner class, I have simplified the join code:

KStream<String, RawPMU_Joined> joinedPMU = pmu214Stream.join(pmu218Stream, pmuJoiner,
        JoinWindows.of(Duration.ofMillis(20)),
        Joined.with(
                Serdes.String(),
                raw_pmu214AvroSerde(allProps),
                raw_pmu218AvroSerde(allProps))
);

PMUJoiner class

package io.confluent.developer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import pmu.serialization.avro.RawPMU_214; 
import pmu.serialization.avro.RawPMU_218; 
import pmu.serialization.avro.RawPMU_Joined; 
public class PMUJoiner implements ValueJoiner<RawPMU_218, RawPMU_214, RawPMU_Joined> {

    public RawPMU_Joined apply(RawPMU_218 pmu218Stream, RawPMU_214 pmu214Stream) {
        return RawPMU_Joined.newBuilder()
                // PMU 218
                .setTimeRounded1(pmu218Stream.getTimeRounded())
                .setOrderCount1(pmu218Stream.getOrderCount())
                .setPhI1R1(pmu218Stream.getPhI1R())
                .setPhI1J1(pmu218Stream.getPhI1J())
                .setPhI2R1(pmu218Stream.getPhI2R())
                .setPhI2J1(pmu218Stream.getPhI2J())
                .setPhI3R1(pmu218Stream.getPhI3R())
                .setPhI3J1(pmu218Stream.getPhI3J())
                .setPhV4R1(pmu218Stream.getPhV4R())
                .setPhV4J1(pmu218Stream.getPhV4J())
                .setPhV5R1(pmu218Stream.getPhV5R())
                .setPhV5J1(pmu218Stream.getPhV5J())
                .setPhV6R1(pmu218Stream.getPhV6R())
                .setPhV6J1(pmu218Stream.getPhV6J())
                .setPh7R1(pmu218Stream.getPh7R())
                .setPh7J1(pmu218Stream.getPh7J())
                .setPh8R1(pmu218Stream.getPh8R())
                .setPh8J1(pmu218Stream.getPh8J())
                //PMU 214
                .setTimeRounded2(pmu214Stream.getTimeRounded())
                .setOrderCount2(pmu214Stream.getOrderCount())
                .setPhI1R2(pmu214Stream.getPhI1R())
                .setPhI1J2(pmu214Stream.getPhI1J())
                .setPhI2R2(pmu214Stream.getPhI2R())
                .setPhI2J2(pmu214Stream.getPhI2J())
                .setPhI3R2(pmu214Stream.getPhI3R())
                .setPhI3J2(pmu214Stream.getPhI3J())
                .setPhV4R2(pmu214Stream.getPhV4R())
                .setPhV4J2(pmu214Stream.getPhV4J())
                .setPhV5R2(pmu214Stream.getPhV5R())
                .setPhV5J2(pmu214Stream.getPhV5J())
                .setPhV6R2(pmu214Stream.getPhV6R())
                .setPhV6J2(pmu214Stream.getPhV6J())
                .setPh7R2(pmu214Stream.getPh7R())
                .setPh7J2(pmu214Stream.getPh7J())
                .setPh8R2(pmu214Stream.getPh8R())
                .setPh8J2(pmu214Stream.getPh8J())
                .build();
    }
}

Error

...pmuStream01/src/main/java/io/confluent/developer/JoinPMUStreams.java:46: error: no suitable method found for join(org.apache.kafka.streams.kstream.KStream<java.lang.String,pmu.serialization.avro.RawPMU_218>,io.confluent.developer.PMUJoiner,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,pmu.serialization.avro.RawPMU_218>)
    KStream<String, RawPMU_Joined> joinedPMU = pmu214Stream.join(pmu218Stream,
                                                            ^
  method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>,org.apache.kafka.streams.kstream.JoinWindows) is not applicable
    (cannot infer type-variable(s) VO,VR (actual and formal argument lists differ in length))
  method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,VO>) is not applicable
    (cannot infer type-variable(s) VO,VR (argument mismatch; io.confluent.developer.PMUJoiner cannot be converted to org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>))
  method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.StreamJoined<java.lang.String,pmu.serialization.avro.RawPMU_214,VO>) is not applicable
    (cannot infer type-variable(s) VO,VR (argument mismatch; io.confluent.developer.PMUJoiner cannot be converted to org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>))
  method org.apache.kafka.streams.kstream.KStream.<VT,VR>join(org.apache.kafka.streams.kstream.KTable<java.lang.String,VT>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VT,? extends VR>) is not applicable
    (cannot infer type-variable(s) VT,VR (actual and formal argument lists differ in length))
  method org.apache.kafka.streams.kstream.KStream.<VT,VR>join(org.apache.kafka.streams.kstream.KTable<java.lang.String,VT>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super VT,? extends VR>,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,VT>) is not applicable
    (cannot infer type-variable(s) VT,VR (actual and formal argument lists differ in length))
  method org.apache.kafka.streams.kstream.KStream.<GK,GV,RV>join(org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>,org.apache.kafka.streams.kstream.KeyValueMapper<? super java.lang.String,? super pmu.serialization.avro.RawPMU_214,? extends GK>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super GV,? extends RV>) is not applicable
    (cannot infer type-variable(s) GK,GV,RV (actual and formal argument lists differ in length))
  method org.apache.kafka.streams.kstream.KStream.<GK,GV,RV>join(org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>,org.apache.kafka.streams.kstream.KeyValueMapper<? super java.lang.String,? super pmu.serialization.avro.RawPMU_214,? extends GK>,org.apache.kafka.streams.kstream.ValueJoiner<? super pmu.serialization.avro.RawPMU_214,? super GV,? extends RV>,org.apache.kafka.streams.kstream.Named) is not applicable
    (cannot infer type-variable(s) GK,GV,RV (argument mismatch; org.apache.kafka.streams.kstream.KStream<java.lang.String,pmu.serialization.avro.RawPMU_218> cannot be converted to org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>))

I don't know why this happens, since I believe I have provided the correct types for all of the parameters.

I would suggest starting with this. You can follow the generic Avro example; you don't need one for specific records, because it's just a different object that you return rather than a String.

(leftValue, rightValue) -> {
    RawPMU_Joined j = new RawPMU_Joined();
    j.set...
    return j;
}
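
For reference, here is a minimal sketch of what the complete join call could look like with that lambda, reusing the names from the question (pmu214Stream, pmu218Stream, the serde helper methods, allProps, outputTopic and the 20 ms window); the exact setter calls depend on the generated RawPMU_Joined class, and the ValueJoiner's type order has to match the join order (left = RawPMU_214, right = RawPMU_218):

// Sketch only, inside buildTopology(): join the two PMU streams and build the
// Avro output object directly in the lambda instead of returning a String.
KStream<String, RawPMU_Joined> joinedPMU = pmu214Stream.join(
        pmu218Stream,
        (pmu214, pmu218) -> RawPMU_Joined.newBuilder()
                .setTimeRounded1(pmu218.getTimeRounded())
                .setTimeRounded2(pmu214.getTimeRounded())
                // ...set the remaining 1/2-suffixed fields the same way...
                .build(),
        JoinWindows.of(Duration.ofMillis(20)),
        Joined.with(
                Serdes.String(),                 // key serde
                raw_pmu214AvroSerde(allProps),   // left value serde (RawPMU_214)
                raw_pmu218AvroSerde(allProps))); // right value serde (RawPMU_218)

joinedPMU.to(outputTopic, Produced.with(Serdes.String(), raw_outAvroSerde(allProps)));

On newer Kafka Streams versions the Joined overload of the stream-stream join is deprecated; StreamJoined.with(Serdes.String(), raw_pmu214AvroSerde(allProps), raw_pmu218AvroSerde(allProps)) takes the same three serdes and can be used instead.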