Azure 服务总线队列数据接收器
Azure Service bus queue data receiver
我有以下从服务总线队列收集消息的 ProcessQueueMessage 函数:
public static void ProcessQueueMessage([ServiceBusTrigger("dsique2")] BrokeredMessage message,
TextWriter logger)
{
logger.WriteLine($"Processing message: {message}");
Stream stream = message.GetBody<Stream>();
StreamReader reader = new StreamReader(stream);
string s = reader.ReadToEnd();
//Parse json
string output = s.Substring(s.IndexOf('{') , s.IndexOf('}') - s.IndexOf('{') + 1);
var json = JsonConvert.DeserializeObject<dynamic>(output);
var TS = Convert.ToDouble(json.ts);
var Speed = Convert.ToDouble(json.speed);
var Ped = Convert.ToDouble(json.ped);
var BrakePed = Convert.ToDouble(json.brakeped);
var lateralAcc1 = Convert.ToDouble(json.lateralacc1);
var steeringAngle = Convert.ToDouble(json.steeringangle);
//sends parsed values to function
Program.receive_emulate(0x415, Speed, TS, "offlineSpeed");
Program.receive_emulate(0x204, BrakePed , TS, "BrakePed ");
Program.receive_emulate(0x7D, lateralAcc1 , TS, "lateralAcc1 ");
Program.receive_emulate(0x92, steeringAngle , TS, "steeringAngle ");
}
所以基本上,每当数据以 json 格式来自服务总线队列时,我都会解析 json,获取值并将每个信号发送到 receive_emulate 函数。问题是在 receive_emulate 函数中的数据处理结束之前触发了该函数。我想要做的是,处理单个消息,将它们发送到 receive_emulate 函数。当函数returns时,取队列中的第二条消息。不幸的是我无法做到这一点,任何想法将不胜感激。
另一个即使在 Azure 文档中写了 Fifo 在服务总线队列中得到保证,我注意到我的消息没有按顺序发送。有什么办法可以让它们以有序的形式出现吗?
非常感谢
如果我没看错你的代码,那么你正在使用带有服务总线触发器的 Azure Functions。
尝试在 hosts.json
文件中将 maxConcurrentCalls
参数设置为 1
,显示为 here。
The maximum number of concurrent calls to the callback the message
pump should initiate. The default is 16.
就像@Mikhail 指出的那样,将并发设置为 1 将允许您通过 host.json 以串行方式处理消息,您可以
我将专注于您关于 FIFO 的第二个问题。 Azure 服务总线 不 保证消息的顺序,除非你使用 Messaging Sessions. As far as I know, Service Bus sessions are not currently supported 和 Functions。
我有以下从服务总线队列收集消息的 ProcessQueueMessage 函数:
public static void ProcessQueueMessage([ServiceBusTrigger("dsique2")] BrokeredMessage message,
TextWriter logger)
{
logger.WriteLine($"Processing message: {message}");
Stream stream = message.GetBody<Stream>();
StreamReader reader = new StreamReader(stream);
string s = reader.ReadToEnd();
//Parse json
string output = s.Substring(s.IndexOf('{') , s.IndexOf('}') - s.IndexOf('{') + 1);
var json = JsonConvert.DeserializeObject<dynamic>(output);
var TS = Convert.ToDouble(json.ts);
var Speed = Convert.ToDouble(json.speed);
var Ped = Convert.ToDouble(json.ped);
var BrakePed = Convert.ToDouble(json.brakeped);
var lateralAcc1 = Convert.ToDouble(json.lateralacc1);
var steeringAngle = Convert.ToDouble(json.steeringangle);
//sends parsed values to function
Program.receive_emulate(0x415, Speed, TS, "offlineSpeed");
Program.receive_emulate(0x204, BrakePed , TS, "BrakePed ");
Program.receive_emulate(0x7D, lateralAcc1 , TS, "lateralAcc1 ");
Program.receive_emulate(0x92, steeringAngle , TS, "steeringAngle ");
}
所以基本上,每当数据以 json 格式来自服务总线队列时,我都会解析 json,获取值并将每个信号发送到 receive_emulate 函数。问题是在 receive_emulate 函数中的数据处理结束之前触发了该函数。我想要做的是,处理单个消息,将它们发送到 receive_emulate 函数。当函数returns时,取队列中的第二条消息。不幸的是我无法做到这一点,任何想法将不胜感激。 另一个即使在 Azure 文档中写了 Fifo 在服务总线队列中得到保证,我注意到我的消息没有按顺序发送。有什么办法可以让它们以有序的形式出现吗? 非常感谢
如果我没看错你的代码,那么你正在使用带有服务总线触发器的 Azure Functions。
尝试在 hosts.json
文件中将 maxConcurrentCalls
参数设置为 1
,显示为 here。
The maximum number of concurrent calls to the callback the message pump should initiate. The default is 16.
就像@Mikhail 指出的那样,将并发设置为 1 将允许您通过 host.json 以串行方式处理消息,您可以
我将专注于您关于 FIFO 的第二个问题。 Azure 服务总线 不 保证消息的顺序,除非你使用 Messaging Sessions. As far as I know, Service Bus sessions are not currently supported 和 Functions。