在本地主机上连接两个 NestJS gRPC 微服务

Connecting two NestJS gRPC microservices on localhost

简介

我对 NestJS 还很陌生,但我真的很喜欢它到目前为止的强大功能,所以我想看看结合使用 Kafka、gRPC 和 NestJS 能走多远。

我的最终目标是具有以下设计,因为我想复制我的 Kafka Producers:

gRPC client <---> gRPC server and Kafka client <----> Kafka

卡夫卡

我有一个在 Kubernetes 中构建的 Kafka 集群,我可以通过 NestJS 的 L4 负载均衡器轻松访问它。

NestJS 中的 Kafka 端没问题。我什至依赖 kafkajs 并简单地使用 kafkajs.Producerkafkajs.Consumer 类 构建消费者和生产者,相应地定义 Kafka 实例的配置 ConfigService.

gRPC

我正在努力让 gRPC 服务器代表 gRPC 客户端将请求转发给 Kafka 生产者。

问题

我可以启动 gRPC 服务器,但不能启动 gRPC 客户端。客户端 returns 来自 @grpc/grpc-js/src/server.ts:569 的以下错误:

[Nest] 50525  - 11/03/2022, 16:09:13     LOG [NestFactory] Starting Nest application...
[Nest] 50525  - 11/03/2022, 16:09:13     LOG [InstanceLoader] AppModule dependencies initialized +17ms
[Nest] 50525  - 11/03/2022, 16:09:13     LOG [InstanceLoader] ClientsModule dependencies initialized +0ms
[Nest] 50525  - 11/03/2022, 16:09:13     LOG [InstanceLoader] GrpcClientModule dependencies initialized +0ms
E No address added out of total 2 resolved

/Users/mattia/github/microservices/kafka-grpc-client/node_modules/@grpc/grpc-js/src/server.ts:569
              deferredCallback(new Error(errorString), 0);
                               ^
Error: No address added out of total 2 resolved
    at bindResultPromise.then.errorString (/Users/mattia/github/microservices/kafka-grpc-client/node_modules/@grpc/grpc-js/src/server.ts:569:32)
    at processTicksAndRejections (node:internal/process/task_queues:96:5)

我不确定为什么会发生这种情况,但我怀疑它与 gRPC 通道有关,或者换句话说,服务器和客户端之间的通信 link。但是,我找不到关于此事的任何文件。将不胜感激。

如果您跟踪堆栈跟踪,您应该注意到 @grpc/grpc-js/src/server.ts:569:32 中的以下内容,换句话说,此错误在 @grpc/grpcjs 库中:

const resolverListener: ResolverListener = {
      onSuccessfulResolution: (
        addressList,
        serviceConfig,
        serviceConfigError
      ) => {
        // We only want one resolution result. Discard all future results 
        resolverListener.onSuccessfulResolution = () => {};
        if (addressList.length === 0) {
          deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
          return;
        }
        let bindResultPromise: Promise<BindResult>;
        if (isTcpSubchannelAddress(addressList[0])) {
          if (addressList[0].port === 0) {
            bindResultPromise = bindWildcardPort(addressList);
          } else {
            bindResultPromise = bindSpecificPort(
              addressList,
              addressList[0].port,
              0
            );
          }
        } else {
          // Use an arbitrary non-zero port for non-TCP addresses
          bindResultPromise = bindSpecificPort(addressList, 1, 0);
        }
        bindResultPromise.then(
          (bindResult) => {
            if (bindResult.count === 0) {
              const errorString = `No address added out of total ${addressList.length} resolved`;
              logging.log(LogVerbosity.ERROR, errorString);
              deferredCallback(new Error(errorString), 0);
            } else {
              if (bindResult.count < addressList.length) {
                logging.log(
                  LogVerbosity.INFO,
                  `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
                );
              }
              deferredCallback(null, bindResult.port);
            }
          },
          (error) => {
            const errorString = `No address added out of total ${addressList.length} resolved`;
            logging.log(LogVerbosity.ERROR, errorString);
            deferredCallback(new Error(errorString), 0);
          }
        );
      },
      onError: (error) => {
        deferredCallback(new Error(error.details), 0);
      },
    };

我的代码

因为 gRPC 需要共享 .proto 文件,所以我使用以下内容:

syntax = "proto3";

package KAFKA_GRPC_SERVICE;

service KafkaGrpcService {
    rpc Produce(ProducerRequest) returns(Empty) {}
}

// See `kafkajs/types/ProducerRecord`
message ProducerRequest {
    
    // See `kafkajs/types/Message`
    message Message {
        required string value = 1;
        optional string key = 2;
        optional int32 partition = 3;
        optional string timestamp = 4;
    }

    required string topic = 1;
    repeated Message messages = 2;
    optional int32 acks = 3;
    optional int32 timeout = 4;
}

message Empty {}

然后我自动生成所需的 interface,这给了我:

/**
* This file is auto-generated by nestjs-proto-gen-ts
*/

import { Observable } from 'rxjs';
import { Metadata } from '@grpc/grpc-js';

export namespace KAFKA_GRPC_SERVICE {
    export interface KafkaGrpcService {
        produce(data: ProducerRequest, metadata?: Metadata): Observable<Empty>;
    }
    // See &#x60;kafkajs/types/ProducerRecord&#x60;
    export interface ProducerRequest {
        topic: string;
        messages: ProducerRequest.Message[];
        acks?: number;
        timeout?: number;
    }
    export namespace ProducerRequest {
        // See &#x60;kafkajs/types/ProducerRecord&#x60;
        // See &#x60;kafkajs/types/Message&#x60;
        export interface Message {
            value: string;
            key?: string;
            partition?: number;
            timestamp?: string;
        }
    }
    // tslint:disable-next-line:no-empty-interface
    export interface Empty {
    }
}

请注意,我已经调整了一些 interface,因为它提供了某些可选元素,当我需要其中一些元素以与 kafkajs/ProducerRecord 兼容时。

我的存储库有两个不同的 NestJS 应用程序,一个叫 kafka-grpc-server,另一个叫 kafka-grpc-client,所以代码略有不同。为了简洁起见,我不会在此处 post ProducerService

服务器

gRPC 服务的 Myconfig 在 grpc.options.ts 文件中定义,如下所示:

import { Transport } from "@nestjs/microservices";
import { join } from "path";

export const grpcOptions = {
    transport: Transport.GRPC,
    options: {
        package: 'KAFKA_GRPC_SERVICE',
        url: 'localhost:5000',
        protoPath: join(__dirname, 'grpc/proto/kafkagrpcservice.proto'),
    },
}

我的 Controller 在服务器端看起来像这样:

import { Controller,Logger } from "@nestjs/common";
import { GrpcMethod } from "@nestjs/microservices";
import { Observable } from "rxjs";
import { KAFKA_GRPC_SERVICE } from './grpc/interfaces/kafkagrpcservice';
import { ProducerService } from "./kafka/producer/producer.service";

@Controller()
export class KafkaGrpcController implements KAFKA_GRPC_SERVICE.KafkaGrpcService {
    private logger = new Logger(KafkaGrpcController.name)
    constructor(private readonly kafkaProducer: ProducerService) {}

    @GrpcMethod('KafkaGrpcService', 'Produce')
    produce(data: KAFKA_GRPC_SERVICE.ProducerRequest, metadata: any): Observable<any> {
        this.logger.log('Producing message: {} with metadata: {}', data, metadata.toString());
        this.kafkaProducer.produce(data);
        return;
    }
}

我的 main.ts 服务器端是这样的:

import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { KafkaGrpcServerModule } from './kafkagrpcserver.module';

async function bootstrap() {
  const logger = new Logger('Main')
  const app = await NestFactory.createMicroservice(KafkaGrpcServerModule, grpcOptions);
  await app.listen();
  logger.log(`Microservice is listening on '${grpcOptions.options.url}'`);

}
bootstrap();

我可以毫无问题地成功启动服务器,尽管我注意到有一个进程有两个文件描述符在 :5000TCP 上侦听。我不确定这是否是标准的。我的第一个问题是为什么有两个服务,而当我只创建一个微服务时却没有一个?这是 NestJS 的东西吗?

node      50355 mattia   27u  IPv6 *      0t0  TCP localhost:commplex-main (LISTEN)
node      50355 mattia   29u  IPv4 *      0t0  TCP localhost:commplex-main (LISTEN)

客户端

客户端略有不同。我使用相同的 .proto 文件。但是,grpc.options.ts 文件略有不同,因为它需要是一个客户端:

export const grpcOptions: ClientOptions = {
    transport: Transport.GRPC,
    options: {
        package: 'KAFKA_GRPC_SERVICE',
        url: 'localhost:5000',
        protoPath: join(__dirname, 'grpc/proto/kafkagrpcservice.proto'),
    },
}

如您所见,ClientOptions 用于客户端,但不用于服务器。

客户端,我有一个 GrpcClientModule 看起来像这样:

import { Module } from "@nestjs/common";
import { ClientsModule } from "@nestjs/microservices";
import { grpcOptions } from "../grpc.options";
import { GrpcClientController } from "./grpcclient.controller";

@Module({
    imports: [
        ClientsModule.register([
            {
                name: 'KAFKA_GRPC_SERVICE',
                ...grpcOptions,
            }
        ])
    ],
    controllers: [
        GrpcClientController,
    ],
})
export class GrpcClientModule {}

GrpClientController 是这样的:

import { Metadata } from "@grpc/grpc-js";
import { Body, Controller, Get, Inject, OnModuleInit, Post } from "@nestjs/common";
import { Client, ClientGrpc } from "@nestjs/microservices";
import { Observable } from "rxjs";
import { grpcOptions } from "src/grpc.options";
import { KAFKA_GRPC_SERVICE } from "./interfaces/kafkagrpcservice";

@Controller()
export class GrpcClientController implements OnModuleInit {
    constructor(@Inject('KAFKA_GRPC_SERVICE') private client: ClientGrpc) {}
    private grpcService: KAFKA_GRPC_SERVICE.KafkaGrpcService;

    onModuleInit(): void {
        this.grpcService = this.client.getService<KAFKA_GRPC_SERVICE.KafkaGrpcService>('KafkaRpcService')
    }

    @Post('produce')
    async produce(data: KAFKA_GRPC_SERVICE.ProducerRequest): Promise<Observable<KAFKA_GRPC_SERVICE.ProducerResponse>> {
        const metadata = new Metadata();
        metadata.add('Set-Cookie', 'my_cookie=in_milk');
        return this.grpcService.produce( { topic: data.topic, messages: data.messages }, metadata );
    }
}

我按如下方式启动我的客户端:

import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, grpcOptions);
  await app.listen();
}
bootstrap();

任何帮助都会很棒!

所以,我找到了答案。似乎不可能将单个微服务启动为 gRPC 服务器。确实,每当我将微服务变成混合应用程序时,如下所示:

import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { KafkaGrpcServerModule } from './kafkagrpcserver.module';

async function bootstrap() {
  const logger = new Logger('Main')
  const app = await NestFactory.create(KafkaGrpcServerModule);
  const grpcServer = await NestFactory.createMicroservice(KafkaGrpcServerModule, grpcOptions);
  await app.startAllMicroservices();
  await app.listen(3000);

}
bootstrap();

我可以连接客户端。