ES7 承诺并等待在后台永远循环的异步函数
ES7 promises and awaiting async function which loops forever in background
这可能是一个特例:
我想从队列 (AWS SQS) 中读取,这是通过调用等待几秒钟的消息然后解析来完成的 - 并在循环中一次又一次地调用,只要你想处理那个队列(它每次检查一个标志)。
这意味着我有一个 consume
函数,它是 运行 只要应用程序处于活动状态,或者队列未标记。
而且我还有一个用于订阅队列的 subscribe
函数 - 一旦知道消费者能够连接到队列,该函数就会立即解析。即使此函数调用消费者保持 运行 而不会 return 直到队列未标记。
它给了我一些挑战 - 关于如何使用现代 JS 和 async/await promise 解决这个问题,你有什么建议吗?我记得这段代码在 React 网络应用程序中是 运行,而不是在 node.js.
中
我基本上只希望 await subscribe(QUEUE)
调用(来自 GUI)在确定可以从该队列中读取时立即解析。但如果它不能,我希望它抛出一个错误,该错误会传播到订阅调用的来源——这意味着我必须 await consume(QUEUE)
,对吧?
更新:
添加了一些未经测试的代码草案(如果我没有采取正确的方法,我不想花更多时间让它工作) - 我考虑过将成功和失败回调发送到消费函数,以便它可以报告成功一旦它从队列中获得第一个有效(但可能为空)响应,它就会将队列 url 存储为订阅 - 如果队列轮询失败则取消订阅。
因为我设置了几个队列消费者,所以他们不应该阻塞任何东西而只是在后台工作
let subscribedQueueURLs = []
async function consumeQueue(
url: QueueURL,
success: () => mixed,
failure: (error: Error) => mixed
) {
const sqs = new AWS.SQS()
const params = {
QueueUrl: url,
WaitTimeSeconds: 20,
}
try {
do {
// eslint-disable-next-line no-await-in-loop
const receivedData = await sqs.receiveMessage(params).promise()
if (!subscribedQueueURLs.includes(url)) {
success()
}
// eslint-disable-next-line no-restricted-syntax
for (const message of receivedData.Messages) {
console.log({ message })
// eslint-disable-next-line no-await-in-loop
eventHandler && (await eventHandler.message(message, url))
const deleteParams = {
QueueUrl: url,
ReceiptHandle: message.ReceiptHandle,
}
// eslint-disable-next-line no-await-in-loop
const deleteResult = await sqs.deleteMessage(deleteParams).promise()
console.log({ deleteResult })
}
} while (subscribedQueueURLs.includes(url))
} catch (error) {
failure(error)
}
}
export const subscribe = async (entityType: EntityType, entityId: EntityId) => {
const url = generateQueueURL(entityType, entityId)
consumeQueue(
url,
() => {
subscribedQueueURLs.push(url)
eventHandler && eventHandler.subscribe(url)
},
error => {
console.error(error)
unsubscribe(entityType, entityId)
}
)
}
我最终是这样解决的——虽然这可能不是最优雅的解决方案...
let eventHandler: ?EventHandler
let awsOptions: ?AWSOptions
let subscribedQueueUrls = []
let sqs = null
let sns = null
export function setup(handler: EventHandler) {
eventHandler = handler
}
export async function login(
{ awsKey, awsSecret, awsRegion }: AWSCredentials,
autoReconnect: boolean
) {
const credentials = new AWS.Credentials(awsKey, awsSecret)
AWS.config.update({ region: awsRegion, credentials })
sqs = new AWS.SQS({ apiVersion: '2012-11-05' })
sns = new AWS.SNS({ apiVersion: '2010-03-31' })
const sts = new AWS.STS({ apiVersion: '2011-06-15' })
const { Account } = await sts.getCallerIdentity().promise()
awsOptions = { accountId: Account, region: awsRegion }
eventHandler && eventHandler.login({ awsRegion, awsKey, awsSecret }, autoReconnect)
}
async function handleQueueMessages(messages, queueUrl) {
if (!sqs) {
throw new Error(
'Attempt to subscribe before SQS client is ready (i.e. authenticated).'
)
}
// eslint-disable-next-line no-restricted-syntax
for (const message of messages) {
if (!eventHandler) {
return
}
// eslint-disable-next-line no-await-in-loop
await eventHandler.message({
content: message,
queueUrl,
timestamp: new Date().toISOString(),
})
const deleteParams = {
QueueUrl: queueUrl,
ReceiptHandle: message.ReceiptHandle,
}
// eslint-disable-next-line no-await-in-loop
await sqs.deleteMessage(deleteParams).promise()
}
}
export async function subscribe(queueUrl: QueueUrl) {
if (!sqs) {
throw new Error(
'Attempt to subscribe before SQS client is ready (i.e. authenticated).'
)
}
const initialParams = {
QueueUrl: queueUrl,
WaitTimeSeconds: 0,
MessageAttributeNames: ['All'],
AttributeNames: ['All'],
}
const longPollParams = {
...initialParams,
WaitTimeSeconds: 20,
}
// Attempt to consume the queue, and handle any pending messages.
const firstResponse = await sqs.receiveMessage(initialParams).promise()
if (!subscribedQueueUrls.includes(queueUrl)) {
subscribedQueueUrls.push(queueUrl)
eventHandler && eventHandler.subscribe(queueUrl)
}
handleQueueMessages(firstResponse.Messages, queueUrl)
// Keep on polling the queue afterwards.
setImmediate(async () => {
if (!sqs) {
throw new Error(
'Attempt to subscribe before SQS client is ready (i.e. authenticated).'
)
}
try {
do {
// eslint-disable-next-line no-await-in-loop
const received = await sqs.receiveMessage(longPollParams).promise()
handleQueueMessages(received.Messages, queueUrl)
} while (sqs && subscribedQueueUrls.includes(queueUrl))
} catch (error) {
eventHandler && eventHandler.disconnect()
throw error
}
})
}
这可能是一个特例:
我想从队列 (AWS SQS) 中读取,这是通过调用等待几秒钟的消息然后解析来完成的 - 并在循环中一次又一次地调用,只要你想处理那个队列(它每次检查一个标志)。
这意味着我有一个 consume
函数,它是 运行 只要应用程序处于活动状态,或者队列未标记。
而且我还有一个用于订阅队列的 subscribe
函数 - 一旦知道消费者能够连接到队列,该函数就会立即解析。即使此函数调用消费者保持 运行 而不会 return 直到队列未标记。
它给了我一些挑战 - 关于如何使用现代 JS 和 async/await promise 解决这个问题,你有什么建议吗?我记得这段代码在 React 网络应用程序中是 运行,而不是在 node.js.
中我基本上只希望 await subscribe(QUEUE)
调用(来自 GUI)在确定可以从该队列中读取时立即解析。但如果它不能,我希望它抛出一个错误,该错误会传播到订阅调用的来源——这意味着我必须 await consume(QUEUE)
,对吧?
更新: 添加了一些未经测试的代码草案(如果我没有采取正确的方法,我不想花更多时间让它工作) - 我考虑过将成功和失败回调发送到消费函数,以便它可以报告成功一旦它从队列中获得第一个有效(但可能为空)响应,它就会将队列 url 存储为订阅 - 如果队列轮询失败则取消订阅。
因为我设置了几个队列消费者,所以他们不应该阻塞任何东西而只是在后台工作
let subscribedQueueURLs = []
async function consumeQueue(
url: QueueURL,
success: () => mixed,
failure: (error: Error) => mixed
) {
const sqs = new AWS.SQS()
const params = {
QueueUrl: url,
WaitTimeSeconds: 20,
}
try {
do {
// eslint-disable-next-line no-await-in-loop
const receivedData = await sqs.receiveMessage(params).promise()
if (!subscribedQueueURLs.includes(url)) {
success()
}
// eslint-disable-next-line no-restricted-syntax
for (const message of receivedData.Messages) {
console.log({ message })
// eslint-disable-next-line no-await-in-loop
eventHandler && (await eventHandler.message(message, url))
const deleteParams = {
QueueUrl: url,
ReceiptHandle: message.ReceiptHandle,
}
// eslint-disable-next-line no-await-in-loop
const deleteResult = await sqs.deleteMessage(deleteParams).promise()
console.log({ deleteResult })
}
} while (subscribedQueueURLs.includes(url))
} catch (error) {
failure(error)
}
}
export const subscribe = async (entityType: EntityType, entityId: EntityId) => {
const url = generateQueueURL(entityType, entityId)
consumeQueue(
url,
() => {
subscribedQueueURLs.push(url)
eventHandler && eventHandler.subscribe(url)
},
error => {
console.error(error)
unsubscribe(entityType, entityId)
}
)
}
我最终是这样解决的——虽然这可能不是最优雅的解决方案...
let eventHandler: ?EventHandler
let awsOptions: ?AWSOptions
let subscribedQueueUrls = []
let sqs = null
let sns = null
export function setup(handler: EventHandler) {
eventHandler = handler
}
export async function login(
{ awsKey, awsSecret, awsRegion }: AWSCredentials,
autoReconnect: boolean
) {
const credentials = new AWS.Credentials(awsKey, awsSecret)
AWS.config.update({ region: awsRegion, credentials })
sqs = new AWS.SQS({ apiVersion: '2012-11-05' })
sns = new AWS.SNS({ apiVersion: '2010-03-31' })
const sts = new AWS.STS({ apiVersion: '2011-06-15' })
const { Account } = await sts.getCallerIdentity().promise()
awsOptions = { accountId: Account, region: awsRegion }
eventHandler && eventHandler.login({ awsRegion, awsKey, awsSecret }, autoReconnect)
}
async function handleQueueMessages(messages, queueUrl) {
if (!sqs) {
throw new Error(
'Attempt to subscribe before SQS client is ready (i.e. authenticated).'
)
}
// eslint-disable-next-line no-restricted-syntax
for (const message of messages) {
if (!eventHandler) {
return
}
// eslint-disable-next-line no-await-in-loop
await eventHandler.message({
content: message,
queueUrl,
timestamp: new Date().toISOString(),
})
const deleteParams = {
QueueUrl: queueUrl,
ReceiptHandle: message.ReceiptHandle,
}
// eslint-disable-next-line no-await-in-loop
await sqs.deleteMessage(deleteParams).promise()
}
}
export async function subscribe(queueUrl: QueueUrl) {
if (!sqs) {
throw new Error(
'Attempt to subscribe before SQS client is ready (i.e. authenticated).'
)
}
const initialParams = {
QueueUrl: queueUrl,
WaitTimeSeconds: 0,
MessageAttributeNames: ['All'],
AttributeNames: ['All'],
}
const longPollParams = {
...initialParams,
WaitTimeSeconds: 20,
}
// Attempt to consume the queue, and handle any pending messages.
const firstResponse = await sqs.receiveMessage(initialParams).promise()
if (!subscribedQueueUrls.includes(queueUrl)) {
subscribedQueueUrls.push(queueUrl)
eventHandler && eventHandler.subscribe(queueUrl)
}
handleQueueMessages(firstResponse.Messages, queueUrl)
// Keep on polling the queue afterwards.
setImmediate(async () => {
if (!sqs) {
throw new Error(
'Attempt to subscribe before SQS client is ready (i.e. authenticated).'
)
}
try {
do {
// eslint-disable-next-line no-await-in-loop
const received = await sqs.receiveMessage(longPollParams).promise()
handleQueueMessages(received.Messages, queueUrl)
} while (sqs && subscribedQueueUrls.includes(queueUrl))
} catch (error) {
eventHandler && eventHandler.disconnect()
throw error
}
})
}