KafkaStreams Left Join DSL:插入外部空值
KafkaStreams Left Join DSL: inserting on outer null value
我有一个混合匹配的 DSL-PAPI 拓扑。 DSL 部分将综合浏览量("pageviews" 主题)与这些综合浏览量的用户("users" 主题)结合起来。我想加入两者,所以如果用户是新用户,则从 pvs 信息创建一个新的 "user" 到 "users" 主题,否则什么都不做。
所以我正在尝试在页面浏览量和用户之间进行左连接,如果用户为空,则意味着尚未使用此键创建用户,因此在这种情况下我会创建一个。
在代码中,我将页面浏览量作为流,将用户作为 table,当用户在连接中为 null 时加入它们生成新用户,然后过滤并发送给 "users" 这些新用户。
val builder = new StreamsBuilder()
val pageviewsTopic: KStream[Key, Pageview] = builder.stream("pageviews")
.map((muipk, pageview) => (new MerchantUserPartitionKey(muipk.merchantSiteId, muipk.uid) -> pageview))
val usersTopic: KTable[MerchantUserPartitionKey, user] = builder.table("users")
val joinedPageviewsWithUsers: KStream[MerchantUserPartitionKey, User] =
pageviewsTopic.leftJoin(
usersTopic,
new ValueJoiner[Pageview, User, User] {
override def apply(pageview: Pageview, user: User): User = {
logger.info("JOIN PAGEVIEW-user")
if (user == null) {
new User(UUIDUtils.generateRandomId(), pageview.uid /*, some other data */)
} else {
logger.info("user already created.")
null
}
}
})
// Generate users.
joinedPageviewsWithUsers.
filter((key, user) => user != null ).
to("users")
生成的 DSL 拓扑如下所示:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [pageviews])
--> KSTREAM-MAP-0000000001
Processor: KSTREAM-MAP-0000000001 (stores: [])
--> KSTREAM-FILTER-0000000006
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-FILTER-0000000006 (stores: [])
--> KSTREAM-SINK-0000000005
<-- KSTREAM-MAP-0000000001
Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-MAP-0000000001-repartition)
<-- KSTREAM-FILTER-0000000006
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-MAP-0000000001-repartition])
--> KSTREAM-LEFTJOIN-0000000008
Processor: KSTREAM-LEFTJOIN-0000000008 (stores: [users-STATE-STORE-0000000002])
--> KSTREAM-FILTER-0000000009
<-- KSTREAM-SOURCE-0000000007
Processor: KSTREAM-FILTER-0000000009 (stores: [])
--> KSTREAM-SINK-0000000010
<-- KSTREAM-LEFTJOIN-0000000008
Source: KSTREAM-SOURCE-0000000003 (topics: [users])
--> KTABLE-SOURCE-0000000004
Sink: KSTREAM-SINK-0000000010 (topic: users)
<-- KSTREAM-FILTER-0000000009
Processor: KTABLE-SOURCE-0000000004 (stores: [user-STATE-STORE-0000000002])
--> none
<-- KSTREAM-SOURCE-0000000003
然而,当 运行 这对于具有相同键的多个页面浏览时,"users" 创建新用户,但它总是与 "null" 连接。因此,看起来商店没有使用 "users" 主题中新生成的数据更新,即使它显示使用 user-STATE-STORE-0000000002
。
您需要做一些额外的事情才能将数据导入商店吗?这在某种程度上是 KafkaStreams 反模式吗(写入您之前加入的主题)?
更新 更多信息:
- 键不为空
- 执行 ValueJoiner 代码(显示打印输出),只是用户值始终为空。
- 用户被写入 "users" 主题(在这种情况下,根据逻辑,它每次进入 ValueJoiner 时都会这样做,因为它总是发现外部值为 null,因此它将用户插入 "users")
当一个流在一个子拓扑中查找另一个子拓扑中的 table 时,可能会涉及常规 consumption/production 延迟。例如,当您直接从主题定义流或 table 时,就会发生这种情况。如果您可以使用更有意义的指令,例如 through
(写入主题但让拓扑知道它仍将在此拓扑中使用),它将有助于 KafkaStreams
了解这种关系。
我有一个混合匹配的 DSL-PAPI 拓扑。 DSL 部分将综合浏览量("pageviews" 主题)与这些综合浏览量的用户("users" 主题)结合起来。我想加入两者,所以如果用户是新用户,则从 pvs 信息创建一个新的 "user" 到 "users" 主题,否则什么都不做。
所以我正在尝试在页面浏览量和用户之间进行左连接,如果用户为空,则意味着尚未使用此键创建用户,因此在这种情况下我会创建一个。
在代码中,我将页面浏览量作为流,将用户作为 table,当用户在连接中为 null 时加入它们生成新用户,然后过滤并发送给 "users" 这些新用户。
val builder = new StreamsBuilder()
val pageviewsTopic: KStream[Key, Pageview] = builder.stream("pageviews")
.map((muipk, pageview) => (new MerchantUserPartitionKey(muipk.merchantSiteId, muipk.uid) -> pageview))
val usersTopic: KTable[MerchantUserPartitionKey, user] = builder.table("users")
val joinedPageviewsWithUsers: KStream[MerchantUserPartitionKey, User] =
pageviewsTopic.leftJoin(
usersTopic,
new ValueJoiner[Pageview, User, User] {
override def apply(pageview: Pageview, user: User): User = {
logger.info("JOIN PAGEVIEW-user")
if (user == null) {
new User(UUIDUtils.generateRandomId(), pageview.uid /*, some other data */)
} else {
logger.info("user already created.")
null
}
}
})
// Generate users.
joinedPageviewsWithUsers.
filter((key, user) => user != null ).
to("users")
生成的 DSL 拓扑如下所示:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [pageviews])
--> KSTREAM-MAP-0000000001
Processor: KSTREAM-MAP-0000000001 (stores: [])
--> KSTREAM-FILTER-0000000006
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-FILTER-0000000006 (stores: [])
--> KSTREAM-SINK-0000000005
<-- KSTREAM-MAP-0000000001
Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-MAP-0000000001-repartition)
<-- KSTREAM-FILTER-0000000006
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-MAP-0000000001-repartition])
--> KSTREAM-LEFTJOIN-0000000008
Processor: KSTREAM-LEFTJOIN-0000000008 (stores: [users-STATE-STORE-0000000002])
--> KSTREAM-FILTER-0000000009
<-- KSTREAM-SOURCE-0000000007
Processor: KSTREAM-FILTER-0000000009 (stores: [])
--> KSTREAM-SINK-0000000010
<-- KSTREAM-LEFTJOIN-0000000008
Source: KSTREAM-SOURCE-0000000003 (topics: [users])
--> KTABLE-SOURCE-0000000004
Sink: KSTREAM-SINK-0000000010 (topic: users)
<-- KSTREAM-FILTER-0000000009
Processor: KTABLE-SOURCE-0000000004 (stores: [user-STATE-STORE-0000000002])
--> none
<-- KSTREAM-SOURCE-0000000003
然而,当 运行 这对于具有相同键的多个页面浏览时,"users" 创建新用户,但它总是与 "null" 连接。因此,看起来商店没有使用 "users" 主题中新生成的数据更新,即使它显示使用 user-STATE-STORE-0000000002
。
您需要做一些额外的事情才能将数据导入商店吗?这在某种程度上是 KafkaStreams 反模式吗(写入您之前加入的主题)?
更新 更多信息:
- 键不为空
- 执行 ValueJoiner 代码(显示打印输出),只是用户值始终为空。
- 用户被写入 "users" 主题(在这种情况下,根据逻辑,它每次进入 ValueJoiner 时都会这样做,因为它总是发现外部值为 null,因此它将用户插入 "users")
当一个流在一个子拓扑中查找另一个子拓扑中的 table 时,可能会涉及常规 consumption/production 延迟。例如,当您直接从主题定义流或 table 时,就会发生这种情况。如果您可以使用更有意义的指令,例如 through
(写入主题但让拓扑知道它仍将在此拓扑中使用),它将有助于 KafkaStreams
了解这种关系。