AttributeError: Can't pickle local object 'computation.. function1 using multiprocessing queue
AttributeError: Can't pickle local object 'computation.. function1 using multiprocessing queue
我有以下使用调度程序和多处理模块的代码:
def computation():
def function1(q):
while True:
daydate = datetime.now()
number = random.randrange(1, 215)
print('Sent to function2: ({}, {})'.format(daydate, number))
q.put((daydate, number))
time.sleep(2)
def function2(q):
while True:
date, number = q.get()
print("Recevied values from function1: ({}, {})".format(date, number))
time.sleep(2)
if __name__ == "__main__":
q = Queue()
a = Process(target=function1, args=(q,))
a.start()
b = Process(target=function2, args=(q,))
b.start()
a.join()
b.join()
schedule.every().monday.at("08:45").do(computation)
schedule.every().tuesday.at("08:45").do(computation)
while True:
schedule.run_pending()
time.sleep(1)
但是在执行代码时出现以下错误:
AttributeError: Can't pickle local object 'computation..
function1
并且:
OSError: [WinError 87] The parameter is incorrect
如何解决这个问题?我试图通过在文档 (https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled) 中所述的模块的顶层定义一个函数来解决这个问题,但是它仍然给出相同的错误。
嵌套函数不是在顶层定义的函数,所以这就是您收到错误的原因。您需要将 function1
和 function2
的定义移到外面
的计算。
按照您的写法,您的流程会立即开始,而不是在您安排 运行 的日期开始。这可能符合您的预期:
import os
import time
import random
from multiprocessing import Process, Queue
from threading import Thread
from datetime import datetime
import schedule
def function1(q):
while True:
daydate = datetime.now()
number = random.randrange(1, 215)
fmt = '(pid: {}) Sent to function2: ({}, {})'
print(fmt.format(os.getpid(), daydate, number))
q.put((daydate, number))
time.sleep(2)
def function2(q):
while True:
date, number = q.get()
fmt = "(pid: {}) Received values from function1: ({}, {})"
print(fmt.format(os.getpid(), date, number))
# time.sleep(2) no need to sleep here because q.get will block until
# new items are available
def computation():
q = Queue()
a = Process(target=function1, args=(q,))
a.start()
b = Process(target=function2, args=(q,))
b.start()
a.join()
b.join()
if __name__ == "__main__":
# We are spawning new threads as a launching platform for
# computation. Without it, the next job couldn't start before the last
# one has finished. If your jobs always end before the next one should
# start, you don't need this construct and you can just pass
# ...do(computation)
schedule.every().friday.at("01:02").do(
Thread(target=computation).start
)
schedule.every().friday.at("01:03").do(
Thread(target=computation).start
)
while True:
schedule.run_pending()
time.sleep(1)
就像现在一样,您的进程将在启动一次后永远 运行。如果这不是您想要的,您必须考虑实施一些停止条件。
我有以下使用调度程序和多处理模块的代码:
def computation():
def function1(q):
while True:
daydate = datetime.now()
number = random.randrange(1, 215)
print('Sent to function2: ({}, {})'.format(daydate, number))
q.put((daydate, number))
time.sleep(2)
def function2(q):
while True:
date, number = q.get()
print("Recevied values from function1: ({}, {})".format(date, number))
time.sleep(2)
if __name__ == "__main__":
q = Queue()
a = Process(target=function1, args=(q,))
a.start()
b = Process(target=function2, args=(q,))
b.start()
a.join()
b.join()
schedule.every().monday.at("08:45").do(computation)
schedule.every().tuesday.at("08:45").do(computation)
while True:
schedule.run_pending()
time.sleep(1)
但是在执行代码时出现以下错误:
AttributeError: Can't pickle local object 'computation.. function1
并且:
OSError: [WinError 87] The parameter is incorrect
如何解决这个问题?我试图通过在文档 (https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled) 中所述的模块的顶层定义一个函数来解决这个问题,但是它仍然给出相同的错误。
嵌套函数不是在顶层定义的函数,所以这就是您收到错误的原因。您需要将 function1
和 function2
的定义移到外面
的计算。
按照您的写法,您的流程会立即开始,而不是在您安排 运行 的日期开始。这可能符合您的预期:
import os
import time
import random
from multiprocessing import Process, Queue
from threading import Thread
from datetime import datetime
import schedule
def function1(q):
while True:
daydate = datetime.now()
number = random.randrange(1, 215)
fmt = '(pid: {}) Sent to function2: ({}, {})'
print(fmt.format(os.getpid(), daydate, number))
q.put((daydate, number))
time.sleep(2)
def function2(q):
while True:
date, number = q.get()
fmt = "(pid: {}) Received values from function1: ({}, {})"
print(fmt.format(os.getpid(), date, number))
# time.sleep(2) no need to sleep here because q.get will block until
# new items are available
def computation():
q = Queue()
a = Process(target=function1, args=(q,))
a.start()
b = Process(target=function2, args=(q,))
b.start()
a.join()
b.join()
if __name__ == "__main__":
# We are spawning new threads as a launching platform for
# computation. Without it, the next job couldn't start before the last
# one has finished. If your jobs always end before the next one should
# start, you don't need this construct and you can just pass
# ...do(computation)
schedule.every().friday.at("01:02").do(
Thread(target=computation).start
)
schedule.every().friday.at("01:03").do(
Thread(target=computation).start
)
while True:
schedule.run_pending()
time.sleep(1)
就像现在一样,您的进程将在启动一次后永远 运行。如果这不是您想要的,您必须考虑实施一些停止条件。