Ensure atomicity in subroutine's code
I have the following code:
#!/bin/env python
#
from multiprocessing import Process
from multiprocessing import Queue
import time
import os

# Define an output queue
output = Queue()

# Define an example function
def f(x, output):
    time.sleep(.5)
    ppid = os.getppid()   # PPID
    pid = os.getpid()     # PID
    # very computing intensive operation
    result = 10*x
    print "(%s, %s, %s)" % (ppid, pid, result)
    time.sleep(.5)
    # store result as tuple
    result = (ppid, pid, result)
    output.put(result)
    # return result

def queue_size(queue):
    size = int(queue.qsize())
    print size

# Print parent pid
print "Parent pid: %s" % os.getpid()

# Setup a list of processes that we want to run
processes = [Process(target=f, args=(x, output)) for x in range(1,11)]

# Run processes
for p in processes:
    p.start()

# Process has no close attribute
# for p in processes:
#     p.close()

# Exit the completed processes
for p in processes:
    p.join()

# Get process results from the output queue
print "Order of result might be different from order of print"
print "See: "
print ""
results = [output.get() for p in processes]
print(results)
I want to replace the single print statement in f (the one that prints the tuple) with multiple statements like these:
print "ppid: %s" % ppid
print "pid: %s" % pid
print "result: %s" % result
print "#####################"
To do that, I chose a semaphore to make sure this output is atomic. Here is the modified version:
#!/bin/env python
#
from multiprocessing import Process
from multiprocessing import Queue
import threading
import time
import os

max_threads = 1
semaphore = threading.BoundedSemaphore(max_threads)

# Define an output queue
output = Queue()

# Define an example function
def f(x, output):
    time.sleep(.5)
    ppid = os.getppid()   # PPID
    pid = os.getpid()     # PID
    # very computing intensive operation
    result = 10*x
    # print "(%s, %s, %s)" % (ppid, pid, result)
    semaphore.acquire()
    print "ppid: %s" % ppid
    print "pid: %s" % pid
    print "result: %s" % result
    print "#####################"
    semaphore.release()
    time.sleep(.5)
    # store result as tuple
    result = (ppid, pid, result)
    output.put(result)
    # return result

def queue_size(queue):
    size = int(queue.qsize())
    print size

# Print parent pid
print "Parent pid: %s" % os.getpid()

# Setup a list of processes that we want to run
processes = [Process(target=f, args=(x, output)) for x in range(1,11)]

# Run processes
for p in processes:
    p.start()

# Process has no close attribute
# for p in processes:
#     p.close()

# Exit the completed processes
for p in processes:
    p.join()

# Get process results from the output queue
print "Order of result might be different from order of print"
print "See: "
print ""
results = [output.get() for p in processes]
print(results)
But the output does not appear to be atomic (see PID 10269 and PID 10270), and the semaphore does not help. Here is the output:
Parent pid: 10260
ppid: 10260
pid: 10264
result: 40
#####################
ppid: 10260
pid: 10263
result: 30
#####################
ppid: 10260
pid: 10265
result: 50
#####################
ppid: 10260
pid: 10262
result: 20
#####################
ppid: 10260
pid: 10267
result: 70
#####################
ppid: 10260
pid: 10268
result: 80
#####################
ppid: 10260
pid: 10261
result: 10
#####################
ppid: 10260
ppid: 10260
pid: 10269
pid: 10270
result: 90
result: 100
#####################
#####################
ppid: 10260
pid: 10266
result: 60
#####################
Order of result might be different from order of print
See:
[(10260, 10264, 40), (10260, 10263, 30), (10260, 10265, 50), (10260, 10267, 70), (10260, 10262, 20), (10260, 10268, 80), (10260, 10261, 10), (10260, 10270, 100), (10260, 10269, 90), (10260, 10266, 60)]
Why?
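You are running f in processes, but you are trying to synchronize them with a threading semaphore. You are mixing incompatible multitasking models here. Processes, as you use them in your program, run in separate memory spaces and have independent program counters, which means you cannot synchronize them as if they were running inside a single program. Threads run inside a single program and share memory.
What I mean is that each process in processes runs as an independent program. You could try multiprocessing.Lock, but I don't think it makes sense to lock independent programs just to print debug output.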
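To make the point about separate memory concrete, here is a minimal sketch (not part of the original code). It is written for Python 3 and assumes a Unix-like system where the "fork" start method is available; the names thread_lock, child and demo are made up for illustration. The child acquires its own copy of a threading lock, and the parent's copy stays untouched:

```python
import multiprocessing
import threading

# Module-level threading lock, analogous to the semaphore in the question.
thread_lock = threading.Lock()


def child(conn):
    # The child works on its own copy of the lock, living in its own memory.
    thread_lock.acquire()
    conn.send(thread_lock.locked())  # state of the lock as seen by the child
    conn.close()


def demo():
    # Assumption: a Unix-like OS where the "fork" start method exists.
    ctx = multiprocessing.get_context("fork")
    parent_conn, child_conn = ctx.Pipe()
    p = ctx.Process(target=child, args=(child_conn,))
    p.start()
    locked_in_child = parent_conn.recv()
    p.join()
    # The parent's copy was never acquired, so it is still unlocked.
    return locked_in_child, thread_lock.locked()


if __name__ == "__main__":
    print(demo())  # (True, False)
```

Acquiring the lock in one process has no effect on any other process, which is why the semaphore in the question cannot serialize the prints.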
Rather than locking, I suggest you change the print statement:
print("ppid: {}\n"
      "pid: {}\n"
      "result: {}\n"
      "#####################".format(ppid, pid, result))
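Note that you can write adjacent string literals and the Python interpreter concatenates them automatically. I also added \n to insert the line breaks, and switched to the print() function and format(), since % formatting is not recommended.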
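The same idea can be sketched as a small helper that builds the whole block as one string and hands it to the stream in a single write call. This is only an illustration for Python 3; format_report and emit are hypothetical names, not part of the original code, and a single write only makes interleaving less likely without guaranteeing atomicity:

```python
import sys


def format_report(ppid, pid, result):
    # Adjacent string literals are concatenated by the interpreter,
    # so this is one format string with four lines.
    return ("ppid: {}\n"
            "pid: {}\n"
            "result: {}\n"
            "#####################\n").format(ppid, pid, result)


def emit(text, stream=None):
    # Write the whole block with one call and flush right away.
    if stream is None:
        stream = sys.stdout
    stream.write(text)
    stream.flush()


if __name__ == "__main__":
    emit(format_report(10260, 10264, 40))
```

Inside f, the four print statements would then become one call such as emit(format_report(ppid, pid, result)).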
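With this approach, interleaved output is less likely, but it can still happen. If that is not good enough, use multiprocessing.Lock in place of the threading primitive (threading.BoundedSemaphore in your code). It offers the same acquire() and release() methods, so the rest of the code needs no further changes.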
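A minimal sketch of the lock-based variant follows, written for Python 3 and assuming a Unix-like system with the "fork" start method. The names worker and run are made up for illustration, and the lock is passed to each process explicitly, which is a choice of this sketch rather than something the original code does:

```python
import multiprocessing
import os
import sys


def worker(x, lock, output):
    result = 10 * x
    # Only one process at a time can hold the multiprocessing lock.
    with lock:
        print("ppid: {}".format(os.getppid()))
        print("pid: {}".format(os.getpid()))
        print("result: {}".format(result))
        print("#####################")
        # Flush while still holding the lock; otherwise buffered output
        # could be written later, outside the protected section.
        sys.stdout.flush()
    output.put((os.getpid(), result))


def run(n=3):
    # Assumption: a Unix-like OS where the "fork" start method exists.
    ctx = multiprocessing.get_context("fork")
    lock = ctx.Lock()
    output = ctx.Queue()
    procs = [ctx.Process(target=worker, args=(x, lock, output))
             for x in range(1, n + 1)]
    for p in procs:
        p.start()
    # Drain the queue before joining so no child blocks on a full pipe.
    results = [output.get() for _ in procs]
    for p in procs:
        p.join()
    return sorted(result for _, result in results)


if __name__ == "__main__":
    print(run())
```

The order of the blocks still depends on scheduling, but each four-line block should now come out contiguous.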