KStream-KTable连接,消息排序
KStream-KTable join, message ordering
假设我在 topicA 上有以下两条使用相同密钥的消息
- message1: (k, A1)(在时间 t=0 收到)
- message2: (k, A2)(在时间 t=5 收到)
和拓扑结构
KStream<Integer, A> streamA = builder.stream(topicA);
KTable<Integer, B> tableB = builder.table(topicB);
streamA.leftJoin(tableB, (a, b) -> myJoiner(a,b)).to(topicAB);
假设 myJoiner(a1, b)
需要 100 个时间单位,myJoiner(a2, b)
需要 10 个时间单位。消息将以什么顺序出现在 topicAB 上?
我想弄清楚的是 stream.leftJoin(table, (k,v)->f(k,v)) 是否是阻塞操作 当流时,table 和 k 是一样的。? (在我的示例中,这意味着 myJoiner(a1, b) 将在时间 t=100 左右首先出现,而 myJoiner(a2,b) 将在时间 t=110 左右出现第二个)。或者它是异步的(这意味着 myJoiner(a2,b) 将在时间 t=15 左右首先出现,而 myJoiner(a1,b) 在时间 t=100 左右出现第二个)
谢谢,
大卫
连接正在执行"sync"并且保证顺序。
In my example this would mean myJoiner(a1, b) would appear first around time t=100 and myJoiner(a2,b) would appear second around time t=110
是的,这正是它的执行方式。
假设我在 topicA 上有以下两条使用相同密钥的消息
- message1: (k, A1)(在时间 t=0 收到)
- message2: (k, A2)(在时间 t=5 收到)
和拓扑结构
KStream<Integer, A> streamA = builder.stream(topicA);
KTable<Integer, B> tableB = builder.table(topicB);
streamA.leftJoin(tableB, (a, b) -> myJoiner(a,b)).to(topicAB);
假设 myJoiner(a1, b)
需要 100 个时间单位,myJoiner(a2, b)
需要 10 个时间单位。消息将以什么顺序出现在 topicAB 上?
我想弄清楚的是 stream.leftJoin(table, (k,v)->f(k,v)) 是否是阻塞操作 当流时,table 和 k 是一样的。? (在我的示例中,这意味着 myJoiner(a1, b) 将在时间 t=100 左右首先出现,而 myJoiner(a2,b) 将在时间 t=110 左右出现第二个)。或者它是异步的(这意味着 myJoiner(a2,b) 将在时间 t=15 左右首先出现,而 myJoiner(a1,b) 在时间 t=100 左右出现第二个)
谢谢, 大卫
连接正在执行"sync"并且保证顺序。
In my example this would mean myJoiner(a1, b) would appear first around time t=100 and myJoiner(a2,b) would appear second around time t=110
是的,这正是它的执行方式。