如何使用 Azure Cloud Functions 将所有 Azure 存储队列消息批处理为 Blob?
how to batch process all Azure Storage Queue Messages into a Blob with Azure Cloud Functions?
我想读取 Azure 队列存储的所有消息并将它们写入 Blob。理想情况下,我想读取 10000 个或更多的批次并将它们写入 Blob。
我正在使用带有队列存储绑定的 Azure Cloud Functions 进行输入,并使用 Blob 存储绑定进行输出,但是我找不到 API 或配置选项来让我阅读更多内容1 条消息。
有人知道这样的 API 吗?
官方文档并未提及任何支持在 Azure Function 的单次执行中批量处理存储队列消息。有an open issue in WebJobs SDK。所以,它不受支持。
如果您可以灵活选择将哪种服务用于消息传递中间件,则可以切换到事件中心。事件中心触发器支持(并鼓励)批量处理消息。不过它可能不会是 10.000:批量大小限制为 256k 数据。
要批量处理存储队列消息,您必须远离队列触发函数(例如 运行 计时器上的函数并连接到 table 存储以处理所有消息,或拥有自定义轮询 Web 作业,或使用带有自定义触发器的 Web 作业 SDK)。
我终于找到了一个令我非常满意的解决方案。
使用缓冲区不可扩展,因为运行时很容易超过 Azure Functions 运行时强加的 5 分钟限制,再加上明显的内存消耗问题,再加上我不得不使用计时器触发器,所以我需要以某种方式确保所有相关消息都在定时排队。
我现在所做的是使用普通队列绑定来获取消息,并使用节点存储 SDK 来实现某种 "fake" 流式传输到附加 Blob 中。因此,每条消息都被一条一条地转换为 CSV 行,并附加到相应的 blob。
这是我的函数代码:
const config = require('./config/config.js')
const storage = require('azure-storage')
const csvTransformer = require('./lib/csvTransform')
const async = require('async')
module.exports = function (context, myQueueItem) {
context.log(
'JavaScript queue trigger function processed work item',
myQueueItem
)
let blobService = storage.createBlobService(config.targetBlobConnection)
let messageDayString = csvTransformer.determineDayFromMessage(myQueueItem)
let blobName = messageDayString + '.csv'
let csvMessage
async.waterfall(
[
function (callback) {
blobService.createContainerIfNotExists(
config.targetBlobContainer,
{ publicAccessLevel: 'blob' },
err => {
callback(err)
}
)
},
function (callback) {
blobService.doesBlobExist(
config.targetBlobContainer,
blobName,
null,
(err, blobResult) => {
context.log('got blobResult: ', blobResult)
callback(err, blobResult)
}
)
},
function (blobResult, callback) {
if (blobResult && blobResult.exists) {
csvMessage = csvTransformer.transformMessageToCSV(myQueueItem, false)
blobService.appendFromText(
config.targetBlobContainer,
blobName,
csvMessage,
null,
(err, appendedBlobResult) => {
context.log('appended to existing blob: ', appendedBlobResult)
callback(err, appendedBlobResult)
}
)
} else {
csvMessage = csvTransformer.transformMessageToCSV(myQueueItem, true)
blobService.createAppendBlobFromText(
config.targetBlobContainer,
blobName,
csvMessage,
null,
(err, createdBlobResult) => {
context.log('created new blob: ', createdBlobResult)
callback(err, blobResult)
}
)
}
}
],
function (err, result) {
if (err) {
context.log.error('Error happened!')
context.log.error(err)
context.done(err)
} else {
context.log('appended CSV message to blob')
context.bindings.outputQueueItem = csvMessage
context.done()
}
}
)
}
我想读取 Azure 队列存储的所有消息并将它们写入 Blob。理想情况下,我想读取 10000 个或更多的批次并将它们写入 Blob。
我正在使用带有队列存储绑定的 Azure Cloud Functions 进行输入,并使用 Blob 存储绑定进行输出,但是我找不到 API 或配置选项来让我阅读更多内容1 条消息。 有人知道这样的 API 吗?
官方文档并未提及任何支持在 Azure Function 的单次执行中批量处理存储队列消息。有an open issue in WebJobs SDK。所以,它不受支持。
如果您可以灵活选择将哪种服务用于消息传递中间件,则可以切换到事件中心。事件中心触发器支持(并鼓励)批量处理消息。不过它可能不会是 10.000:批量大小限制为 256k 数据。
要批量处理存储队列消息,您必须远离队列触发函数(例如 运行 计时器上的函数并连接到 table 存储以处理所有消息,或拥有自定义轮询 Web 作业,或使用带有自定义触发器的 Web 作业 SDK)。
我终于找到了一个令我非常满意的解决方案。 使用缓冲区不可扩展,因为运行时很容易超过 Azure Functions 运行时强加的 5 分钟限制,再加上明显的内存消耗问题,再加上我不得不使用计时器触发器,所以我需要以某种方式确保所有相关消息都在定时排队。
我现在所做的是使用普通队列绑定来获取消息,并使用节点存储 SDK 来实现某种 "fake" 流式传输到附加 Blob 中。因此,每条消息都被一条一条地转换为 CSV 行,并附加到相应的 blob。
这是我的函数代码:
const config = require('./config/config.js')
const storage = require('azure-storage')
const csvTransformer = require('./lib/csvTransform')
const async = require('async')
module.exports = function (context, myQueueItem) {
context.log(
'JavaScript queue trigger function processed work item',
myQueueItem
)
let blobService = storage.createBlobService(config.targetBlobConnection)
let messageDayString = csvTransformer.determineDayFromMessage(myQueueItem)
let blobName = messageDayString + '.csv'
let csvMessage
async.waterfall(
[
function (callback) {
blobService.createContainerIfNotExists(
config.targetBlobContainer,
{ publicAccessLevel: 'blob' },
err => {
callback(err)
}
)
},
function (callback) {
blobService.doesBlobExist(
config.targetBlobContainer,
blobName,
null,
(err, blobResult) => {
context.log('got blobResult: ', blobResult)
callback(err, blobResult)
}
)
},
function (blobResult, callback) {
if (blobResult && blobResult.exists) {
csvMessage = csvTransformer.transformMessageToCSV(myQueueItem, false)
blobService.appendFromText(
config.targetBlobContainer,
blobName,
csvMessage,
null,
(err, appendedBlobResult) => {
context.log('appended to existing blob: ', appendedBlobResult)
callback(err, appendedBlobResult)
}
)
} else {
csvMessage = csvTransformer.transformMessageToCSV(myQueueItem, true)
blobService.createAppendBlobFromText(
config.targetBlobContainer,
blobName,
csvMessage,
null,
(err, createdBlobResult) => {
context.log('created new blob: ', createdBlobResult)
callback(err, blobResult)
}
)
}
}
],
function (err, result) {
if (err) {
context.log.error('Error happened!')
context.log.error(err)
context.done(err)
} else {
context.log('appended CSV message to blob')
context.bindings.outputQueueItem = csvMessage
context.done()
}
}
)
}