Python - 在单独线程中的函数 运行 之间传递函数(回调)变量
Python - Pass a function (callback) variable between functions running in separate threads
我正在尝试开发一个使用 pika 和线程模块的 Python 3.6 脚本。
我有一个问题,我认为是由于我的 A) 对 Python 和一般编码很陌生,B) 我不理解如何在函数之间传递变量 [=53] =] 在单独的线程中,并且已经在接收函数名称末尾的括号中传递了一个参数。
我之所以这么想,是因为当我不使用线程时,我可以简单地通过调用接收函数名称在函数之间传递变量,并提供要传递的变量,在括号中,一个基本的例子是如下所示:
def send_variable():
body = "this is a text string"
receive_variable(body)
def receive_variable(body):
print(body)
这在 运行 时打印:
this is a text string
我需要使用线程的代码的工作版本如下所示 - 它使用直接函数(无线程),我正在使用 pika 通过 pika 回调函数从(RabbitMQ)队列接收消息,然后我将 'callback' 函数中收到的消息正文传递给 'processing function' :
import pika
...mq connection variables set here...
# defines username and password credentials as variables set at the top of this script
credentials = pika.PlainCredentials(mq_user_name, mq_pass_word)
# defines mq server host, port and user credentials and creates a connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, port=mq_port, credentials=credentials))
# creates a channel connection instance using the above settings
channel = connection.channel()
# defines the queue name to be used with the above channel connection instance
channel.queue_declare(queue=mq_queue)
def callback(ch, method, properties, body):
# passes (body) to processing function
body_processing(body)
# sets channel consume type, also sets queue name/message acknowledge settings based on variables set at top of script
channel.basic_consume(callback, queue=mq_queue, no_ack=mq_no_ack)
# tells the callback function to start consuming
channel.start_consuming()
# calls the callback function to start receiving messages from mq server
callback()
# above deals with pika connection and the main callback function
def body_processing(body):
...code to send a pika message every time a 'body' message is received...
这很好用,但我想在使用线程的脚本中将其转换为 运行。当我这样做时,我必须将参数 'channel' 提供给 运行 在它自己的线程中的函数名称 - 然后我尝试包含 'body' 参数以便 'processing_function' 如下所示:
def processing_function(channel, body):
我收到一条错误消息:
[function_name] is missing 1 positional argument: 'body'
我知道在使用线程时需要更多代码,我在下面包含了我用于线程的实际代码,以便您可以看到我在做什么:
...imports and mq variables and pika connection details are set here...
def get_heartbeats(channel):
channel.queue_declare(queue=queue1)
#print (' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
process_body(body)
#print (" Received %s" % (body))
channel.basic_consume(callback, queue=queue1, no_ack=no_ack)
channel.start_consuming()
def process_body(channel, body):
channel.queue_declare(queue=queue2)
#print (' [*] Waiting for Tick messages. To exit press CTRL+C')
# sets the mq host which pika client will use to send a message to
connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host))
# create a channel connection instance
channel = connection.channel()
# declare a queue to be used by the channel connection instance
channel.queue_declare(queue=order_send_queue)
# send a message via the above channel connection settings
channel.basic_publish(exchange='', routing_key=send_queue, body='Test Message')
# send a message via the above channel settings
# close the channel connection instance
connection.close()
def manager():
# Channel 1 Connection Details - =======================================================================================
credentials = pika.PlainCredentials(mq_user_name, mq_password)
connection1 = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, credentials=credentials))
channel1 = connection1.channel()
# Channel 1 thread =====================================================================================================
t1 = threading.Thread(target=get_heartbeats, args=(channel1,))
t1.daemon = True
threads.append(t1)
# as this is thread 1 call to start threading is made at start threading section
# Channel 2 Connection Details - =======================================================================================
credentials = pika.PlainCredentials(mq_user_name, mq_password)
connection2 = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, credentials=credentials))
channel2 = connection2.channel()
# Channel 2 thread ====================================================================================================
t2 = threading.Thread(target=process_body, args=(channel2, body))
t2.daemon = True
threads.append(t2)
t2.start() # as this is thread 2 - we need to start the thread here
# Start threading
t1.start() # start the first thread - other threads will self start as they call t1.start() in their code block
for t in threads: # for all the threads defined
t.join() # join defined threads
manager() # run the manager module which starts threads that call each module
这时 运行 产生错误
process_body() missing 1 required positional argument: (body)
而且我不明白这是为什么或如何解决它。
感谢您花时间阅读这个问题,非常感谢您提供的任何帮助或建议。
请记住,我是 python 和编码的新手,所以可能需要把事情拼写出来,而不是能够理解更多神秘的回复。
谢谢!
在进一步查看并使用代码后,如果我编辑这些行,似乎是:
def process_body(channel, body):
阅读
def process_body(body):
和
t2 = threading.Thread(target=process_body, args=(channel2, body))
所以它显示为:
t2 = threading.Thread(target=process_body)
然后代码似乎可以按需工作 - 我还在 htop 中看到多个脚本进程,因此看起来线程正在工作 - 我已经离开脚本处理 24 小时 + 并且没有收到任何错误...
我正在尝试开发一个使用 pika 和线程模块的 Python 3.6 脚本。
我有一个问题,我认为是由于我的 A) 对 Python 和一般编码很陌生,B) 我不理解如何在函数之间传递变量 [=53] =] 在单独的线程中,并且已经在接收函数名称末尾的括号中传递了一个参数。
我之所以这么想,是因为当我不使用线程时,我可以简单地通过调用接收函数名称在函数之间传递变量,并提供要传递的变量,在括号中,一个基本的例子是如下所示:
def send_variable():
body = "this is a text string"
receive_variable(body)
def receive_variable(body):
print(body)
这在 运行 时打印:
this is a text string
我需要使用线程的代码的工作版本如下所示 - 它使用直接函数(无线程),我正在使用 pika 通过 pika 回调函数从(RabbitMQ)队列接收消息,然后我将 'callback' 函数中收到的消息正文传递给 'processing function' :
import pika
...mq connection variables set here...
# defines username and password credentials as variables set at the top of this script
credentials = pika.PlainCredentials(mq_user_name, mq_pass_word)
# defines mq server host, port and user credentials and creates a connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, port=mq_port, credentials=credentials))
# creates a channel connection instance using the above settings
channel = connection.channel()
# defines the queue name to be used with the above channel connection instance
channel.queue_declare(queue=mq_queue)
def callback(ch, method, properties, body):
# passes (body) to processing function
body_processing(body)
# sets channel consume type, also sets queue name/message acknowledge settings based on variables set at top of script
channel.basic_consume(callback, queue=mq_queue, no_ack=mq_no_ack)
# tells the callback function to start consuming
channel.start_consuming()
# calls the callback function to start receiving messages from mq server
callback()
# above deals with pika connection and the main callback function
def body_processing(body):
...code to send a pika message every time a 'body' message is received...
这很好用,但我想在使用线程的脚本中将其转换为 运行。当我这样做时,我必须将参数 'channel' 提供给 运行 在它自己的线程中的函数名称 - 然后我尝试包含 'body' 参数以便 'processing_function' 如下所示:
def processing_function(channel, body):
我收到一条错误消息:
[function_name] is missing 1 positional argument: 'body'
我知道在使用线程时需要更多代码,我在下面包含了我用于线程的实际代码,以便您可以看到我在做什么:
...imports and mq variables and pika connection details are set here...
def get_heartbeats(channel):
channel.queue_declare(queue=queue1)
#print (' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
process_body(body)
#print (" Received %s" % (body))
channel.basic_consume(callback, queue=queue1, no_ack=no_ack)
channel.start_consuming()
def process_body(channel, body):
channel.queue_declare(queue=queue2)
#print (' [*] Waiting for Tick messages. To exit press CTRL+C')
# sets the mq host which pika client will use to send a message to
connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host))
# create a channel connection instance
channel = connection.channel()
# declare a queue to be used by the channel connection instance
channel.queue_declare(queue=order_send_queue)
# send a message via the above channel connection settings
channel.basic_publish(exchange='', routing_key=send_queue, body='Test Message')
# send a message via the above channel settings
# close the channel connection instance
connection.close()
def manager():
# Channel 1 Connection Details - =======================================================================================
credentials = pika.PlainCredentials(mq_user_name, mq_password)
connection1 = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, credentials=credentials))
channel1 = connection1.channel()
# Channel 1 thread =====================================================================================================
t1 = threading.Thread(target=get_heartbeats, args=(channel1,))
t1.daemon = True
threads.append(t1)
# as this is thread 1 call to start threading is made at start threading section
# Channel 2 Connection Details - =======================================================================================
credentials = pika.PlainCredentials(mq_user_name, mq_password)
connection2 = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, credentials=credentials))
channel2 = connection2.channel()
# Channel 2 thread ====================================================================================================
t2 = threading.Thread(target=process_body, args=(channel2, body))
t2.daemon = True
threads.append(t2)
t2.start() # as this is thread 2 - we need to start the thread here
# Start threading
t1.start() # start the first thread - other threads will self start as they call t1.start() in their code block
for t in threads: # for all the threads defined
t.join() # join defined threads
manager() # run the manager module which starts threads that call each module
这时 运行 产生错误
process_body() missing 1 required positional argument: (body)
而且我不明白这是为什么或如何解决它。
感谢您花时间阅读这个问题,非常感谢您提供的任何帮助或建议。
请记住,我是 python 和编码的新手,所以可能需要把事情拼写出来,而不是能够理解更多神秘的回复。
谢谢!
在进一步查看并使用代码后,如果我编辑这些行,似乎是:
def process_body(channel, body):
阅读
def process_body(body):
和
t2 = threading.Thread(target=process_body, args=(channel2, body))
所以它显示为:
t2 = threading.Thread(target=process_body)
然后代码似乎可以按需工作 - 我还在 htop 中看到多个脚本进程,因此看起来线程正在工作 - 我已经离开脚本处理 24 小时 + 并且没有收到任何错误...