Azure 函数服务总线没有属性 application_properties
Azure function Service bus has no attribute application_properties
我有一个 Azure 函数 ServiceBus,我想对其属性进行自定义。
我有 2 个主题,我发送了关于主题 1 的消息(我对其进行了一些处理)并将其输出发送到第二个主题。由于此消息来自不同的资源这一事实,我希望应用程序属性可以清楚地表明此消息的来源。
所以我的代码如下:
def main(message: func.ServiceBusMessage):
logging.info(message)
print(message)
message_content_type = message.content_type
message_body = message.get_body().decode("utf-8")
.....
.....
message1 = str(message_body)
def send_output(sender):
message_out = ServiceBusMessage(
output_json,
content_type="ModuleCommentAnalyzed", #setting the content type so that the service bus can route it.
application_properties={b'source':message.application_properties[b'source']} #setting the tenant code
)
sender.send_messages(message_out)
但是当我发送消息时,函数失败并抛出以下错误。
Result: Failure Exception: AttributeError: 'ServiceBusMessage' object has no attribute 'application_properties' Stack: File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 402, in _handle__invocation_request call_result = await self._loop.run_in_executor( File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 52, in run result = self.fn(*self.args, **self.kwargs) File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 606, in _run_sync_func return ExtensionManager.get_sync_invocation_wrapper(context, File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/extension.py", line 215, in _raw_invocation_wrapper result = function(**args) File "/home/site/wwwroot/function/testServiceBus.py", line 118, in main send_output(sender) File "/home/site/wwwroot/function/testServiceBus.py", line 108, in send_output application_properties={b'source':message.application_properties[b'source']} #setting the tenant code
我试图用变量 message1
替换 message
但在这种情况下我得到一个错误 str does not contain an attribute...
如果能帮助我理解我做错了什么并更好地理解它是如何工作的,我们将不胜感激。
非常感谢您,如果您需要更多信息,请告诉我
更新。
根据 Microsoft 文档,从 servicebus 5.x
它们替换了 user_propertieswith
applicationProperties`
我确实尝试了他们两个,但没有任何效果。
这是为了让 OP 更清楚一些。
这是我更新后的代码
import logging
import json
import azure.functions as func
from azure.servicebus import ServiceBusClient, ServiceBusMessage
def main(message: func.ServiceBusMessage):
# Log the Service Bus Message as plaintext
message_content_type = message.content_type
message_body = message.get_body().decode("utf-8")
result = json.dumps({
'message_id': message.message_id,
'body': message.get_body().decode('utf-8'),
'content_type': message.content_type,
'user_properties': message.user_properties,
'metadata' : message.metadata
})
logging.info(result)
# logging.info("Python ServiceBus topic trigger processed message.")
# logging.info("Message Content Type: " + message_content_type)
# logging.info("Message Body: " + message_body)
CONN_STR = "XXX"
topic_b = "topic_b"
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONN_STR)
def send_output(sender):
message_out = ServiceBusMessage(
result,
content_type="application/json", #setting the content type so that the service bus can route it.
user_properties={b'tenant': result.user_properties[b'MessageId']} #setting the tenant code
)
sender.send_messages(message_out)
servicebus_client_out = servicebus_client.from_connection_string(conn_str=CONN_STR, logging_enable=True)
with servicebus_client:
sender = servicebus_client_out.get_topic_sender(topic_name=topic_b)
with sender:
send_output(sender)
出于测试目的,我尝试将自定义 属性 添加到 return 和 message_id
,因为它是传入消息的一部分。但是我收到此错误
Result: Failure Exception: AttributeError: 'str' object has no attribute 'user_properties' Stack
所以我尝试 sender
消息是
Result: Failure Exception: AttributeError: 'ServiceBus' object has no attribute 'user_properties' Stack
这是我在 Microsoft 文档的任何地方都找不到的内容。
为了确保一切正常,我尝试了以下代码
application_properties={'tenant': 'DEMO'}
这很好用。
如果有人对此问题有任何提示,我将不胜感激
我必须同意the documentation is contradictory。
所以,让我们忽略文档并直接进入源代码:https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py#L63
这告诉我们正确的关键字参数是application_properties
.
提醒:这个答案指出 current(在撰写本文时)代码在 main
分支中。这似乎适用于 azure-servicebus
package.
的版本 7+
@gvee 是的,文档完全矛盾,库也不完全清楚。我设法解决了我的问题,希望这对以后的任何人都有帮助。
图书馆服务总线 7+ 已解决此问题
在我的具体情况下,我需要函数 2 执行 2 个重要步骤。他们如下
- 从有关主题 A 的传入消息中获取所有自定义属性并将特定 属性 保存在变量中。
- 在将消息从主题 A 发送到主题 B 时..将保存的 属性 附加到消息
before
以将其发送出去。
正如我在 OP 中提到的,现在对我来说主要的问题是库无法识别我声明的属性。这主要是由于库有 2 个属性,user_properties AND
application_properties.
所以我必须在我的代码中做如下。
在服务总线上 message
我必须使用 user_properties
来检索特定的 属性。
def main(message: func.ServiceBusMessage):
logging.info(message)
test_user = message.user_properties['property']
这样我就能够提取出我正在寻找的字符串。到目前为止一切顺利。
然后我想将 test_user
作为 属性 附加到消息中,同时将其发送到主题 B.. 所以我再次尝试使用 user_properties
但是这个失败,因为无法识别该属性。
这里有一个奇怪的想法。我用的是application_properties
,效果如下
def send_output(sender):
message_out = ServiceBusMessage(
output_json,
content_type="ModuleCommentAnalyzed", #setting the content type so that the service bus can route it.
application_properties={'property': test_user}
)
sender.send_messages(message_out)
所以我的结论是,要检索 属性,我应该使用 user_properties
并设置自定义 属性 我必须使用 application_properties
.
我无法详细解释为什么它以这种方式工作,因为文档根本不清楚(至少对我而言)并且源代码与库功能没有正确对齐。
但我希望这对以后的人有所帮助。
我有一个 Azure 函数 ServiceBus,我想对其属性进行自定义。
我有 2 个主题,我发送了关于主题 1 的消息(我对其进行了一些处理)并将其输出发送到第二个主题。由于此消息来自不同的资源这一事实,我希望应用程序属性可以清楚地表明此消息的来源。
所以我的代码如下:
def main(message: func.ServiceBusMessage):
logging.info(message)
print(message)
message_content_type = message.content_type
message_body = message.get_body().decode("utf-8")
.....
.....
message1 = str(message_body)
def send_output(sender):
message_out = ServiceBusMessage(
output_json,
content_type="ModuleCommentAnalyzed", #setting the content type so that the service bus can route it.
application_properties={b'source':message.application_properties[b'source']} #setting the tenant code
)
sender.send_messages(message_out)
但是当我发送消息时,函数失败并抛出以下错误。
Result: Failure Exception: AttributeError: 'ServiceBusMessage' object has no attribute 'application_properties' Stack: File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 402, in _handle__invocation_request call_result = await self._loop.run_in_executor( File "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 52, in run result = self.fn(*self.args, **self.kwargs) File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py", line 606, in _run_sync_func return ExtensionManager.get_sync_invocation_wrapper(context, File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/extension.py", line 215, in _raw_invocation_wrapper result = function(**args) File "/home/site/wwwroot/function/testServiceBus.py", line 118, in main send_output(sender) File "/home/site/wwwroot/function/testServiceBus.py", line 108, in send_output application_properties={b'source':message.application_properties[b'source']} #setting the tenant code
我试图用变量 message1
替换 message
但在这种情况下我得到一个错误 str does not contain an attribute...
如果能帮助我理解我做错了什么并更好地理解它是如何工作的,我们将不胜感激。
非常感谢您,如果您需要更多信息,请告诉我
更新。
根据 Microsoft 文档,从 servicebus 5.x
它们替换了 user_propertieswith
applicationProperties`
我确实尝试了他们两个,但没有任何效果。
这是为了让 OP 更清楚一些。
这是我更新后的代码
import logging
import json
import azure.functions as func
from azure.servicebus import ServiceBusClient, ServiceBusMessage
def main(message: func.ServiceBusMessage):
# Log the Service Bus Message as plaintext
message_content_type = message.content_type
message_body = message.get_body().decode("utf-8")
result = json.dumps({
'message_id': message.message_id,
'body': message.get_body().decode('utf-8'),
'content_type': message.content_type,
'user_properties': message.user_properties,
'metadata' : message.metadata
})
logging.info(result)
# logging.info("Python ServiceBus topic trigger processed message.")
# logging.info("Message Content Type: " + message_content_type)
# logging.info("Message Body: " + message_body)
CONN_STR = "XXX"
topic_b = "topic_b"
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONN_STR)
def send_output(sender):
message_out = ServiceBusMessage(
result,
content_type="application/json", #setting the content type so that the service bus can route it.
user_properties={b'tenant': result.user_properties[b'MessageId']} #setting the tenant code
)
sender.send_messages(message_out)
servicebus_client_out = servicebus_client.from_connection_string(conn_str=CONN_STR, logging_enable=True)
with servicebus_client:
sender = servicebus_client_out.get_topic_sender(topic_name=topic_b)
with sender:
send_output(sender)
出于测试目的,我尝试将自定义 属性 添加到 return 和 message_id
,因为它是传入消息的一部分。但是我收到此错误
Result: Failure Exception: AttributeError: 'str' object has no attribute 'user_properties' Stack
所以我尝试 sender
消息是
Result: Failure Exception: AttributeError: 'ServiceBus' object has no attribute 'user_properties' Stack
这是我在 Microsoft 文档的任何地方都找不到的内容。
为了确保一切正常,我尝试了以下代码
application_properties={'tenant': 'DEMO'}
这很好用。
如果有人对此问题有任何提示,我将不胜感激
我必须同意the documentation is contradictory。
所以,让我们忽略文档并直接进入源代码:https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py#L63
这告诉我们正确的关键字参数是application_properties
.
提醒:这个答案指出 current(在撰写本文时)代码在 main
分支中。这似乎适用于 azure-servicebus
package.
@gvee 是的,文档完全矛盾,库也不完全清楚。我设法解决了我的问题,希望这对以后的任何人都有帮助。
图书馆服务总线 7+ 已解决此问题
在我的具体情况下,我需要函数 2 执行 2 个重要步骤。他们如下
- 从有关主题 A 的传入消息中获取所有自定义属性并将特定 属性 保存在变量中。
- 在将消息从主题 A 发送到主题 B 时..将保存的 属性 附加到消息
before
以将其发送出去。
正如我在 OP 中提到的,现在对我来说主要的问题是库无法识别我声明的属性。这主要是由于库有 2 个属性,user_properties AND
application_properties.
所以我必须在我的代码中做如下。
在服务总线上 message
我必须使用 user_properties
来检索特定的 属性。
def main(message: func.ServiceBusMessage):
logging.info(message)
test_user = message.user_properties['property']
这样我就能够提取出我正在寻找的字符串。到目前为止一切顺利。
然后我想将 test_user
作为 属性 附加到消息中,同时将其发送到主题 B.. 所以我再次尝试使用 user_properties
但是这个失败,因为无法识别该属性。
这里有一个奇怪的想法。我用的是application_properties
,效果如下
def send_output(sender):
message_out = ServiceBusMessage(
output_json,
content_type="ModuleCommentAnalyzed", #setting the content type so that the service bus can route it.
application_properties={'property': test_user}
)
sender.send_messages(message_out)
所以我的结论是,要检索 属性,我应该使用 user_properties
并设置自定义 属性 我必须使用 application_properties
.
我无法详细解释为什么它以这种方式工作,因为文档根本不清楚(至少对我而言)并且源代码与库功能没有正确对齐。
但我希望这对以后的人有所帮助。