Azure Scheduler: Add JSON message to Azure Storage Queue
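I created a scheduler job that runs at specific times and inserts a JSON message into a storage queue. The JSON message is fixed; I entered it in the BODY field during scheduler setup (the field is labeled text/plain). The message is: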
{ "action": "SendReminderMessages" }
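On the receiving end (a WebJob), I poll the queue and then try to deserialize the JSON in these messages. What I receive is not the plain JSON shown above, but the JSON wrapped in an XML message: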
<?xml version="1.0" encoding="utf-16"?>
<StorageQueueMessage xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<ExecutionTag>(Some Hex String)</ExecutionTag>
<ClientRequestId>(Some GUID)</ClientRequestId>
<ExpectedExecutionTime>2015-09-17T07:00:00</ExpectedExecutionTime>
<SchedulerJobId>reminder-mails</SchedulerJobId>
<SchedulerJobCollectionId>scheduler-jobs</SchedulerJobCollectionId>
<Region>West Europe</Region>
<Message>{ "action": "SendReminderMessages" }</Message>
</StorageQueueMessage>
How can I send the JSON message as is, i.e. without the envelope?
That sounds strange. I do exactly the same thing, only for e-mails, and it works fine for me. I have added my code below; I hope it helps you spot your error.
/// <summary>
/// Adds a message to a queue
/// </summary>
/// <param name="queueName">Name of queue</param>
/// <param name="message">Message to post in the queue</param>
/// <returns>Returns true on success, false if the queue does not exist (404); throws on any other error</returns>
public bool PutMessage(string queueName, CloudQueueMessage message)
{
    try
    {
        var queue = queueContext.QueueClient.GetQueueReference(queueName);
        queue.AddMessage(message);
        return true;
    }
    catch (StorageClientException ex)
    {
        // A 404 means the queue was not found
        if ((int)ex.StatusCode == 404) return false;
        throw;
    }
}
/// <summary>
/// Serializes an e-mail to JSON
/// </summary>
/// <param name="from">Sender e-mail address</param>
/// <param name="to">Recipient e-mail address</param>
/// <param name="subject">E-mail subject text</param>
/// <param name="body">E-mail body text</param>
/// <returns>JSON</returns>
string SerializeEmail(string from, string to, string subject, string body)
{
    var email = new EmailEntity()
    {
        From = from,
        To = to,
        Subject = subject,
        Body = body
    };
    using (var stream = new MemoryStream())
    {
        var serializer = new DataContractJsonSerializer(typeof(EmailEntity));
        serializer.WriteObject(stream, email);
        // WriteObject emits UTF-8 bytes, so decode as UTF-8 rather than Encoding.Default
        return Encoding.UTF8.GetString(stream.ToArray());
    }
}
/// <summary>
/// Puts an e-mail on the Azure queue to be sent
/// </summary>
/// <param name="from">Sender e-mail address</param>
/// <param name="to">Recipient e-mail address</param>
/// <param name="subject">E-mail subject text</param>
/// <param name="body">E-mail body text</param>
/// <returns>Returns true on success, false if the queue does not exist (404); throws on any other error</returns>
public bool SendEmail(string from, string to, string subject, string body)
{
    var serializedEmail = SerializeEmail(from, to, subject, body);
    var message = new CloudQueueMessage(serializedEmail);
    return PutMessage(AzureQueueContext.EmailQueueName, message);
}
/// <summary>
/// Retrieve the next message from a queue
/// </summary>
/// <param name="queueName">Name of queue</param>
/// <param name="message">Message</param>
/// <returns>Return true on success (message available), false if no message or no queue, throw exception on error</returns>
bool GetMessage(string queueName, out CloudQueueMessage message)
{
    message = null;
    try
    {
        var queue = GetCloudQueueClient().GetQueueReference(queueName);
        message = queue.GetMessage();
        return message != null;
    }
    catch (StorageClientException ex)
    {
        if ((int)ex.StatusCode == 404) return false;
        throw;
    }
}
/// <summary>
/// Deserializes an e-mail message
/// </summary>
/// <param name="message">Message serialized in the JSON interchange format</param>
/// <returns>Deserialized e-mail as an EmailEntity object</returns>
EmailEntity DeserializeEmail(CloudQueueMessage message)
{
    // Encode as UTF-8 to match the encoding used when the e-mail was serialized
    using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(message.AsString)))
    {
        var ser = new DataContractJsonSerializer(typeof(EmailEntity));
        return (EmailEntity)ser.ReadObject(stream);
    }
}
/// <summary>
/// Retrieve the next e-mail from the e-mail queue
/// </summary>
/// <param name="email">E-mail</param>
/// <param name="message">The underlying queue message</param>
/// <returns>Return true on success (message available), false if no message or no queue, throw exception on error</returns>
public bool GetEmail(out EmailEntity email, out CloudQueueMessage message)
{
    var result = GetMessage(AzureQueueContext.EmailQueueName, out message);
    email = result ? DeserializeEmail(message) : null;
    return result;
}
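The scheduler currently adds an XML wrapper to the body as a way of delivering job metadata. You can use the StorageQueueMessage class in the Scheduler SDK (under Microsoft.WindowsAzure.Scheduler.Models) to deserialize the message correctly.
You can vote for an Azure Scheduler feature that allows omitting the wrapper on our Azure Scheduler User Voice forum.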
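If pulling in the Scheduler SDK is not an option, the envelope shown in the question can probably be read with the framework's own XmlSerializer. The following is only a minimal sketch: `SchedulerEnvelope` and `SchedulerEnvelopeReader` are hypothetical names invented for this example, not the SDK's StorageQueueMessage class, and the only thing taken from the thread is the set of element names in the XML above.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Hypothetical local stand-in for the envelope (NOT the SDK class).
// Only the element names are taken from the XML shown in the question;
// elements without a matching property (e.g. ExecutionTag) are skipped.
[XmlRoot("StorageQueueMessage")]
public class SchedulerEnvelope
{
    public string SchedulerJobId { get; set; }
    public string SchedulerJobCollectionId { get; set; }
    public string Region { get; set; }
    public string Message { get; set; }
}

public static class SchedulerEnvelopeReader
{
    // Deserializes the XML envelope text taken from the queue message.
    public static SchedulerEnvelope Read(string xml)
    {
        var serializer = new XmlSerializer(typeof(SchedulerEnvelope));
        // The input is already a string, so it is read through a StringReader
        // and no byte-level decoding takes place here.
        using (var reader = new StringReader(xml))
        {
            return (SchedulerEnvelope)serializer.Deserialize(reader);
        }
    }
}
```

In the WebJob this would be called as `SchedulerEnvelopeReader.Read(message.AsString).Message`, and the resulting string handed to the JSON deserializer.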
I previously worked around this by creating a completely separate WebJob that reads the XML, pulls out the JSON, and re-queues the JSON value onto a storage queue (solution here: https://github.com/Stonefinch/AzureIntro/tree/master/src/AzureIntro.WebJobs.AzureScheduler)
However, I recently found that using Azure Functions to queue the messages provides a nice workaround. More info here: http://aaron-hoffman.blogspot.com/2017/01/replace-azure-scheduler-with-azure.html
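For reference, the unwrap-and-re-queue step of the separate WebJob approach could look roughly like the sketch below. This is not the code from the linked repository: `EnvelopeForwarder`, `ExtractPayload`, and `Forward` are hypothetical names, and the queue itself is abstracted behind an `Action<string>` so the example stays free of storage SDK dependencies.

```csharp
using System;
using System.Xml;
using System.Xml.Linq;

public static class EnvelopeForwarder
{
    // Returns the text of the <Message> element when the body is a scheduler
    // envelope; any other body is returned unchanged.
    public static string ExtractPayload(string body)
    {
        if (string.IsNullOrWhiteSpace(body) || !body.TrimStart().StartsWith("<"))
            return body;
        try
        {
            var root = XDocument.Parse(body).Root;
            if (root == null || root.Name.LocalName != "StorageQueueMessage")
                return body;
            var message = root.Element("Message");
            return message != null ? message.Value : body;
        }
        catch (XmlException)
        {
            // Not well-formed XML: pass the body through untouched.
            return body;
        }
    }

    // Unwraps the body and hands the payload to the caller-supplied enqueue action.
    public static void Forward(string body, Action<string> enqueue)
    {
        enqueue(ExtractPayload(body));
    }
}
```

In a real WebJob the `enqueue` argument would presumably wrap something like `queue.AddMessage(new CloudQueueMessage(json))` on the target queue. Passing non-envelope bodies through unchanged is a design choice made here so the forwarder keeps working if the wrapper is ever dropped.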