从 Python 中的 Kafka 更新消息流维护 table 的最佳数据结构

Best data structure to maintain a table from a stream of Kafka update messages in Python

假设我有一个固定维度 (N x M) 的表格数据集。我从 table 中的 Kafka 更新条目收到一连串更新。最终,我想要一个 pandas 数据框和最新版本的 table,我正在考虑这样做的几个选项:

  1. 将其作为 table / 数据帧保存在内存中。我在这里担心的是,我不知道是否可以避免多线程,因为一个进程将永远处于接收消息的 for 循环中。

  2. 在外部结构中维护它,并有一个单独的进程独立读取它。外部数据存储的选择: a) SQLite - 可能有并发问题,任意行的更新可能有点混乱。 b) Redis - 易于维护,但很难一次查询/读取整个 table(这是我通常访问数据的方式)。

我是 Kafka 的初学者,所以如果有任何建议,我们将不胜感激。你会如何处理这个问题?谢谢!

编辑:我想我也可以将它保存在内存中,然后将整个内容推送到 SQLite?

我最初的方法是问:我可以先创建一个“足够好”的解决方案,然后在需要时对其进行优化吗?

除非您需要担心非常敏感的信息(如医疗保健或财务数据),或者数据肯定会很快扩大规模,否则我建议您先尝试一个简单的解决方案,然后看看您是否遇到任何问题。你可能不会!

最终,我可能会从 SQLite 解决方案开始,因为它设置起来相对简单,而且非常适合用例(即“事务性”情况)。

以下是我会考虑的一些注意事项:

Pros/cons 单个进程

除非您的数据是 high-velocity / high-volume,否则您在同一进程中使用和处理数据的建议可能没问题。在本地处理数据比通过网络接收数据快得多(假设您的 Kafka 提要不在您的本地计算机上),因此从 Kafka 摄取的数据可能是瓶颈。

但是,让 Python 进程无限期地旋转可能代价高昂,并且您需要确保将数据存储到文件或数据库中,以防止在您的数据丢失时丢失进程关闭。

关系数据库(例如 SQLite)

使用像 SQLite 这样的关系数据库可能是您最好的选择,同样取决于您接收数据的速度。但是关系数据库一直用于事务目的(实际上这是它们的主要预期目的之一),这意味着写入量大且速度快——因此将数据持久保存在 SQLite 中并在那里进行更新绝对有意义.如果有意义(例如第三范式),您可以考虑将数据分成单独的 table,或者如果这样更合适,您可以将它们全部放在一个 table 中。

保持记忆中的table

您也可以像您建议的那样将 table 保留在内存中,只要您在更新后以某种方式(CSV、SQLite 等)将其保存到磁盘即可。例如,您可以:

  1. 记住你的副本。
  2. 当您获得更新时,请更新您的 in-memory table。
  3. 将 table 写入磁盘。
  4. 如果您的进程停止或重新启动,请从内存中读取 table 以启动。
但是,

Pandas 访问和更新行中的单个值可能会更慢,因此将 table 作为字典或其他内容保存在内存中并将其写入实际上可能更有意义不使用 pandas 的磁盘。但是,如果您可以在 pandas 中完成所有操作(关于速度和音量),那也可能是一个很好的开始方式。