How to join two Kafka streams and produce the result in a topic with Avro values
I have two Kafka streams with String keys and Avro values, which I created using KSQL.
Here is the first one:
DESCRIBE EXTENDED STREAM_1;
Type : STREAM
Key field : IDUSER
Timestamp field : Not set - using <ROWTIME>
Key format : STRING
Value format : AVRO
Kafka output topic : STREAM_1 (partitions: 4, replication: 1)
Field | Type
--------------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
FIRSTNAME | VARCHAR(STRING)
LASTNAME | VARCHAR(STRING)
IDUSER | VARCHAR(STRING)
And the second one:
DESCRIBE EXTENDED STREAM_2;
Type : STREAM
Key field : IDUSER
Timestamp field : Not set - using <ROWTIME>
Key format : STRING
Value format : AVRO
Kafka output topic : STREAM_2 (partitions: 4, replication: 1)
Field | Type
--------------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
USERNAME | VARCHAR(STRING)
IDUSER | VARCHAR(STRING)
DEVICE | VARCHAR(STRING)
The desired output should include IDUSER, LASTNAME, DEVICE and USERNAME.
I want to left join these streams (on IDUSER) using the Streams API and write the output to a Kafka topic.
To do so, I tried the following:
public static void main(String[] args) {
    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
    streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    final Serde<String> stringSerde = Serdes.String();
    final Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde();
    boolean isKeySerde = false;
    genericAvroSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), isKeySerde);

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, GenericRecord> left = builder.stream("STREAM_1");
    KStream<String, GenericRecord> right = builder.stream("STREAM_2");

    // Java 8+ example, using lambda expressions
    KStream<String, GenericRecord> joined = left.leftJoin(right,
        (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
        JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
        Joined.with(
            stringSerde,      /* key */
            genericAvroSerde, /* left value */
            genericAvroSerde) /* right value */
    );

    joined.to(stringSerde, genericAvroSerde, "streams-output-testing");

    KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.cleanUp();
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
However, the line KStream<String, GenericRecord> joined = ... raises the following error in my IDE:
incompatible types: inference variable VR has incompatible bounds
When I use a String Serde for both keys and values it works, but the data is not readable with kafka-console-consumer. What I want is to produce the data in Avro format so that I can read it with kafka-avro-console-consumer.
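Producing Avro values that contain IDUSER, LASTNAME, DEVICE and USERNAME requires an Avro schema describing that output record. Below is a minimal sketch built with Avro's `SchemaBuilder`. The record name `JoinedUser` and namespace `com.example.streams` are hypothetical, and the choice of nullable strings for everything except the key is an assumption based on left-join semantics (the right-hand side may be missing).

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class JoinedUserSchema {

    // Hypothetical record name and namespace; any valid Avro names would do.
    public static final Schema OUTPUT_SCHEMA = SchemaBuilder
            .record("JoinedUser")
            .namespace("com.example.streams")
            .fields()
            // IDUSER is the join key, so it is modelled as a required string.
            .requiredString("IDUSER")
            // From STREAM_1 (left side); optionalString = union of null and string, default null.
            .optionalString("LASTNAME")
            // From STREAM_2 (right side); null when no matching record arrived within the join window.
            .optionalString("DEVICE")
            .optionalString("USERNAME")
            .endRecord();

    public static void main(String[] args) {
        // Pretty-prints the schema as JSON, e.g. to compare with what Schema Registry stores.
        System.out.println(OUTPUT_SCHEMA.toString(true));
    }
}
```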
My first guess is that you are returning a String from the join operation, whereas your code expects the result to be a GenericRecord:
KStream<String, GenericRecord> joined = left.leftJoin(right,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, ...)
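Note that joined has type KStream<String, GenericRecord>, i.e. its values have type GenericRecord, but the join output is computed via "left=" + leftValue + ", right=" + rightValue, which has type String. The compiler therefore cannot reconcile the joiner's result type VR with GenericRecord, which is what the "incompatible bounds" error is reporting.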
Instead of converting the values to a string, you can return a value directly. For example:
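KStream<String, GenericRecord> joined = left.leftJoin(right,
    (leftValue, rightValue) -> rightValue, /* returns a GenericRecord; null when STREAM_2 has no match */
    JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
    Joined.with(stringSerde, genericAvroSerde, genericAvroSerde));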
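Returning `rightValue` fixes the type error, but it drops the left-hand fields. To get IDUSER, LASTNAME, DEVICE and USERNAME in one value, the joiner can build a new `GenericRecord` instead. The sketch below assumes a hypothetical output schema `outSchema` with those four fields (DEVICE and USERNAME nullable) and assumes the KSQL-generated Avro field names are the upper-case column names shown by DESCRIBE. `JoinedUserMerger` is a hypothetical helper, not part of any library.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public final class JoinedUserMerger {

    private JoinedUserMerger() {}

    /**
     * Combines a STREAM_1 record (left) and an optional STREAM_2 record (right)
     * into one record of outSchema. outSchema is assumed (hypothetically) to contain
     * IDUSER, LASTNAME, DEVICE and USERNAME, with DEVICE and USERNAME nullable.
     */
    public static GenericRecord merge(Schema outSchema, GenericRecord left, GenericRecord right) {
        GenericRecordBuilder builder = new GenericRecordBuilder(outSchema)
                .set("IDUSER", asString(left.get("IDUSER")))
                .set("LASTNAME", asString(left.get("LASTNAME")));
        // In a left join, right is null when no STREAM_2 record matched within the window.
        if (right != null) {
            builder.set("DEVICE", asString(right.get("DEVICE")))
                   .set("USERNAME", asString(right.get("USERNAME")));
        }
        // Fields left unset fall back to their schema default (null for the nullable ones).
        return builder.build();
    }

    // Deserialized Avro strings are often org.apache.avro.util.Utf8, so normalise them to String.
    private static String asString(Object value) {
        return value == null ? null : value.toString();
    }
}
```

In the question's topology the joiner would then be `(leftValue, rightValue) -> JoinedUserMerger.merge(outSchema, leftValue, rightValue)`, so VR is inferred as GenericRecord and `joined.to(stringSerde, genericAvroSerde, "streams-output-testing")` compiles. On Kafka 1.0+ the non-deprecated form is `joined.to("streams-output-testing", Produced.with(stringSerde, genericAvroSerde))`. Since the values are written with `GenericAvroSerde` backed by Schema Registry, the output topic should be readable with kafka-avro-console-consumer.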