一个一个地完成动态数组观察(concatMap)

complete dynamic array observables one by one (concatMap)

我正在制作一个聊天应用程序,我想以正确的顺序保存消息。

想象一下,我会有一个静态数量的消息

// 4 messages. array of static length: 4
chatMessages: string[] = ['hello', 'world', 'and', 'stack overflow members']; // 

现在,让我们为它们创建一个 save observables。

chatMessages: Observable<ChatMessage>[] = chatMessages.map((message: string) => {
    return chatService.saveMessage(message); // returns an Observable to call API
})

现在,我要一个一个的保存,一个一个的保存。

我是这样做的:

from(chatMessages).pipe(
    concatMap((observable) => observable),
     toArray(),
     take(1)
).subscribe();

现在我的问题是,如果初始的 chatMessages 数组是动态的——(可以在任何时间点添加消息,甚至在保存期间)。 如何循环数组以一条一条地保存聊天消息,并保持它们添加的顺序?

例如:保存了四条消息中的两条,正在处理第三条,此时第五条消息被添加到 chatMessages 数组。我该如何管理?

如果我理解正确的话,你必须处理一个数组,它可以随着时间的推移接收额外的元素,并且必须在数组的末尾添加这些元素。

这可以看作是消息流,最初存储在数组中的消息是流的第一个元素,随着时间的推移可以向其中添加其他消息,例如调用特定函数 addMessage(msg).

构建此类流的代码可能如下所示

const myInitialArray = ['hello', 'world', 'and', 'stack overflow members']
function saveMessage(msg: string) {
  return of(`saved: ${msg}`).pipe(delay(1000))
}

const add$ = new Subject<string>()

const myStream = merge(from(myInitialArray), add$).pipe(
  concatMap(msg => saveMessage(msg))
)

现在你要做的是 subscribemyStream 并且,任何时候你想在数组(或流)的末尾添加一条消息,你必须调用函数 addMessage.

This stackblitz 显示示例有效。

If the initial chatMessages array is dynamic - (can be added a message in any point of time, even during saving)

您描述的是消息(字符串)的 Observable,而不是数组!由于您一次处理一个项目,因此不需要数组。

您可以只使用一个简单的主题,它在收到消息时发出消息,并订阅该流来为您保存消息:

chatMessage$ = new Subject<string>();

function saveMessage(message: string) {
    chatMessage$.next(message);
}

chatMessage$.pipe(
    concatMap(message => chatService.saveMessage(message))
).subscribe();

这将以正确的顺序一次处理一条新消息。