Python multiprocessing/threading 数据不一致问题排查

Troubleshooting data inconsistencies with Python multiprocessing/threading

TL;DR:在 运行 具有线程和多处理以及单线程的代码后得到不同的结果。需要有关故障排除的指导。

你好,如果这可能有点过于笼统,我提前道歉,但我需要一些帮助来解决问题,我不确定如何最好地进行。

这是故事;我有一堆索引到 Solr 集合中的数据(~250m 项),该集合中的所有项都有一个 sessionid。一些项目可以共享相同的会话 ID。我正在梳理整个集合以提取具有相同会话的所有项目,稍微修改数据并吐出另一个 JSON 文件以供稍后索引。

代码有两个主要功能: proc_day - 接受一天并处理当天的所有会话 和 proc_session - 完成单个会话需要发生的一切。

多处理在proc_day上实现,所以每天都会由一个单独的进程处理,proc_session函数可以运行线程。下面是我在下面用于 threading/multiprocessing 的代码。它接受一个函数、一个参数列表和线程/多进程数。然后它将根据输入参数创建一个队列,然后创建 processes/threads 并让它们通过它。我没有 post 实际代码,因为它通常可以在单线程下正常运行,没有任何问题,但如果需要,可以 post 它。

autoprocs.py

import sys
import logging
from multiprocessing import Process, Queue,JoinableQueue
import time
import multiprocessing
import os

def proc_proc(func,data,threads,delay=10):
    if threads < 0:
        return
    q = JoinableQueue()
    procs = []

    for i in range(threads):
        thread = Process(target=proc_exec,args=(func,q))
        thread.daemon = True;
        thread.start()
        procs.append(thread)

    for item in data:
        q.put(item)

    logging.debug(str(os.getpid()) + ' *** Processes started and data loaded into queue waiting')

    s = q.qsize()
    while s > 0:
        logging.info(str(os.getpid()) + " - Proc Queue Size is:" + str(s))
        s = q.qsize()
        time.sleep(delay)

    for p in procs:
        logging.debug(str(os.getpid()) + " - Joining Process {}".format(p))
        p.join(1)

    logging.debug(str(os.getpid()) + ' - *** Main Proc waiting')
    q.join()
    logging.debug(str(os.getpid()) + ' - *** Done')

def proc_exec(func,q):
    p = multiprocessing.current_process()
    logging.debug(str(os.getpid()) + ' - Starting:{},{}'.format(p.name, p.pid))
    while True:
        d = q.get()
        try:
            logging.debug(str(os.getpid()) + " - Starting to Process {}".format(d))
            func(d)
            sys.stdout.flush()
            logging.debug(str(os.getpid()) + " - Marking Task as Done")
            q.task_done()
        except:
            logging.error(str(os.getpid()) + " - Exception in subprocess execution")
            logging.error(sys.exc_info()[0])
    logging.debug(str(os.getpid()) + 'Ending:{},{}'.format(p.name, p.pid))

autothreads.py:

import threading
import logging
import time
from queue import Queue

def thread_proc(func,data,threads):
    if threads < 0:
        return "Thead Count not specified"
    q = Queue()

    for i in range(threads):
        thread = threading.Thread(target=thread_exec,args=(func,q))
        thread.daemon = True
        thread.start()

    for item in data:
        q.put(item)

    logging.debug('*** Main thread waiting')
    s = q.qsize()
    while s > 0:
        logging.debug("Queue Size is:" + str(s))
        s = q.qsize()
        time.sleep(1)
    logging.debug('*** Main thread waiting')
    q.join()
    logging.debug('*** Done')

def thread_exec(func,q):
    while True:
        d = q.get()
        #logging.debug("Working...")
        try:
            func(d)
        except:
            pass
        q.task_done()

在 python 在不同的 multiprocessing/threading 配置下运行后,我 运行 遇到验证数据的问题。有很多数据,所以我真的需要让多处理工作。这是我昨天的测试结果。

Only with multiprocessing - 10 procs: 
Days Processed  30
Sessions Found  3,507,475 
Sessions Processed 3,514,496 
Files 162,140 
Data Output: 1.9G

multiprocessing and multithreading - 10 procs 10 threads
Days Processed  30
Sessions Found   3,356,362 
Sessions Processed   3,272,402 
Files    424,005 
Data Output: 2.2GB

just threading - 10 threads
Days Processed  31
Sessions Found   3,595,263 
Sessions Processed   3,595,263 
Files    733,664 
Data Output: 3.3GB

Single process/ no threading
Days Processed  31
Sessions Found   3,595,263 
Sessions Processed   3,595,263 
Files    162,190 
Data Output: 1.9GB

这些计数是通过日志文件中的 grepping 和县条目收集的(每个主进程 1 个)。跳出的第一件事是处理的天数不匹配。但是,我手动检查了日志文件,看起来好像缺少一个日志条目,日志条目后面有表明实际处理了这一天。我不知道为什么它被省略了。

我真的不想写更多的代码来验证这段代码,这看起来太浪费时间了,有什么替代方法吗?

我在上面的评论中给出了一些一般性提示。我认为您的方法在非常不同的抽象级别上存在多个问题。您也没有显示所有相关代码。

问题很可能是

  1. 在您用于从 solr 读取数据或在将数据提供给您的工作人员之前准备读取数据的方法中。
  2. 在您提出的用于在多个进程之间分配工作的架构中。
  3. 在您的日志记录基础设施中(正如您自己指出的那样)。
  4. 在你的分析方法中。

必须检查所有这些要点,并且由于问题的复杂性,这里肯定没有人能够为您确定确切的问题。

关于第(3)点和第(4)点:

如果您不确定日志文件的完整性,您应该根据处理引擎的负载输出进行分析。我想说的是:日志文件可能只是数据处理的副产品。主要产品是您应该分析的东西。当然,正确设置日志也很重要。但是这两个问题应该分开对待。

我对上面列表中第 (2) 点的贡献:

您基于 multiprocessing 的解决方案特别值得怀疑的是您等待 工作人员完成的方式。你似乎不确定应该用哪种方法等待你的工人,所以你应用了三种不同的方法:

首先,你在while循环中监控队列的大小,等待它变为0。这是一种非规范的方法,可能确实有效。

其次,你join()你的流程很奇怪:

for p in procs:
    logging.debug(str(os.getpid()) + " - Joining Process {}".format(p))
    p.join(1)

为什么在这里定义一秒的超时,而不响应进程是否在该时间范围内实际终止?您应该真正加入一个进程,即等到它 终止 或者您指定一个超时,如果该超时在进程完成之前到期,请特别对待这种情况。你的代码没有区分这些情况,所以 p.join(1) 就像写 time.sleep(1) 一样。

第三次,你加入队列。

那么,在确定了q.qsize() returns 0之后,再等一秒之后,你真的认为加入队列重要吗?这有什么区别吗? 这些方法中的一种 应该就足够了,您需要考虑这些标准中的哪一个对您的问题最重要。也就是说,这些条件之一应该 确定性地暗示 其他两个。

所有这些看起来像是对多处理解决方案的快速而肮脏的破解,而您自己并不确定该解决方案的行为方式。我在研究并发架构时获得的最重要的见解之一:作为架构师,您必须 100% 了解通信和控制流在您的系统中的工作方式。未正确监视和控制工作进程的状态很可能是您所观察到的问题的根源。

我明白了,我听从了 Jan-Philip 的建议并开始检查 multiprocess/multithreaded 过程的输出数据。事实证明,一个用来自 Solr 的数据做所有这些事情的对象在线程之间共享。我没有任何锁定机制,所以在这种情况下它有来自多个会话的混合数据,导致输出不一致。我通过为每个线程实例化一个新对象并匹配计数来验证这一点。它有点慢,但仍然可行。

谢谢