Kafka Streams: KStream-GlobalKTable join on a specific field
I have a KStream whose records are deserialized into a POJO like this:
public class FinancialMessage {
    public String user_id;
    public String stock_symbol;
    public String exchange_id;
}
And this is what a record in the GlobalKTable looks like:
public class CompanySectors {
    public String company_id;
    public String company_name;
    public String tckr;
    public String sector_cd;
}
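I would like to join the KStream's stock_symbol field with the GlobalKTable's tckr field. Is this possible? I want to create a new EnrichedMessage object before streaming it to another topic. I have the code below, but I am getting a NullPointerException: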
Exception in thread "trade-enrichment-stream-0c7e7782-4217-4450-8086-21871b4ebc45-StreamThread-1" java.lang.NullPointerException
at com.domain.EnrichedMessage.<init>(EnrichedMessage.java:51)
at com.domain.TradeEnrichmentTopology.lambda(TradeEnrichmentTopology.java:73)
at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:79)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:101)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:801)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
The code snippet is shown below.
KStream<String, FinancialMessage> financialMessageStream =
    builder.stream(
        INCOMING_TOPIC,
        Consumed.with(Serdes.String(), financialMessageSerde)
    );

GlobalKTable<String, CompanySectors> companySectorsStore =
    builder.globalTable(
        KTABLE_TOPIC,
        Consumed.with(Serdes.String(), companySectorsSerde)
    );

KStream<String, EnrichedMessage> enrichedStream = financialMessageStream.leftJoin(
    companySectorsStore,
    (financialMessageKey, financialMessageValue) -> financialMessageValue.stock_symbol,
    (financialMessageValue, companySectorsValue) -> new EnrichedMessage(financialMessageValue, companySectorsValue)
);

enrichedStream.to(
    OUTGOING_TOPIC,
    Produced.with(Serdes.String(), enrichedMessageSerde));
I suspect there is a mistake in my leftJoin logic.
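When you do a left join, you can assume that the record from the left stream is not null; however, you cannot assume that the right-side GlobalKTable has a record matching the given key, so the table value passed to your joiner may be null. In your case, when you instantiate new EnrichedMessage(financialMessageValue, companySectorsValue), are you sure companySectorsValue is not null? If it is null, do you handle it properly? Your NPE appears to come from the EnrichedMessage constructor, so just make sure you account for the fact that companySectorsValue can be null.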
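One way to handle the null case is to make the EnrichedMessage constructor tolerate a missing table record. The following is only a minimal plain-Java sketch with no Kafka dependency: the question does not show EnrichedMessage, so its fields here are assumptions, and NullSafeJoinSketch is a hypothetical driver class.

```java
// Hypothetical driver; it calls the constructor the way the ValueJoiner in the question would.
class NullSafeJoinSketch {
    public static void main(String[] args) {
        FinancialMessage trade = new FinancialMessage();
        trade.user_id = "u-1";
        trade.stock_symbol = "ACME";
        trade.exchange_id = "X1";

        // leftJoin with no matching table record: the right-hand value is null.
        EnrichedMessage unmatched = new EnrichedMessage(trade, null);
        System.out.println(unmatched.stock_symbol + " sector=" + unmatched.sector_cd);

        CompanySectors sectors = new CompanySectors();
        sectors.company_id = "c-42";
        sectors.company_name = "Acme Corp";
        sectors.tckr = "ACME";
        sectors.sector_cd = "TECH";

        EnrichedMessage matched = new EnrichedMessage(trade, sectors);
        System.out.println(matched.stock_symbol + " sector=" + matched.sector_cd);
    }
}

// POJOs copied from the question.
class FinancialMessage {
    public String user_id;
    public String stock_symbol;
    public String exchange_id;
}

class CompanySectors {
    public String company_id;
    public String company_name;
    public String tckr;
    public String sector_cd;
}

// Assumed shape of EnrichedMessage; the real class is not shown in the question.
class EnrichedMessage {
    public final String user_id;
    public final String stock_symbol;
    public final String exchange_id;
    public final String company_name; // null when no table record matched
    public final String sector_cd;    // null when no table record matched

    EnrichedMessage(FinancialMessage trade, CompanySectors sectors) {
        this.user_id = trade.user_id;
        this.stock_symbol = trade.stock_symbol;
        this.exchange_id = trade.exchange_id;
        // Guard every read of the table-side value instead of dereferencing it directly.
        this.company_name = (sectors == null) ? null : sectors.company_name;
        this.sector_cd = (sectors == null) ? null : sectors.sector_cd;
    }
}
```

If unmatched trades should not be emitted at all, the inner join (join instead of leftJoin on the same KStream) drops them, and the joiner is then only called with a non-null table value.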
Also, make sure your GlobalKTable is populated before any join logic runs.
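To see why an empty or differently keyed table produces a null, it can help to model the lookup in plain Java. This is a rough model under stated assumptions, not the Kafka Streams API: a HashMap stands in for the GlobalKTable's contents, GlobalTableLookupSketch and lookup are hypothetical names, and the table topic is assumed to be keyed by the ticker symbol, since the key returned by the KeyValueMapper is matched against the table's record key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the join lookup; a Map stands in for the GlobalKTable.
class GlobalTableLookupSketch {

    // Mirrors the KeyValueMapper in the question: the lookup key is stock_symbol.
    static CompanySectors lookup(Map<String, CompanySectors> table, FinancialMessage trade) {
        return table.get(trade.stock_symbol); // null when nothing is stored under that key
    }

    public static void main(String[] args) {
        FinancialMessage trade = new FinancialMessage();
        trade.stock_symbol = "ACME";

        CompanySectors acme = new CompanySectors();
        acme.company_id = "c-42";
        acme.tckr = "ACME";
        acme.sector_cd = "TECH";

        // Case 1: the table has not been populated yet.
        Map<String, CompanySectors> empty = new HashMap<>();
        System.out.println("empty table: " + (lookup(empty, trade) != null));

        // Case 2: the table is keyed by company_id, so a ticker lookup misses.
        Map<String, CompanySectors> byCompanyId = new HashMap<>();
        byCompanyId.put(acme.company_id, acme);
        System.out.println("keyed by company_id: " + (lookup(byCompanyId, trade) != null));

        // Case 3: the table is keyed by tckr, so the lookup finds the record.
        Map<String, CompanySectors> byTicker = new HashMap<>();
        byTicker.put(acme.tckr, acme);
        System.out.println("keyed by tckr: " + (lookup(byTicker, trade) != null));
    }
}

// POJOs copied from the question.
class FinancialMessage {
    public String user_id;
    public String stock_symbol;
    public String exchange_id;
}

class CompanySectors {
    public String company_id;
    public String company_name;
    public String tckr;
    public String sector_cd;
}
```

Only the third case yields a match, so it is worth checking both that KTABLE_TOPIC already holds the company records and that those records are keyed by the same value the stream side maps to.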