Cassandra多进程异步执行阻塞同步请求

Cassandra assynchronous execution in multiple processes blocking synchronous requests

我有一个应用程序可以读取一系列 XML 文件,其中包含道路上的车辆通行记录。然后应用程序处理每条记录,转换一些信息以匹配数据库列并将其插入到 cassandra 数据库中(运行在远程服务器中的单个节点[它在内部网络中,因此连接不真的是个问题])。在数据库中插入数据后,每个文件的进程然后继续读取此数据并为汇总表生成信息,从而为在应用程序的不相关部分进行的向下钻取分析准备好信息。

我正在使用 multiprocessing 并行处理许多 XML 文件,我遇到的问题是与 cassandra 服务器通信。示意性地,该过程如下:

  1. 从 XML 文件读取记录
  2. 处理记录的数据
  3. 将处理后的数据插入数据库(使用.execute_async(query)
  4. 重复 1 到 3 直到 XMl 文件结束
  5. 等待我所做的所有插入查询的响应
  6. 从数据库读取数据
  7. 处理读取的数据
  8. 将处理后的数据插入汇总表

现在,这 运行 在多个并行进程中顺利进行,直到当一个进程继续执行第 6 步时,它的请求(使用 .execute(query) 发出,这意味着我将等待响应)总是面临超时。我收到的错误是:

 Process ProcessoImportacaoPNCT-1:
Traceback (most recent call last):
  File "C:\Users\Lucas\Miniconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\ImportacaoArquivosPNCT.py", line 231, in run
    core.CalculoIndicadoresPNCT.processa_equipamento(sessao_cassandra, equipamento, data, sentido, faixa)
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 336, in processa_equipamento
    desvio_medias(sessao_cassandra, equipamento, data_referencia, sentido, faixa)
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 206, in desvio_medias
    veiculos = sessao_cassandra.execute(sql_pronto)
  File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 1594, in execute
    result = future.result(timeout)
  File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 3296, in result
    raise self._final_exception
ReadTimeout: code=1200 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 1, 'consistency': 'ONE'}

我已将服务器中的超时时间更改为荒谬的时间(例如 500000000 毫秒),并且我还尝试在客户端中设置超时限制,.execute(query, timeout=3000) 但仍然没有成功。

现在,当更多的进程遇到同样的问题,并且在多个进程中从步骤 1-3 开始的密集写入停止时,最后到达步骤 6 的进程成功地遵循了该过程,这让我认为问题是cassandra 优先考虑我每秒询问的数万个插入请求,要么忽略我的读取请求,要么将其放回队列中。

在我看来,解决这个问题的一种方法是,如果我可以以任何方式要求 cassandra 优先处理我的读取请求,以便我可以继续处理,即使这意味着减慢其他进程。

现在,作为旁注,您可能认为我的过程建模不是最佳的,我很想听听对此的意见,但对于这个应用程序的现实,在我们看来,这是最好的方法继续。所以我们实际上已经广泛考虑优化流程,但是(如果 cassandra 服务器可以处理它)这对我们的现实来说是最佳的。

那么,TL;DR:在执行数万个异步查询时,有没有办法优先处理一个查询?如果没有,有没有办法以请求不会超时的方式每秒执行数万次插入查询和读取查询?另外,你会建议我做什么来解决这个问题? 运行 减少并行进程显然是一种解决方案,但我正在努力避免。所以,很想听听大家的想法。

插入时存储数据,这样我就不需要再次阅读它来进行总结是不可能的,因为 XML 文件很大,内存是个问题。

我不知道有什么方法可以优先读取查询。我相信在内部 Cassandra 有单独的线程池用于读取和写入操作,所以这些是 运行 并行的。如果没有看到您正在执行的模式和查询,很难说您是否正在执行非常昂贵的读取操作,或者系统是否被写入淹没以至于无法跟上读取。

当您的应用程序 运行ning 时,您可能想尝试监视 Cassandra 中发生的事情。您可以使用多种工具来监控正在发生的事情。例如,如果您通过 ssh 连接到您的 Cassandra 节点并且 运行:

watch -n 1 nodetool tpstats

这将向您显示线程池统计信息(每秒更新一次)。您将能够查看队列是否已满或操作是否受阻。如果 "Dropped" 计数器中的任何一个增加,则表示您没有足够的容量来完成您尝试做的事情。如果是这种情况,则通过添加更多节点来增加容量,或者更改您的模式和方法,以便节点有更少的工作要做。

其他需要监控的有用的东西(在 linux 上使用 watch -n 1 持续监控):

nodetool compactionstats
nodetool netstats
nodetool cfstats <keyspace.table name>
nodetool cfhistograms <keyspace> <table name>

使用 top 和 iostat 等 linux 命令监视节点也很好,以检查 CPU 利用率和磁盘利用率。

我对你所说的印象是你的单个节点没有足够的能力来完成你给它的所有工作,所以你要么需要每单位时间处理更少的数据,要么添加更多的 Cassandra节点来分散工作量。

由于分区的行过多,我目前面临自己的超时错误,因此我可能必须向我的分区键添加基数以使每个分区的内容更小。