自定义无限源如何在 Google Cloud DataFlow 中工作?
How custom unbounded sources work in Google Cloud DataFlow?
我正在尝试实现 Google Cloud Dataflow 的自定义无限源以从 Amazon Kinesis 队列中读取。为了正确实施检查点,我想了解该机制的具体工作原理。
DataFlow 的工作原理
我试图通过阅读 DataFlow 的文档来理解检查点,但是缺少一些关键的东西,所以我阅读了一篇 MillWheel 论文。首先让我解释一下我是如何理解这篇论文中提出的概念的。我将根据数据流 API:
在强大的生产设置中关注源与其消费者之间的交互
createReader()
在源上调用,空值作为 CheckpointMark 传递
start()
在 reader 实例上调用
advance()
在 reader 上被调用 X 次
- 现在工人决定做一个检查点标记。它在 reader.
上调用 getCheckpointMark()
- 检查点由 worker
持久化
finalizeCheckpoint()
在检查点对象上调用
- 到目前为止读取的数据被发送给消费者,消费者将记录存储在缓存中以便为可能的重试进行重复数据删除
- 消费者向源发送ACK。此时从源中删除检查点,当接受 ACK 时,消费者从缓存中删除记录(因为此时源不会重试)
- 如果源未能收到 ACK,那么它将创建新的 reader 实例,将最后一个检查点作为参数传递,并将重试向消费者发送数据。如果消费者收到重试的数据,它会尝试去重
- 一切都重复了。我不清楚它到底是如何发生的:第一个 reader 实例是否用于继续从流中读取?或者创建带有空检查点标记的新 reader 来执行此操作?还是第二个reader(带有检查点数据)用于继续从流中读取?
PubSub 与 Kinesis
现在,请允许我说几句关于 Kinesis queue 是如何运行的,因为它与 Pub/Sub 有显着差异(就我了解 Pub/Sub 的工作原理而言,我还没有自己一直在用)。
Pub/Sub
我看到 Pub/Sub 拉模型严重依赖 ACK,即客户端收到的消息被确认,然后 Pub/Sub 中的 "internal checkpoint" 向前移动 -> 这意味着即将到来pull request会在上一个ACK之后收到连续的记录。
运动学
Kinesis 拉界面(这里根本没有推)更类似于您与文件交互的方式。您可以从流中的任何位置开始读取(特殊值 TRIM_HORIZON 是流中最旧的记录,而 LATEST 是流中的最新记录),然后使用迭代器逐个记录向前移动记录(迭代器存储在服务器上如果未使用,有 5 分钟的有效期)。服务器没有 ACK - 客户端负责跟踪流中的位置,您可以随时重新读取旧记录(当然,除非它们已过期)。
问题/问题
- 检查点应该是什么样子? reader,给定检查点,是预期只读取与其相关的部分数据,还是预期从检查点读取所有数据?换句话说,我的检查点应该是:"data between x and y" 还是 "all data after x"?
- 我知道第一个 reader 得到 null 作为检查点标记,这很好 - 这意味着我应该从应用程序开发人员定义的点开始读取。但是 DataFlow 可以像这样用 null 创建其他 readers(例如,我想象 reader jvm 死机时的情况,然后 DataFlow 创建一个新的 reader 将 null 作为检查点传递)?在这种情况下,我不知道我的起始位置是什么,因为我可能已经使用以前的 reader 读取了一些数据,现在进度标记丢失了。
- 消费者端的记录重复数据删除使用什么id?是
getCurrentRecordId
返回的值吗?我在问这个问题,因为我考虑过为此使用流中的位置,因为它对于特定流是唯一的。但是,如果我稍后通过展平它们来加入少数运动源会发生什么 - >这将导致不同记录可能共享相同 ID 的情况。我是否应该使用(流名称,位置)元组作为 id(在这种情况下是唯一的)。
干杯,
普热梅克
我们很高兴看到您将 Dataflow 与 Kinesis 结合使用。我们希望对我们的 GitHub project with a contrib connector for Kinesis 提出拉取请求。我们也很乐意在您开发时通过 GitHub 审查您的代码并在那里给您反馈。
how the checkpoint should look like? Is a reader, given checkpoint, expected to read just part of data it relates to or is it expected to read all data from the checkpoint? In other words should my checkpoint be like: "data between x and y" or "all data after x"?
检查点标记应表示“已由reader产生并最终确定的数据”。例如,如果 reader 负责特定的分片,则检查点标记可能由分片标识符和该分片中已成功读取的最后一个序列号 Y 组成,表示“直到并包括 Y 的所有数据已经已制作。
I know that the first reader gets null as checkpoint mark and that's perfectly fine - it means that I should start reading from point defined by application developer. But can DataFlow create other readers with null like this (for example, I'd imagine the situation when reader jvm dies, then DataFlow creates new one with new reader passing null as checkpoint)? In such situation I don't know what is my starting position as I might have already read some data using previous reader and now the mark of progress is lost.
即使 JVM 发生故障,最终确定的检查点也会保留。换句话说,当一个 JVM 死亡时,reader 将用最后一个已完成的检查点构建。您不应该看到使用空检查点创建的 readers,除非它们旨在从源的开头读取,或者在您的场景中 JVM 在第一次成功调用 finalizeCheckpoint()
之前死亡。可以使用新reader处的checkpoint标记为同一个分片构造一个新的迭代器,从下一条要读取的记录开始,可以继续不丢失数据。
what id is used for deduplication of records on consumer side? Is it value returned by getCurrentRecordId? I'm asking this question, because I thought about using the position in the stream for that, because it's unique for particular stream. But what would happen if I later join few kinesis sources by flattening them -> this would lead to situation where different records may share the same id. Should I rather use (stream name, position) tuple for the id (which is unique in this case).
在 Dataflow 中,每个 UnboundedSource(实现 getCurrentRecordId
并将 requiresDeduping
覆盖为 return true
)都自行删除重复数据。因此,只要求记录 ID 对于同一源实例是唯一的。来自两个不同来源的记录可以使用相同的记录 ID,并且在展平期间它们不会被视为“重复”。因此,如果 Amazon Kinesis 保证所有记录都具有全局唯一(跨流中的所有分片)和持久性(例如,跨重新分片操作)的 ID,那么这些应该适合用作记录 ID。
请注意,getCurrentRecordId
是 可选的 方法 UnboundedReader
-- 如果您的检查点方案唯一标识每条记录,则无需实施它。 Kinesis 允许您按序号顺序读取记录,看起来序号是全局唯一的。因此,您可以将每个分片分配给 generateInitialSplits
中的不同工作人员,并且每个工作人员可能永远不会产生重复数据——在这种情况下,您可能根本不需要担心记录 ID。
此答案的大部分假设了您的 Kinesis 流永远不会更改其分片的简单情况。另一方面,如果流上的分片发生变化,那么您的解决方案将变得更加复杂。例如,每个 worker 可能负责 1 个以上的分片,因此检查点标记将是一个分片映射 -> 序列号而不是序列号。拆分和合并的分片可能会在不同的 Dataflow worker 之间移动以平衡负载,并且可能很难保证 Kinesis 记录不会被两个不同的 worker 读取两次。在这种情况下,使用具有您描述的语义的 Kinesis 记录 ID 就足够了。
我正在尝试实现 Google Cloud Dataflow 的自定义无限源以从 Amazon Kinesis 队列中读取。为了正确实施检查点,我想了解该机制的具体工作原理。
DataFlow 的工作原理
我试图通过阅读 DataFlow 的文档来理解检查点,但是缺少一些关键的东西,所以我阅读了一篇 MillWheel 论文。首先让我解释一下我是如何理解这篇论文中提出的概念的。我将根据数据流 API:
在强大的生产设置中关注源与其消费者之间的交互createReader()
在源上调用,空值作为 CheckpointMark 传递
start()
在 reader 实例上调用advance()
在 reader 上被调用 X 次
- 现在工人决定做一个检查点标记。它在 reader. 上调用
- 检查点由 worker 持久化
finalizeCheckpoint()
在检查点对象上调用- 到目前为止读取的数据被发送给消费者,消费者将记录存储在缓存中以便为可能的重试进行重复数据删除
- 消费者向源发送ACK。此时从源中删除检查点,当接受 ACK 时,消费者从缓存中删除记录(因为此时源不会重试)
- 如果源未能收到 ACK,那么它将创建新的 reader 实例,将最后一个检查点作为参数传递,并将重试向消费者发送数据。如果消费者收到重试的数据,它会尝试去重
- 一切都重复了。我不清楚它到底是如何发生的:第一个 reader 实例是否用于继续从流中读取?或者创建带有空检查点标记的新 reader 来执行此操作?还是第二个reader(带有检查点数据)用于继续从流中读取?
getCheckpointMark()
PubSub 与 Kinesis
现在,请允许我说几句关于 Kinesis queue 是如何运行的,因为它与 Pub/Sub 有显着差异(就我了解 Pub/Sub 的工作原理而言,我还没有自己一直在用)。
Pub/Sub
我看到 Pub/Sub 拉模型严重依赖 ACK,即客户端收到的消息被确认,然后 Pub/Sub 中的 "internal checkpoint" 向前移动 -> 这意味着即将到来pull request会在上一个ACK之后收到连续的记录。
运动学
Kinesis 拉界面(这里根本没有推)更类似于您与文件交互的方式。您可以从流中的任何位置开始读取(特殊值 TRIM_HORIZON 是流中最旧的记录,而 LATEST 是流中的最新记录),然后使用迭代器逐个记录向前移动记录(迭代器存储在服务器上如果未使用,有 5 分钟的有效期)。服务器没有 ACK - 客户端负责跟踪流中的位置,您可以随时重新读取旧记录(当然,除非它们已过期)。
问题/问题
- 检查点应该是什么样子? reader,给定检查点,是预期只读取与其相关的部分数据,还是预期从检查点读取所有数据?换句话说,我的检查点应该是:"data between x and y" 还是 "all data after x"?
- 我知道第一个 reader 得到 null 作为检查点标记,这很好 - 这意味着我应该从应用程序开发人员定义的点开始读取。但是 DataFlow 可以像这样用 null 创建其他 readers(例如,我想象 reader jvm 死机时的情况,然后 DataFlow 创建一个新的 reader 将 null 作为检查点传递)?在这种情况下,我不知道我的起始位置是什么,因为我可能已经使用以前的 reader 读取了一些数据,现在进度标记丢失了。
- 消费者端的记录重复数据删除使用什么id?是
getCurrentRecordId
返回的值吗?我在问这个问题,因为我考虑过为此使用流中的位置,因为它对于特定流是唯一的。但是,如果我稍后通过展平它们来加入少数运动源会发生什么 - >这将导致不同记录可能共享相同 ID 的情况。我是否应该使用(流名称,位置)元组作为 id(在这种情况下是唯一的)。
干杯, 普热梅克
我们很高兴看到您将 Dataflow 与 Kinesis 结合使用。我们希望对我们的 GitHub project with a contrib connector for Kinesis 提出拉取请求。我们也很乐意在您开发时通过 GitHub 审查您的代码并在那里给您反馈。
how the checkpoint should look like? Is a reader, given checkpoint, expected to read just part of data it relates to or is it expected to read all data from the checkpoint? In other words should my checkpoint be like: "data between x and y" or "all data after x"?
检查点标记应表示“已由reader产生并最终确定的数据”。例如,如果 reader 负责特定的分片,则检查点标记可能由分片标识符和该分片中已成功读取的最后一个序列号 Y 组成,表示“直到并包括 Y 的所有数据已经已制作。
I know that the first reader gets null as checkpoint mark and that's perfectly fine - it means that I should start reading from point defined by application developer. But can DataFlow create other readers with null like this (for example, I'd imagine the situation when reader jvm dies, then DataFlow creates new one with new reader passing null as checkpoint)? In such situation I don't know what is my starting position as I might have already read some data using previous reader and now the mark of progress is lost.
即使 JVM 发生故障,最终确定的检查点也会保留。换句话说,当一个 JVM 死亡时,reader 将用最后一个已完成的检查点构建。您不应该看到使用空检查点创建的 readers,除非它们旨在从源的开头读取,或者在您的场景中 JVM 在第一次成功调用 finalizeCheckpoint()
之前死亡。可以使用新reader处的checkpoint标记为同一个分片构造一个新的迭代器,从下一条要读取的记录开始,可以继续不丢失数据。
what id is used for deduplication of records on consumer side? Is it value returned by getCurrentRecordId? I'm asking this question, because I thought about using the position in the stream for that, because it's unique for particular stream. But what would happen if I later join few kinesis sources by flattening them -> this would lead to situation where different records may share the same id. Should I rather use (stream name, position) tuple for the id (which is unique in this case).
在 Dataflow 中,每个 UnboundedSource(实现 getCurrentRecordId
并将 requiresDeduping
覆盖为 return true
)都自行删除重复数据。因此,只要求记录 ID 对于同一源实例是唯一的。来自两个不同来源的记录可以使用相同的记录 ID,并且在展平期间它们不会被视为“重复”。因此,如果 Amazon Kinesis 保证所有记录都具有全局唯一(跨流中的所有分片)和持久性(例如,跨重新分片操作)的 ID,那么这些应该适合用作记录 ID。
请注意,getCurrentRecordId
是 可选的 方法 UnboundedReader
-- 如果您的检查点方案唯一标识每条记录,则无需实施它。 Kinesis 允许您按序号顺序读取记录,看起来序号是全局唯一的。因此,您可以将每个分片分配给 generateInitialSplits
中的不同工作人员,并且每个工作人员可能永远不会产生重复数据——在这种情况下,您可能根本不需要担心记录 ID。
此答案的大部分假设了您的 Kinesis 流永远不会更改其分片的简单情况。另一方面,如果流上的分片发生变化,那么您的解决方案将变得更加复杂。例如,每个 worker 可能负责 1 个以上的分片,因此检查点标记将是一个分片映射 -> 序列号而不是序列号。拆分和合并的分片可能会在不同的 Dataflow worker 之间移动以平衡负载,并且可能很难保证 Kinesis 记录不会被两个不同的 worker 读取两次。在这种情况下,使用具有您描述的语义的 Kinesis 记录 ID 就足够了。