Multiprocessing list search

So I am using processes and a queue to search through data and find the rows that have the same entry in a different column. I decided to use multiprocessing to try to make it scalable for large data. The file has 1000 rows with 10 data points per row. When I read in only 80 rows of data, the program stops working; at 70 rows it works fine and at a decent speed.

My question is: am I doing something wrong, or have I missed some limitation of this approach? The code is by no means perfect and may well be bad in its own right. Here it is:

from multiprocessing import Process, Queue
import random

def openFile(file_name, k, division):
    i = 0
    dataSet = []
    with open(file_name) as f:
        for line in f:
            stripLine = line.strip('\n')
            splitLine = stripLine.split(division)
            dataSet += [splitLine]
            i += 1
            if(i == k):
                break

    return(dataSet)

def setCombination(q,data1,data2):
    newData = []
    for i in range(0,len(data1)):
        for j in range(0, len(data2)):
            if(data1[i][1] == data2[j][3]):
                newData += data2[j]
    q.put(newData)

if __name__ == '__main__':
    # Takes in the file, the length of the data to read in, and how the data is divided.
    data = openFile('testing.txt', 80, ' ')
    for i in range(len(data)):
        for j in range(len(data[i])):
            try:
                data[i][j] = float(data[i][j])
            except ValueError:
                 pass

    #print(data)
    k = len(data)//10
    q = Queue()
    processes = [Process(target=setCombination, args=(q, data[k*x: k + k*x], data))
                                                                for x in range(10)]
    for p in processes:
        p.start()

    # Exit the completed processes
    for p in processes:
        p.join()

    saleSet = [q.get() for p in processes]
    print('\n', saleSet)

Data file: testing.txt

Your code appears to be deadlocking. While experimenting with it, I noticed that 3 of the 10 tasks never terminate, but honestly I don't really know the cause.

The good news is that it can easily be fixed by simply removing or disabling the

# Exit the completed processes
for p in processes:
    p.join()

loop in your code.
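For what it's worth, the hang matches the pitfall described in the multiprocessing documentation under "Joining processes that use queues": a worker that has put data on a Queue will not terminate until its buffered items have been flushed to the underlying pipe, so calling join() before the parent has retrieved the results can deadlock once the results grow large enough, which would also explain why 70 rows work but 80 do not. A minimal sketch of an ordering that keeps the join() calls, reusing the q and processes objects from your code, would be to drain the queue first and join afterwards:

for p in processes:
    p.start()

# Pull one result per worker BEFORE joining, so each worker's queue
# feeder thread can flush its buffered data and the process can exit.
saleSet = [q.get() for _ in processes]

for p in processes:
    p.join()  # safe now: nothing is left buffered in q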

Here's a complete version of your code with (mostly) just that modification made to it:

from multiprocessing import Process, Queue

def openFile(file_name, k, division):
    i = 0
    dataSet = []
    with open(file_name) as f:
        for line in f:
            stripLine = line.strip('\n')
            splitLine = stripLine.split(division)
            dataSet += [splitLine]
            i += 1
            if i == k:
                break

    return dataSet

def setCombination(q, data1, data2):
    newData = []
    for i in range(len(data1)):
        for j in range(len(data2)):
            if data1[i][1] == data2[j][3]:
                newData += data2[j]
    q.put(newData)

if __name__ == '__main__':
    # Takes in the file, the length of the data to read in, and how the data is divided.
    data = openFile('testing.txt', 80, ' ')

    for i in range(len(data)):
        for j in range(len(data[i])):
            try:
                data[i][j] = float(data[i][j])
            except ValueError:
                pass

    k = len(data) // 10
    q = Queue()
    processes = [Process(target=setCombination, args=(q, data[k*x: k*x+k], data))
                    for x in range(10)]
    for p in processes:
        p.start()

# NO LONGER USED (HANGS)
#    # Exit the completed processes
#    for p in processes:
#        p.join()

    # note: this works since by default, get() will block until it can retrieve something
    saleSet = [q.get() for _ in processes]  # a queue item should be added by each Process
    print('\n', saleSet)
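As a side note (not part of the original code), the same split-and-match work can also be handed to a multiprocessing.Pool, which collects the results and joins the workers for you. This is only a rough sketch, assuming the same openFile() helper and matching rule as above; the helper name combine and the chunking are just illustrative:

from multiprocessing import Pool

def combine(chunk, data):
    # Same matching rule as setCombination, but returning the result
    # instead of putting it on a shared queue.
    matched = []
    for row in chunk:
        for other in data:
            if row[1] == other[3]:
                matched += other
    return matched

if __name__ == '__main__':
    data = openFile('testing.txt', 80, ' ')
    for row in data:
        for j, value in enumerate(row):
            try:
                row[j] = float(value)
            except ValueError:
                pass

    k = len(data) // 10
    chunks = [data[k*x: k*x + k] for x in range(10)]
    with Pool(processes=10) as pool:
        # starmap unpacks each (chunk, data) tuple into combine's arguments
        saleSet = pool.starmap(combine, [(chunk, data) for chunk in chunks])
    print('\n', saleSet)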