ETL 设计:我应该使用什么队列来代替我的 SQL table 并且仍然能够并行处理?
ETL design: What Queue should I use instead of my SQL table and still be able to process in parallel?
需要您的帮助来重新设计我的系统。我们有非常简单的 ETL,但也很老,现在当我们处理大量数据时,它变得非常慢且不灵活
第一个进程是收集器进程:
收集器进程 - 始终运行
- collector 从队列中收集消息 (rabbitMQ)
- 将消息属性(JSON 格式)解析为 java 对象(例如,如果 JSON 包含 'id' 和 'name' 和 'color' 我们将使用 int 字段 'id' 和字符串字段 'name' 以及字符串字段 'color')
创建 java 对象
- 解析后我们将对象作为 CSV 行写入 CSV 文件,其中包含对象中的所有属性
- 我们发送确认并继续处理队列中的下一条消息
处理工作流 - 每小时发生一次
- 名为 'Loader' 的进程将所有 CSV 文件(收集器输出)加载到名为 'Input' 的 DB table ] 使用 SQL INFILE LOAD 所有新行都具有 'Not handled' 状态。输入 table 就像这个设计中的队列
- 一个名为'Processor'的进程从table中读取所有'Not handled'状态的记录,将其转换为java对象,进行一些丰富,然后将记录插入到 另一个 table 名为 'Output' 的新字段中,**每次迭代我们并行处理 1000 行 - 并使用JDBC 数据库插入的批量更新**。
这个流程的主要问题:
消息在现有流程中不灵活 - 例如,如果我想将新的 属性 添加到JSON 消息(例如还要添加 'city' )我还必须将 'city' 列添加到 table (因为加载了 CSV 文件), table 包含大量数据,不可能每次消息更改时都添加列。
我的结论
table 不是此设计的正确选择。
我必须摆脱 CSV 写作并删除 'Input' table 才能拥有一个灵活的系统,我想也许可以使用队列而不是 table 像 KAFKA 一样,可能会使用 KAFKA 流等工具进行丰富。 - 这将使我更加灵活,每次我想向消息添加一个字段时,我都不需要向 table 添加一列
我将无法像今天处理的那样并行处理这个巨大的问题。
我可以使用什么来代替 table 来并行处理数据?
是的,使用 Kafka 会改善这一点。
摄取
您当前写入 CSV 文件的进程可以改为发布到 Kafka 主题。这可能是 RabbitMQ 的替代品,具体取决于您的要求和范围。
加载程序(可选)
您以 初始格式 加载数据并写入数据库 table 的其他进程可以改为以您想要的格式发布到另一个 Kafka 主题。如果能直接写出处理器想要的格式,这一步可以省略
处理器
您使用 'Not handled' status
的方式是一种将数据视为队列的方式,但这是通过使用 log 的 Kafka 设计来处理的(是关系数据库被建模为集合).
处理器订阅加载器或摄取写入的消息。它 将其转换为 java 对象,进行一些丰富 - 但不是将结果插入到新的 table,它可以将数据发布到新的输出 -主题。
不是在批次中工作:"each iteration we process 1000 rows in parallel - and using JDBC batchupdate for the DB insert"使用Kafka和流处理,这是在连续的实时流中完成的——当数据到达时。
模式可演化性
if i want for example to add new property to the json message (for example to add also 'city' ) i have to add also column 'city' to the table (because of the csv infile Load) , the table contains massive amount of data and its not possible to add column every time the message changes .
您可以在发布到 Kafka 主题时使用 Avro Schema 来解决这个问题。
需要您的帮助来重新设计我的系统。我们有非常简单的 ETL,但也很老,现在当我们处理大量数据时,它变得非常慢且不灵活
第一个进程是收集器进程:
收集器进程 - 始终运行
- collector 从队列中收集消息 (rabbitMQ)
- 将消息属性(JSON 格式)解析为 java 对象(例如,如果 JSON 包含 'id' 和 'name' 和 'color' 我们将使用 int 字段 'id' 和字符串字段 'name' 以及字符串字段 'color') 创建 java 对象
- 解析后我们将对象作为 CSV 行写入 CSV 文件,其中包含对象中的所有属性
- 我们发送确认并继续处理队列中的下一条消息
处理工作流 - 每小时发生一次
- 名为 'Loader' 的进程将所有 CSV 文件(收集器输出)加载到名为 'Input' 的 DB table ] 使用 SQL INFILE LOAD 所有新行都具有 'Not handled' 状态。输入 table 就像这个设计中的队列
- 一个名为'Processor'的进程从table中读取所有'Not handled'状态的记录,将其转换为java对象,进行一些丰富,然后将记录插入到 另一个 table 名为 'Output' 的新字段中,**每次迭代我们并行处理 1000 行 - 并使用JDBC 数据库插入的批量更新**。
这个流程的主要问题:
消息在现有流程中不灵活 - 例如,如果我想将新的 属性 添加到JSON 消息(例如还要添加 'city' )我还必须将 'city' 列添加到 table (因为加载了 CSV 文件), table 包含大量数据,不可能每次消息更改时都添加列。
我的结论
table 不是此设计的正确选择。
我必须摆脱 CSV 写作并删除 'Input' table 才能拥有一个灵活的系统,我想也许可以使用队列而不是 table 像 KAFKA 一样,可能会使用 KAFKA 流等工具进行丰富。 - 这将使我更加灵活,每次我想向消息添加一个字段时,我都不需要向 table 添加一列 我将无法像今天处理的那样并行处理这个巨大的问题。
我可以使用什么来代替 table 来并行处理数据?
是的,使用 Kafka 会改善这一点。
摄取
您当前写入 CSV 文件的进程可以改为发布到 Kafka 主题。这可能是 RabbitMQ 的替代品,具体取决于您的要求和范围。
加载程序(可选)
您以 初始格式 加载数据并写入数据库 table 的其他进程可以改为以您想要的格式发布到另一个 Kafka 主题。如果能直接写出处理器想要的格式,这一步可以省略
处理器
您使用 'Not handled' status
的方式是一种将数据视为队列的方式,但这是通过使用 log 的 Kafka 设计来处理的(是关系数据库被建模为集合).
处理器订阅加载器或摄取写入的消息。它 将其转换为 java 对象,进行一些丰富 - 但不是将结果插入到新的 table,它可以将数据发布到新的输出 -主题。
不是在批次中工作:"each iteration we process 1000 rows in parallel - and using JDBC batchupdate for the DB insert"使用Kafka和流处理,这是在连续的实时流中完成的——当数据到达时。
模式可演化性
if i want for example to add new property to the json message (for example to add also 'city' ) i have to add also column 'city' to the table (because of the csv infile Load) , the table contains massive amount of data and its not possible to add column every time the message changes .
您可以在发布到 Kafka 主题时使用 Avro Schema 来解决这个问题。