透明的流式处理和批处理
Transparent Streaming & Batch processing
我对流和批处理的世界还很陌生,还试图理解概念和演讲。诚然,我的问题的答案很有可能众所周知,很容易找到,甚至在 SO 上回答了一百次,但我找不到。
背景:
我正在从事一个大型科学项目(核聚变研究),我们在实验运行期间产生大量测量数据。这些数据主要是标有纳秒时间戳的样本流,其中样本可以是任何东西,从单个 ADC 值,通过这样的数组,通过深度结构化数据(从 1 位布尔值到 64 位双精度的多达数百个条目)浮动)到原始高清视频帧甚至字符串文本消息。如果我正确理解常用术语,我会将我们的数据视为 "tabular data",在大多数情况下。
我们主要使用自制的软件解决方案,从简单的在线(流式)分析(如缩放、子采样等)到我们自己的数据存储、管理和访问设施的数据采集。
鉴于运营规模和维护所有这些实施的工作量,我们正在研究使用标准框架和工具来完成更多任务的可能性。
我的问题:
特别是在这个阶段,我们面临着对 live/online/realtime 数据以及 "after the fact" offline/batch 数据分析越来越复杂(自动和手动)的需求"historic" 数据。在这方面,我试图了解现有的分析框架,如 Spark、Flink、Storm 等(可能由消息队列如 Kafka、Pulsar 等支持)是否以及如何支持一个场景,其中
- 数据是 flowing/streamed 到 platform/framework 中,附加了一个标识符,如 URL 或 ID 等
- 平台与集成或外部存储交互以持久保存与标识符相关联的流数据(多年)
- 分析过程现在可以透明地 query/analyse 由标识符和任意(打开或关闭)时间 window 寻址的数据,并且框架为分析提供数据 batches/samples 来自后端存储或来自数据采集的实时
简单地将在线数据流式传输到存储中并从那里查询似乎没有选择,因为我们需要原始数据和分析数据来进行实时监控和实验的实时反馈控制。
此外,让用户以不同的方式查询实时输入信号或历史批次也不是理想的选择,因为我们的物理学家大多不是数据科学家,我们希望 "technicalities" 远离他们,理想情况下是确切的应该使用相同的算法来分析新的实时数据和以前实验中的旧存储数据。
站点注释:
- 我们谈论的是每秒 10 千兆比特范围内的偷看数据负载,这些数据负载会以秒为单位增加到几分钟——这可以由候选人处理吗?
- 我们正在使用纳秒分辨率的时间戳,甚至考虑 pico - 如果我理解正确,这会对可能的候选列表造成一些限制?
如果有人能够理解我的问题并为我阐明这个主题,我将非常感激:-)
非常感谢和亲切的问候,
贝波
我认为没有人可以说 "yes, framework X can definitely handle your workload",因为这在很大程度上取决于您对消息处理的需求,例如关于消息传递的可靠性,以及如何对数据流进行分区。
您可能对 BenchmarkingDistributedStreamProcessingEngines 感兴趣。该论文使用的是几年前的 Storm/Flink/Spark 版本(看起来它们是在 2016 年发布的),但也许作者愿意让您使用他们的基准来评估这三个框架的更新版本?
流式分析的一个非常常见的设置是转到数据源 -> Kafka/Pulsar -> 分析框架 -> 长期数据存储。这将处理与数据摄取分离开来,让您可以像处理新数据一样重新处理历史数据。
我觉得你的第一步应该是看看你能不能通过Kafka/Pulsar得到你需要的数据量。要么手动生成一个测试集,要么从你的生产环境中抓取一些你认为可以代表的数据,看看你是否可以在你需要的 throughput/latency 上通过 Kafka/Pulsar。
记得考虑对数据进行分区。如果您的某些数据流可以独立处理(即顺序无关紧要),则不应将它们放在相同的分区中。例如,可能没有理由混合传感器测量和视频馈送流。如果您可以将数据分成独立的流,那么您就不太可能 运行 陷入 Kafka/Pulsar 和分析框架的瓶颈。单独的数据流还可以让您更好地并行化分析框架中的处理,您可以 运行 例如不同机器上的视频馈送和传感器处理。
一旦您知道是否可以通过 Kafka/Pulsar 获得足够的吞吐量,您应该为 3 个框架中的每一个编写一个小示例。首先,我会从 Kafka/Pulsar 接收和丢弃数据,这应该让您及早知道 Kafka/Pulsar -> 分析路径中是否存在瓶颈。之后,您可以扩展示例以使用示例数据做一些有趣的事情,例如做一些你可能想在生产中做的处理。
您还需要考虑数据流需要哪些类型的处理保证。通常,您会因为保证至少一次或恰好一次处理而付出性能损失。对于某些类型的数据(例如视频源),偶尔丢失消息可能没问题。一旦您决定了所需的保证,您就可以适当地配置分析框架(例如禁用 Storm 中的确认),并尝试对您的测试数据进行基准测试。
只是为了更明确地回答您的一些问题:
实时数据 analysis/monitoring 用例听起来非常适合 Storm/Flink 系统。将它直接连接到 Kafka/Pulsar,然后进行任何你需要的分析听起来对你有用。
历史数据的重新处理将取决于您需要执行何种查询。如果您只需要一个时间间隔 + id,您可以使用 Kafka 加上一个过滤器或适当的分区来做到这一点。 Kafka 允许您在特定时间戳开始处理,如果您的数据按 id 分区或将其作为分析的第一步进行过滤,您可以从提供的时间戳开始,并在您点击时间以外的消息时停止处理 window。这仅适用于您感兴趣的时间戳是消息添加到 Kafka 的时间。我也不相信 Kafka 在它生成的时间戳上支持低于毫秒的分辨率。
如果您需要执行更高级的查询(例如,您需要查看传感器生成的时间戳),您可以考虑使用 Cassandra or Elasticsearch or Solr 作为永久数据存储。您还需要研究如何将这些系统中的数据返回到您的分析系统中。例如,我相信 Spark 附带了一个用于从 Elasticsearch 读取的连接器,而 Elasticsearch 为 Storm 提供了一个连接器。您应该检查您的数据 store/analytics 系统组合是否存在这样的连接器,或者愿意编写您自己的连接器。
编辑:正在详细回答您的评论。
我不知道 Kafka 或 Pulsar 支持用户指定的时间戳,但果然,它们 both do。不过我没看到 Pulsar 支持亚毫秒时间戳?
你描述的idea绝对可以被Kafka支持。
您需要的是能够在特定时间戳启动 Kafka/Pulsar 客户端并向前阅读。 Pulsar 似乎还不支持这个,但是 Kafka 支持。
您需要保证当您将数据写入分区时,它们是按照时间戳的顺序到达的。这意味着您不允许例如写入时间戳为 10 的第一条消息 1,然后写入时间戳为 5 的消息 2。
如果您可以确保为 Kafka 编写消息,那么您描述的示例将起作用。然后你可以说 "Start at timestamp 'last night at midnight'",Kafka 将从那里开始。当实时数据进来时,它会接收它并将其添加到日志的末尾。当 consumer/analytics 框架读取了从上个午夜到当前时间的所有数据后,它将开始等待新的(实时)数据到达,并在数据到来时对其进行处理。然后您可以在分析中编写自定义代码框架以确保它在到达带有时间戳 'tomorrow night' 的第一条消息时停止处理。
关于亚毫秒时间戳的支持,我认为 Kafka 或 Pulsar 不会开箱即用地支持它,但您可以相当轻松地解决它。只需将亚毫秒时间戳作为自定义字段放入消息中即可。当你想从例如开始timestamp 9ms 10ns,你要求 Kafka 从 9ms 开始,并在分析框架中使用过滤器来丢弃 9ms 和 9ms 10ns 之间的所有消息。
请允许我添加以下关于 Apache Pulsar 如何帮助满足您的一些要求的建议。值得深思。
"data is flowing/streamed into the platform/framework, attached an identifier like a URL or an ID or such"
您可能想看看 Pulsar Functions,它允许您编写在每个单独的消息上执行的简单函数(在 Java 或 Python 中)发布到主题。它们非常适合此类数据增强用例。
平台与集成或外部存储交互以持久保存与标识符相关联的流数据(多年)
Pulsar 最近添加了 tiered-storage,它允许您在 S3、Azure Blob Store 或 Google 云存储中保留事件流。这将使您能够将数据保存在便宜且可靠的数据存储中多年
分析过程现在可以透明地 query/analyse 由标识符和任意(打开或关闭)时间 window 寻址的数据,并且框架为分析提供数据 batches/samples 来自后端存储或来自数据采集的实时
Apache Pulsar 还添加了 integration Presto 查询引擎,这将允许您查询给定时间段内的数据(包括来自分层存储的数据)并将其放置进入主题进行处理。
我对流和批处理的世界还很陌生,还试图理解概念和演讲。诚然,我的问题的答案很有可能众所周知,很容易找到,甚至在 SO 上回答了一百次,但我找不到。
背景:
我正在从事一个大型科学项目(核聚变研究),我们在实验运行期间产生大量测量数据。这些数据主要是标有纳秒时间戳的样本流,其中样本可以是任何东西,从单个 ADC 值,通过这样的数组,通过深度结构化数据(从 1 位布尔值到 64 位双精度的多达数百个条目)浮动)到原始高清视频帧甚至字符串文本消息。如果我正确理解常用术语,我会将我们的数据视为 "tabular data",在大多数情况下。
我们主要使用自制的软件解决方案,从简单的在线(流式)分析(如缩放、子采样等)到我们自己的数据存储、管理和访问设施的数据采集。
鉴于运营规模和维护所有这些实施的工作量,我们正在研究使用标准框架和工具来完成更多任务的可能性。
我的问题:
特别是在这个阶段,我们面临着对 live/online/realtime 数据以及 "after the fact" offline/batch 数据分析越来越复杂(自动和手动)的需求"historic" 数据。在这方面,我试图了解现有的分析框架,如 Spark、Flink、Storm 等(可能由消息队列如 Kafka、Pulsar 等支持)是否以及如何支持一个场景,其中
- 数据是 flowing/streamed 到 platform/framework 中,附加了一个标识符,如 URL 或 ID 等
- 平台与集成或外部存储交互以持久保存与标识符相关联的流数据(多年)
- 分析过程现在可以透明地 query/analyse 由标识符和任意(打开或关闭)时间 window 寻址的数据,并且框架为分析提供数据 batches/samples 来自后端存储或来自数据采集的实时
简单地将在线数据流式传输到存储中并从那里查询似乎没有选择,因为我们需要原始数据和分析数据来进行实时监控和实验的实时反馈控制。 此外,让用户以不同的方式查询实时输入信号或历史批次也不是理想的选择,因为我们的物理学家大多不是数据科学家,我们希望 "technicalities" 远离他们,理想情况下是确切的应该使用相同的算法来分析新的实时数据和以前实验中的旧存储数据。
站点注释:
- 我们谈论的是每秒 10 千兆比特范围内的偷看数据负载,这些数据负载会以秒为单位增加到几分钟——这可以由候选人处理吗?
- 我们正在使用纳秒分辨率的时间戳,甚至考虑 pico - 如果我理解正确,这会对可能的候选列表造成一些限制?
如果有人能够理解我的问题并为我阐明这个主题,我将非常感激:-)
非常感谢和亲切的问候, 贝波
我认为没有人可以说 "yes, framework X can definitely handle your workload",因为这在很大程度上取决于您对消息处理的需求,例如关于消息传递的可靠性,以及如何对数据流进行分区。
您可能对 BenchmarkingDistributedStreamProcessingEngines 感兴趣。该论文使用的是几年前的 Storm/Flink/Spark 版本(看起来它们是在 2016 年发布的),但也许作者愿意让您使用他们的基准来评估这三个框架的更新版本?
流式分析的一个非常常见的设置是转到数据源 -> Kafka/Pulsar -> 分析框架 -> 长期数据存储。这将处理与数据摄取分离开来,让您可以像处理新数据一样重新处理历史数据。
我觉得你的第一步应该是看看你能不能通过Kafka/Pulsar得到你需要的数据量。要么手动生成一个测试集,要么从你的生产环境中抓取一些你认为可以代表的数据,看看你是否可以在你需要的 throughput/latency 上通过 Kafka/Pulsar。
记得考虑对数据进行分区。如果您的某些数据流可以独立处理(即顺序无关紧要),则不应将它们放在相同的分区中。例如,可能没有理由混合传感器测量和视频馈送流。如果您可以将数据分成独立的流,那么您就不太可能 运行 陷入 Kafka/Pulsar 和分析框架的瓶颈。单独的数据流还可以让您更好地并行化分析框架中的处理,您可以 运行 例如不同机器上的视频馈送和传感器处理。
一旦您知道是否可以通过 Kafka/Pulsar 获得足够的吞吐量,您应该为 3 个框架中的每一个编写一个小示例。首先,我会从 Kafka/Pulsar 接收和丢弃数据,这应该让您及早知道 Kafka/Pulsar -> 分析路径中是否存在瓶颈。之后,您可以扩展示例以使用示例数据做一些有趣的事情,例如做一些你可能想在生产中做的处理。
您还需要考虑数据流需要哪些类型的处理保证。通常,您会因为保证至少一次或恰好一次处理而付出性能损失。对于某些类型的数据(例如视频源),偶尔丢失消息可能没问题。一旦您决定了所需的保证,您就可以适当地配置分析框架(例如禁用 Storm 中的确认),并尝试对您的测试数据进行基准测试。
只是为了更明确地回答您的一些问题:
实时数据 analysis/monitoring 用例听起来非常适合 Storm/Flink 系统。将它直接连接到 Kafka/Pulsar,然后进行任何你需要的分析听起来对你有用。
历史数据的重新处理将取决于您需要执行何种查询。如果您只需要一个时间间隔 + id,您可以使用 Kafka 加上一个过滤器或适当的分区来做到这一点。 Kafka 允许您在特定时间戳开始处理,如果您的数据按 id 分区或将其作为分析的第一步进行过滤,您可以从提供的时间戳开始,并在您点击时间以外的消息时停止处理 window。这仅适用于您感兴趣的时间戳是消息添加到 Kafka 的时间。我也不相信 Kafka 在它生成的时间戳上支持低于毫秒的分辨率。
如果您需要执行更高级的查询(例如,您需要查看传感器生成的时间戳),您可以考虑使用 Cassandra or Elasticsearch or Solr 作为永久数据存储。您还需要研究如何将这些系统中的数据返回到您的分析系统中。例如,我相信 Spark 附带了一个用于从 Elasticsearch 读取的连接器,而 Elasticsearch 为 Storm 提供了一个连接器。您应该检查您的数据 store/analytics 系统组合是否存在这样的连接器,或者愿意编写您自己的连接器。
编辑:正在详细回答您的评论。
我不知道 Kafka 或 Pulsar 支持用户指定的时间戳,但果然,它们 both do。不过我没看到 Pulsar 支持亚毫秒时间戳?
你描述的idea绝对可以被Kafka支持。
您需要的是能够在特定时间戳启动 Kafka/Pulsar 客户端并向前阅读。 Pulsar 似乎还不支持这个,但是 Kafka 支持。
您需要保证当您将数据写入分区时,它们是按照时间戳的顺序到达的。这意味着您不允许例如写入时间戳为 10 的第一条消息 1,然后写入时间戳为 5 的消息 2。
如果您可以确保为 Kafka 编写消息,那么您描述的示例将起作用。然后你可以说 "Start at timestamp 'last night at midnight'",Kafka 将从那里开始。当实时数据进来时,它会接收它并将其添加到日志的末尾。当 consumer/analytics 框架读取了从上个午夜到当前时间的所有数据后,它将开始等待新的(实时)数据到达,并在数据到来时对其进行处理。然后您可以在分析中编写自定义代码框架以确保它在到达带有时间戳 'tomorrow night' 的第一条消息时停止处理。
关于亚毫秒时间戳的支持,我认为 Kafka 或 Pulsar 不会开箱即用地支持它,但您可以相当轻松地解决它。只需将亚毫秒时间戳作为自定义字段放入消息中即可。当你想从例如开始timestamp 9ms 10ns,你要求 Kafka 从 9ms 开始,并在分析框架中使用过滤器来丢弃 9ms 和 9ms 10ns 之间的所有消息。
请允许我添加以下关于 Apache Pulsar 如何帮助满足您的一些要求的建议。值得深思。
"data is flowing/streamed into the platform/framework, attached an identifier like a URL or an ID or such"
您可能想看看 Pulsar Functions,它允许您编写在每个单独的消息上执行的简单函数(在 Java 或 Python 中)发布到主题。它们非常适合此类数据增强用例。
平台与集成或外部存储交互以持久保存与标识符相关联的流数据(多年)
Pulsar 最近添加了 tiered-storage,它允许您在 S3、Azure Blob Store 或 Google 云存储中保留事件流。这将使您能够将数据保存在便宜且可靠的数据存储中多年
分析过程现在可以透明地 query/analyse 由标识符和任意(打开或关闭)时间 window 寻址的数据,并且框架为分析提供数据 batches/samples 来自后端存储或来自数据采集的实时
Apache Pulsar 还添加了 integration Presto 查询引擎,这将允许您查询给定时间段内的数据(包括来自分层存储的数据)并将其放置进入主题进行处理。