kafka 上的 NestJs cqrs 异常和错误处理

NestJs cqrs exception and error handling on kafka

我有点问题,

我在 NestJS 中使用 Kafka 和 CQRS

我的问题是:在写入过程中处理错误的最佳方式是什么?

  1. 我将请求发送到 Web api 到我的 nestjs 应用程序

  2. 我有事件 ObjectedCreatedEvent 创建后将其发送到事件总线上并将其写入 kafka (confluent.cloud)。

  3. Kafka 响应错误 87,因为消息验证针对 json 模式失败(到目前为止没问题)

  4. 我如何正确响应网络 api 发生错误?

刚刚启动了另一个侦听器?我希望 kafka(融合云)至少必须针对此类事情进行讨论。

我只需要一个事件 ObjectValdiationFailedEvent 并将其放到事件总线上。

Confluent Cloud 在这里对您帮助不大,因为这是一个客户端问题。如您所知; Kafka 接收生产者序列化的任何内容,并将数据简单地存储到选定的分区中。在您的情况下,data 不是离开客户端的事件,这意味着抛出此 error 87 的任何东西肯定不是服务器端 Kafka,而是客户端-side.

我的建议是设置您的框架 NestJS 支持的任何异常处理程序。我不是 Node.js 开发人员(我的背景是 Java 和 Go),但快速查看 NestJS 文档表明该框架允许您注册能够处理异常的过滤器。例如:

import { Catch, RpcExceptionFilter, ArgumentsHost } from '@nestjs/common';
import { Observable, throwError } from 'rxjs';
import { RpcException } from '@nestjs/microservices';

@Catch(RpcException)
export class ExceptionFilter implements RpcExceptionFilter<RpcException> {
  catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
    return throwError(exception.getError());
  }
}

更多信息here

所以您可能想调查是哪个层引发了这个 错误 87,以便您可以相应地处理它。