KSQL join WHERE 子句的 java/scala kafka 流等效项是什么?
What is java/scala kafka streams equivalent for KSQL join WHERE clause?
假设我有 2 个 kafka 流(kafka-streams-scala 库,版本 2.2.0):
val builder: StreamsBuilder = new StreamsBuilder
val stream1: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic1")
val stream2: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic2")
他们的加入:
val stream3: KStream[String, MyClass] = flights.join(schedules)((r1, r2) => MyClass(r1.get("f1"), r2.get("f2")), JoinWindows.of(Duration.ofSeconds(30))
KSQL 中 WHERE 子句的等价物是什么? (参见 late_orders 流)流 API?
只使用 stream3.filter 是个好主意吗?这种方式能和KSQL创建流一样高效吗?
What is the equivalent of WHERE clause available in KSQL? (see late_orders stream) for streams API?
是:
KStream#filter()
,其中 returns 已过滤 KStream
KTable#filter()
,其中 returns 已过滤 KTable
Is it a good idea to just use stream3.filter?
是的。
Will this approach have the same efficiency as stream created by KSQL?
是的。
假设我有 2 个 kafka 流(kafka-streams-scala 库,版本 2.2.0):
val builder: StreamsBuilder = new StreamsBuilder
val stream1: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic1")
val stream2: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic2")
他们的加入:
val stream3: KStream[String, MyClass] = flights.join(schedules)((r1, r2) => MyClass(r1.get("f1"), r2.get("f2")), JoinWindows.of(Duration.ofSeconds(30))
KSQL 中 WHERE 子句的等价物是什么? (参见 late_orders 流)流 API? 只使用 stream3.filter 是个好主意吗?这种方式能和KSQL创建流一样高效吗?
What is the equivalent of WHERE clause available in KSQL? (see late_orders stream) for streams API?
是:
KStream#filter()
,其中 returns 已过滤KStream
KTable#filter()
,其中 returns 已过滤KTable
Is it a good idea to just use stream3.filter?
是的。
Will this approach have the same efficiency as stream created by KSQL?
是的。