Producer-consumer problem - trying to save into a csv file
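This seemingly simple problem is giving me a headache.
I have a dataset (datas) that I process to produce multiple rows to store in a CSV file. The processing itself isn't the problem, although it takes time because of the size of the dataset. However, producing a row, saving it to the CSV, producing the next row, saving it, and so on is very slow.
So I'm trying to implement producer and consumer threads: the producers generate each row of data (to speed up the process) and put it on a queue, and a single consumer appends the rows to my CSV file.
My attempt below sometimes works (the data is saved correctly), and other times the data gets "cut off" (either a whole row or part of one).
What am I doing wrong?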
from threading import Thread
from queue import Queue
import csv

q = Queue()

def producer():
    datas = [["hello","world"],["test","hey"],["my","away"],["your","gone"],["bye","hat"]]
    for data in datas:
        q.put(data)

def consumer():
    while True:
        local = q.get()
        file = open('dataset.csv','a')
        with file as fd:
            writer = csv.writer(fd)
            writer.writerow(local)
        file.close()
        q.task_done()

for i in range(10):
    t = Thread(target=consumer)
    t.daemon = True
    t.start()

producer()
q.join()
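I think this is similar to what you're trying to do. For testing purposes, it prefixes each row of data in the resulting CSV file with a "producer id", so the source of each row can be seen in the results. Note that it uses several producer threads but only one consumer thread, which opens the file once and is the only writer; a sentinel value tells it when there is no more data.
As you'll be able to see from the resulting CSV file, all of the data produced ends up in it.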
import csv
import random
from queue import Queue
from threading import Thread
import time

SENTINEL = object()  # Unique marker meaning "no more data".

def producer(q, id):
    data = (("hello", "world"), ("test", "hey"), ("my", "away"), ("your", "gone"),
            ("bye", "hat"))
    for datum in data:
        q.put((id,) + datum)  # Prefix producer ID to datum for testing.
        time.sleep(random.random())  # Vary thread speed for testing.

class Consumer(Thread):
    def __init__(self, q):
        super().__init__()
        self.q = q

    def run(self):
        # Open the file once; this thread is the only writer.
        with open('dataset.csv', 'w', newline='') as file:
            writer = csv.writer(file, delimiter=',')
            while True:
                datum = self.q.get()
                if datum is SENTINEL:
                    break
                writer.writerow(datum)

def main():
    NUM_PRODUCERS = 10
    queue = Queue()

    # Create producer threads.
    threads = []
    for id in range(NUM_PRODUCERS):
        t = Thread(target=producer, args=(queue, id+1,))
        t.start()
        threads.append(t)

    # Create Consumer thread.
    consumer = Consumer(queue)
    consumer.start()

    # Wait for all producer threads to finish (join blocks instead of polling).
    for thread in threads:
        thread.join()

    queue.put(SENTINEL)  # Indicate to consumer thread no more data.
    consumer.join()
    print('Done')

if __name__ == '__main__':
    main()
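To bring this closer to the situation in the question, where one dataset is processed and the results are written out, each producer could be handed its own share of the rows. The following is only a minimal sketch under some assumptions: `process` is a hypothetical stand-in for the real per-row work, and `run_pipeline` and `num_producers` are names made up for illustration, not anything from the original post.

```python
import csv
from queue import Queue
from threading import Thread

SENTINEL = object()  # Unique marker telling the consumer to stop.


def process(row):
    """Hypothetical stand-in for the real (slow) per-row processing."""
    return [field.upper() for field in row]


def producer(q, rows):
    """Process this thread's share of the dataset and queue each result."""
    for row in rows:
        q.put(process(row))


def consumer(q, path):
    """Sole writer: open the file once and write rows until SENTINEL arrives."""
    with open(path, "w", newline="") as fd:
        writer = csv.writer(fd)
        while True:
            item = q.get()
            if item is SENTINEL:
                break
            writer.writerow(item)


def run_pipeline(datas, path, num_producers=4):
    """Split datas across producers, write the results to path, then return."""
    q = Queue()
    writer_thread = Thread(target=consumer, args=(q, path))
    writer_thread.start()

    # Striding gives each producer a disjoint share: rows i, i+n, i+2n, ...
    producers = [
        Thread(target=producer, args=(q, datas[i::num_producers]))
        for i in range(num_producers)
    ]
    for t in producers:
        t.start()
    for t in producers:
        t.join()  # Block until every producer has finished.

    q.put(SENTINEL)       # Queued after all data, so no row is dropped.
    writer_thread.join()  # The file is flushed and closed once this returns.


if __name__ == "__main__":
    datas = [["hello", "world"], ["test", "hey"], ["my", "away"],
             ["your", "gone"], ["bye", "hat"]]
    run_pipeline(datas, "dataset.csv")
```

The order of rows in the file depends on thread scheduling, so it will generally not match the order of `datas`. Also keep in mind that in standard CPython, threads mainly help when the per-row work is I/O-bound or releases the GIL; for CPU-bound processing, the `multiprocessing` module is the usual alternative.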