具有三向连接的 Spark 结构化流水印
Watermarking for Spark structured streaming with three way joins
我有 3 个数据流:foo
、bar
和 baz
。
有必要在以下链中使用 LEFT OUTER JOIN
加入这些流:foo -> bar -> baz
。
这是尝试使用内置 rate
流模拟这些流:
val rateStream = session.readStream
.format("rate")
.option("rowsPerSecond", 5)
.option("numPartitions", 1)
.load()
val fooStream = rateStream
.select(col("value").as("fooId"), col("timestamp").as("fooTime"))
val barStream = rateStream
.where(rand() < 0.5) // Introduce misses for ease of debugging
.select(col("value").as("barId"), col("timestamp").as("barTime"))
val bazStream = rateStream
.where(rand() < 0.5) // Introduce misses for ease of debugging
.select(col("value").as("bazId"), col("timestamp").as("bazTime"))
这是将所有这些流连接在一起的第一种方法,假设 foo
、bar
和 baz
的潜在延迟很小 (~ 5 seconds
):
val foobarStream = fooStream
.withWatermark("fooTime", "5 seconds")
.join(
barStream.withWatermark("barTime", "5 seconds"),
expr("""
barId = fooId AND
fooTime >= barTime AND
fooTime <= barTime + interval 5 seconds
"""),
joinType = "leftOuter"
)
val foobarbazQuery = foobarStream
.join(
bazStream.withWatermark("bazTime", "5 seconds"),
expr("""
bazId = fooId AND
bazTime >= fooTime AND
bazTime <= fooTime + interval 5 seconds
"""),
joinType = "leftOuter")
.writeStream
.format("console")
.start()
通过上面的设置,我能够观察到以下数据元组:
(some_foo, some_bar, some_baz)
(some_foo, some_bar, null)
但仍然缺少 (some_foo, null, some_baz)
和 (some_foo, null, null)
。
任何想法,如何正确配置水印以获得所有组合?
更新:
在 barTime
上令人惊讶地为 foobarStream
添加了额外的水印后:
val foobarbazQuery = foobarStream
.withWatermark("barTime", "1 minute")
.join(/* ... */)`
我可以得到这个 (some_foo, null, some_baz)
组合,但仍然缺少 (some_foo, null, null)
...
我留下一些信息仅供参考。
链接流-流连接无法正常工作,因为 Spark 仅支持全局水印(而不是操作员智能水印),这可能会导致连接之间的中间输出丢失。
Apache Spark 社区指出了这个问题并在不久前进行了讨论。下面是 link 以了解更多详情:
https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@%3Cdev.spark.apache.org%3E
(免责声明:我是作者发起的邮件线程。)
我有 3 个数据流:foo
、bar
和 baz
。
有必要在以下链中使用 LEFT OUTER JOIN
加入这些流:foo -> bar -> baz
。
这是尝试使用内置 rate
流模拟这些流:
val rateStream = session.readStream
.format("rate")
.option("rowsPerSecond", 5)
.option("numPartitions", 1)
.load()
val fooStream = rateStream
.select(col("value").as("fooId"), col("timestamp").as("fooTime"))
val barStream = rateStream
.where(rand() < 0.5) // Introduce misses for ease of debugging
.select(col("value").as("barId"), col("timestamp").as("barTime"))
val bazStream = rateStream
.where(rand() < 0.5) // Introduce misses for ease of debugging
.select(col("value").as("bazId"), col("timestamp").as("bazTime"))
这是将所有这些流连接在一起的第一种方法,假设 foo
、bar
和 baz
的潜在延迟很小 (~ 5 seconds
):
val foobarStream = fooStream
.withWatermark("fooTime", "5 seconds")
.join(
barStream.withWatermark("barTime", "5 seconds"),
expr("""
barId = fooId AND
fooTime >= barTime AND
fooTime <= barTime + interval 5 seconds
"""),
joinType = "leftOuter"
)
val foobarbazQuery = foobarStream
.join(
bazStream.withWatermark("bazTime", "5 seconds"),
expr("""
bazId = fooId AND
bazTime >= fooTime AND
bazTime <= fooTime + interval 5 seconds
"""),
joinType = "leftOuter")
.writeStream
.format("console")
.start()
通过上面的设置,我能够观察到以下数据元组:
(some_foo, some_bar, some_baz)
(some_foo, some_bar, null)
但仍然缺少 (some_foo, null, some_baz)
和 (some_foo, null, null)
。
任何想法,如何正确配置水印以获得所有组合?
更新:
在 barTime
上令人惊讶地为 foobarStream
添加了额外的水印后:
val foobarbazQuery = foobarStream
.withWatermark("barTime", "1 minute")
.join(/* ... */)`
我可以得到这个 (some_foo, null, some_baz)
组合,但仍然缺少 (some_foo, null, null)
...
我留下一些信息仅供参考。
链接流-流连接无法正常工作,因为 Spark 仅支持全局水印(而不是操作员智能水印),这可能会导致连接之间的中间输出丢失。
Apache Spark 社区指出了这个问题并在不久前进行了讨论。下面是 link 以了解更多详情: https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@%3Cdev.spark.apache.org%3E
(免责声明:我是作者发起的邮件线程。)