在本地主机上连接两个 NestJS gRPC 微服务
Connecting two NestJS gRPC microservices on localhost
简介
我对 NestJS 还很陌生,但我真的很喜欢它到目前为止的强大功能,所以我想看看结合使用 Kafka、gRPC 和 NestJS 能走多远。
我的最终目标是具有以下设计,因为我想复制我的 Kafka Producers:
gRPC client <---> gRPC server and Kafka client <----> Kafka
卡夫卡
我有一个在 Kubernetes 中构建的 Kafka 集群,我可以通过 NestJS 的 L4 负载均衡器轻松访问它。
NestJS 中的 Kafka 端没问题。我什至依赖 kafkajs
并简单地使用 kafkajs.Producer
和 kafkajs.Consumer
类 构建消费者和生产者,相应地定义 Kafka
实例的配置 ConfigService
.
gRPC
我正在努力让 gRPC 服务器代表 gRPC 客户端将请求转发给 Kafka 生产者。
问题
我可以启动 gRPC 服务器,但不能启动 gRPC 客户端。客户端 returns 来自 @grpc/grpc-js/src/server.ts:569
的以下错误:
[Nest] 50525 - 11/03/2022, 16:09:13 LOG [NestFactory] Starting Nest application...
[Nest] 50525 - 11/03/2022, 16:09:13 LOG [InstanceLoader] AppModule dependencies initialized +17ms
[Nest] 50525 - 11/03/2022, 16:09:13 LOG [InstanceLoader] ClientsModule dependencies initialized +0ms
[Nest] 50525 - 11/03/2022, 16:09:13 LOG [InstanceLoader] GrpcClientModule dependencies initialized +0ms
E No address added out of total 2 resolved
/Users/mattia/github/microservices/kafka-grpc-client/node_modules/@grpc/grpc-js/src/server.ts:569
deferredCallback(new Error(errorString), 0);
^
Error: No address added out of total 2 resolved
at bindResultPromise.then.errorString (/Users/mattia/github/microservices/kafka-grpc-client/node_modules/@grpc/grpc-js/src/server.ts:569:32)
at processTicksAndRejections (node:internal/process/task_queues:96:5)
我不确定为什么会发生这种情况,但我怀疑它与 gRPC 通道有关,或者换句话说,服务器和客户端之间的通信 link。但是,我找不到关于此事的任何文件。将不胜感激。
如果您跟踪堆栈跟踪,您应该注意到 @grpc/grpc-js/src/server.ts:569:32
中的以下内容,换句话说,此错误在 @grpc/grpcjs 库中:
const resolverListener: ResolverListener = {
onSuccessfulResolution: (
addressList,
serviceConfig,
serviceConfigError
) => {
// We only want one resolution result. Discard all future results
resolverListener.onSuccessfulResolution = () => {};
if (addressList.length === 0) {
deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
return;
}
let bindResultPromise: Promise<BindResult>;
if (isTcpSubchannelAddress(addressList[0])) {
if (addressList[0].port === 0) {
bindResultPromise = bindWildcardPort(addressList);
} else {
bindResultPromise = bindSpecificPort(
addressList,
addressList[0].port,
0
);
}
} else {
// Use an arbitrary non-zero port for non-TCP addresses
bindResultPromise = bindSpecificPort(addressList, 1, 0);
}
bindResultPromise.then(
(bindResult) => {
if (bindResult.count === 0) {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(LogVerbosity.ERROR, errorString);
deferredCallback(new Error(errorString), 0);
} else {
if (bindResult.count < addressList.length) {
logging.log(
LogVerbosity.INFO,
`WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
);
}
deferredCallback(null, bindResult.port);
}
},
(error) => {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(LogVerbosity.ERROR, errorString);
deferredCallback(new Error(errorString), 0);
}
);
},
onError: (error) => {
deferredCallback(new Error(error.details), 0);
},
};
我的代码
因为 gRPC 需要共享 .proto
文件,所以我使用以下内容:
syntax = "proto3";
package KAFKA_GRPC_SERVICE;
service KafkaGrpcService {
rpc Produce(ProducerRequest) returns(Empty) {}
}
// See `kafkajs/types/ProducerRecord`
message ProducerRequest {
// See `kafkajs/types/Message`
message Message {
required string value = 1;
optional string key = 2;
optional int32 partition = 3;
optional string timestamp = 4;
}
required string topic = 1;
repeated Message messages = 2;
optional int32 acks = 3;
optional int32 timeout = 4;
}
message Empty {}
然后我自动生成所需的 interface
,这给了我:
/**
* This file is auto-generated by nestjs-proto-gen-ts
*/
import { Observable } from 'rxjs';
import { Metadata } from '@grpc/grpc-js';
export namespace KAFKA_GRPC_SERVICE {
export interface KafkaGrpcService {
produce(data: ProducerRequest, metadata?: Metadata): Observable<Empty>;
}
// See `kafkajs/types/ProducerRecord`
export interface ProducerRequest {
topic: string;
messages: ProducerRequest.Message[];
acks?: number;
timeout?: number;
}
export namespace ProducerRequest {
// See `kafkajs/types/ProducerRecord`
// See `kafkajs/types/Message`
export interface Message {
value: string;
key?: string;
partition?: number;
timestamp?: string;
}
}
// tslint:disable-next-line:no-empty-interface
export interface Empty {
}
}
请注意,我已经调整了一些 interface
,因为它提供了某些可选元素,当我需要其中一些元素以与 kafkajs/ProducerRecord
兼容时。
我的存储库有两个不同的 NestJS 应用程序,一个叫 kafka-grpc-server
,另一个叫 kafka-grpc-client
,所以代码略有不同。为了简洁起见,我不会在此处 post ProducerService
。
服务器
gRPC 服务的 Myconfig 在 grpc.options.ts
文件中定义,如下所示:
import { Transport } from "@nestjs/microservices";
import { join } from "path";
export const grpcOptions = {
transport: Transport.GRPC,
options: {
package: 'KAFKA_GRPC_SERVICE',
url: 'localhost:5000',
protoPath: join(__dirname, 'grpc/proto/kafkagrpcservice.proto'),
},
}
我的 Controller
在服务器端看起来像这样:
import { Controller,Logger } from "@nestjs/common";
import { GrpcMethod } from "@nestjs/microservices";
import { Observable } from "rxjs";
import { KAFKA_GRPC_SERVICE } from './grpc/interfaces/kafkagrpcservice';
import { ProducerService } from "./kafka/producer/producer.service";
@Controller()
export class KafkaGrpcController implements KAFKA_GRPC_SERVICE.KafkaGrpcService {
private logger = new Logger(KafkaGrpcController.name)
constructor(private readonly kafkaProducer: ProducerService) {}
@GrpcMethod('KafkaGrpcService', 'Produce')
produce(data: KAFKA_GRPC_SERVICE.ProducerRequest, metadata: any): Observable<any> {
this.logger.log('Producing message: {} with metadata: {}', data, metadata.toString());
this.kafkaProducer.produce(data);
return;
}
}
我的 main.ts
服务器端是这样的:
import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { KafkaGrpcServerModule } from './kafkagrpcserver.module';
async function bootstrap() {
const logger = new Logger('Main')
const app = await NestFactory.createMicroservice(KafkaGrpcServerModule, grpcOptions);
await app.listen();
logger.log(`Microservice is listening on '${grpcOptions.options.url}'`);
}
bootstrap();
我可以毫无问题地成功启动服务器,尽管我注意到有一个进程有两个文件描述符在 :5000
和 TCP
上侦听。我不确定这是否是标准的。我的第一个问题是为什么有两个服务,而当我只创建一个微服务时却没有一个?这是 NestJS 的东西吗?
node 50355 mattia 27u IPv6 * 0t0 TCP localhost:commplex-main (LISTEN)
node 50355 mattia 29u IPv4 * 0t0 TCP localhost:commplex-main (LISTEN)
客户端
客户端略有不同。我使用相同的 .proto
文件。但是,grpc.options.ts
文件略有不同,因为它需要是一个客户端:
export const grpcOptions: ClientOptions = {
transport: Transport.GRPC,
options: {
package: 'KAFKA_GRPC_SERVICE',
url: 'localhost:5000',
protoPath: join(__dirname, 'grpc/proto/kafkagrpcservice.proto'),
},
}
如您所见,ClientOptions
用于客户端,但不用于服务器。
客户端,我有一个 GrpcClientModule
看起来像这样:
import { Module } from "@nestjs/common";
import { ClientsModule } from "@nestjs/microservices";
import { grpcOptions } from "../grpc.options";
import { GrpcClientController } from "./grpcclient.controller";
@Module({
imports: [
ClientsModule.register([
{
name: 'KAFKA_GRPC_SERVICE',
...grpcOptions,
}
])
],
controllers: [
GrpcClientController,
],
})
export class GrpcClientModule {}
而 GrpClientController
是这样的:
import { Metadata } from "@grpc/grpc-js";
import { Body, Controller, Get, Inject, OnModuleInit, Post } from "@nestjs/common";
import { Client, ClientGrpc } from "@nestjs/microservices";
import { Observable } from "rxjs";
import { grpcOptions } from "src/grpc.options";
import { KAFKA_GRPC_SERVICE } from "./interfaces/kafkagrpcservice";
@Controller()
export class GrpcClientController implements OnModuleInit {
constructor(@Inject('KAFKA_GRPC_SERVICE') private client: ClientGrpc) {}
private grpcService: KAFKA_GRPC_SERVICE.KafkaGrpcService;
onModuleInit(): void {
this.grpcService = this.client.getService<KAFKA_GRPC_SERVICE.KafkaGrpcService>('KafkaRpcService')
}
@Post('produce')
async produce(data: KAFKA_GRPC_SERVICE.ProducerRequest): Promise<Observable<KAFKA_GRPC_SERVICE.ProducerResponse>> {
const metadata = new Metadata();
metadata.add('Set-Cookie', 'my_cookie=in_milk');
return this.grpcService.produce( { topic: data.topic, messages: data.messages }, metadata );
}
}
我按如下方式启动我的客户端:
import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, grpcOptions);
await app.listen();
}
bootstrap();
任何帮助都会很棒!
所以,我找到了答案。似乎不可能将单个微服务启动为 gRPC 服务器。确实,每当我将微服务变成混合应用程序时,如下所示:
import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { KafkaGrpcServerModule } from './kafkagrpcserver.module';
async function bootstrap() {
const logger = new Logger('Main')
const app = await NestFactory.create(KafkaGrpcServerModule);
const grpcServer = await NestFactory.createMicroservice(KafkaGrpcServerModule, grpcOptions);
await app.startAllMicroservices();
await app.listen(3000);
}
bootstrap();
我可以连接客户端。
简介
我对 NestJS 还很陌生,但我真的很喜欢它到目前为止的强大功能,所以我想看看结合使用 Kafka、gRPC 和 NestJS 能走多远。
我的最终目标是具有以下设计,因为我想复制我的 Kafka Producers:
gRPC client <---> gRPC server and Kafka client <----> Kafka
卡夫卡
我有一个在 Kubernetes 中构建的 Kafka 集群,我可以通过 NestJS 的 L4 负载均衡器轻松访问它。
NestJS 中的 Kafka 端没问题。我什至依赖 kafkajs
并简单地使用 kafkajs.Producer
和 kafkajs.Consumer
类 构建消费者和生产者,相应地定义 Kafka
实例的配置 ConfigService
.
gRPC
我正在努力让 gRPC 服务器代表 gRPC 客户端将请求转发给 Kafka 生产者。
问题
我可以启动 gRPC 服务器,但不能启动 gRPC 客户端。客户端 returns 来自 @grpc/grpc-js/src/server.ts:569
的以下错误:
[Nest] 50525 - 11/03/2022, 16:09:13 LOG [NestFactory] Starting Nest application...
[Nest] 50525 - 11/03/2022, 16:09:13 LOG [InstanceLoader] AppModule dependencies initialized +17ms
[Nest] 50525 - 11/03/2022, 16:09:13 LOG [InstanceLoader] ClientsModule dependencies initialized +0ms
[Nest] 50525 - 11/03/2022, 16:09:13 LOG [InstanceLoader] GrpcClientModule dependencies initialized +0ms
E No address added out of total 2 resolved
/Users/mattia/github/microservices/kafka-grpc-client/node_modules/@grpc/grpc-js/src/server.ts:569
deferredCallback(new Error(errorString), 0);
^
Error: No address added out of total 2 resolved
at bindResultPromise.then.errorString (/Users/mattia/github/microservices/kafka-grpc-client/node_modules/@grpc/grpc-js/src/server.ts:569:32)
at processTicksAndRejections (node:internal/process/task_queues:96:5)
我不确定为什么会发生这种情况,但我怀疑它与 gRPC 通道有关,或者换句话说,服务器和客户端之间的通信 link。但是,我找不到关于此事的任何文件。将不胜感激。
如果您跟踪堆栈跟踪,您应该注意到 @grpc/grpc-js/src/server.ts:569:32
中的以下内容,换句话说,此错误在 @grpc/grpcjs 库中:
const resolverListener: ResolverListener = {
onSuccessfulResolution: (
addressList,
serviceConfig,
serviceConfigError
) => {
// We only want one resolution result. Discard all future results
resolverListener.onSuccessfulResolution = () => {};
if (addressList.length === 0) {
deferredCallback(new Error(`No addresses resolved for port ${port}`), 0);
return;
}
let bindResultPromise: Promise<BindResult>;
if (isTcpSubchannelAddress(addressList[0])) {
if (addressList[0].port === 0) {
bindResultPromise = bindWildcardPort(addressList);
} else {
bindResultPromise = bindSpecificPort(
addressList,
addressList[0].port,
0
);
}
} else {
// Use an arbitrary non-zero port for non-TCP addresses
bindResultPromise = bindSpecificPort(addressList, 1, 0);
}
bindResultPromise.then(
(bindResult) => {
if (bindResult.count === 0) {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(LogVerbosity.ERROR, errorString);
deferredCallback(new Error(errorString), 0);
} else {
if (bindResult.count < addressList.length) {
logging.log(
LogVerbosity.INFO,
`WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`
);
}
deferredCallback(null, bindResult.port);
}
},
(error) => {
const errorString = `No address added out of total ${addressList.length} resolved`;
logging.log(LogVerbosity.ERROR, errorString);
deferredCallback(new Error(errorString), 0);
}
);
},
onError: (error) => {
deferredCallback(new Error(error.details), 0);
},
};
我的代码
因为 gRPC 需要共享 .proto
文件,所以我使用以下内容:
syntax = "proto3";
package KAFKA_GRPC_SERVICE;
service KafkaGrpcService {
rpc Produce(ProducerRequest) returns(Empty) {}
}
// See `kafkajs/types/ProducerRecord`
message ProducerRequest {
// See `kafkajs/types/Message`
message Message {
required string value = 1;
optional string key = 2;
optional int32 partition = 3;
optional string timestamp = 4;
}
required string topic = 1;
repeated Message messages = 2;
optional int32 acks = 3;
optional int32 timeout = 4;
}
message Empty {}
然后我自动生成所需的 interface
,这给了我:
/**
* This file is auto-generated by nestjs-proto-gen-ts
*/
import { Observable } from 'rxjs';
import { Metadata } from '@grpc/grpc-js';
export namespace KAFKA_GRPC_SERVICE {
export interface KafkaGrpcService {
produce(data: ProducerRequest, metadata?: Metadata): Observable<Empty>;
}
// See `kafkajs/types/ProducerRecord`
export interface ProducerRequest {
topic: string;
messages: ProducerRequest.Message[];
acks?: number;
timeout?: number;
}
export namespace ProducerRequest {
// See `kafkajs/types/ProducerRecord`
// See `kafkajs/types/Message`
export interface Message {
value: string;
key?: string;
partition?: number;
timestamp?: string;
}
}
// tslint:disable-next-line:no-empty-interface
export interface Empty {
}
}
请注意,我已经调整了一些 interface
,因为它提供了某些可选元素,当我需要其中一些元素以与 kafkajs/ProducerRecord
兼容时。
我的存储库有两个不同的 NestJS 应用程序,一个叫 kafka-grpc-server
,另一个叫 kafka-grpc-client
,所以代码略有不同。为了简洁起见,我不会在此处 post ProducerService
。
服务器
gRPC 服务的 Myconfig 在 grpc.options.ts
文件中定义,如下所示:
import { Transport } from "@nestjs/microservices";
import { join } from "path";
export const grpcOptions = {
transport: Transport.GRPC,
options: {
package: 'KAFKA_GRPC_SERVICE',
url: 'localhost:5000',
protoPath: join(__dirname, 'grpc/proto/kafkagrpcservice.proto'),
},
}
我的 Controller
在服务器端看起来像这样:
import { Controller,Logger } from "@nestjs/common";
import { GrpcMethod } from "@nestjs/microservices";
import { Observable } from "rxjs";
import { KAFKA_GRPC_SERVICE } from './grpc/interfaces/kafkagrpcservice';
import { ProducerService } from "./kafka/producer/producer.service";
@Controller()
export class KafkaGrpcController implements KAFKA_GRPC_SERVICE.KafkaGrpcService {
private logger = new Logger(KafkaGrpcController.name)
constructor(private readonly kafkaProducer: ProducerService) {}
@GrpcMethod('KafkaGrpcService', 'Produce')
produce(data: KAFKA_GRPC_SERVICE.ProducerRequest, metadata: any): Observable<any> {
this.logger.log('Producing message: {} with metadata: {}', data, metadata.toString());
this.kafkaProducer.produce(data);
return;
}
}
我的 main.ts
服务器端是这样的:
import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { KafkaGrpcServerModule } from './kafkagrpcserver.module';
async function bootstrap() {
const logger = new Logger('Main')
const app = await NestFactory.createMicroservice(KafkaGrpcServerModule, grpcOptions);
await app.listen();
logger.log(`Microservice is listening on '${grpcOptions.options.url}'`);
}
bootstrap();
我可以毫无问题地成功启动服务器,尽管我注意到有一个进程有两个文件描述符在 :5000
和 TCP
上侦听。我不确定这是否是标准的。我的第一个问题是为什么有两个服务,而当我只创建一个微服务时却没有一个?这是 NestJS 的东西吗?
node 50355 mattia 27u IPv6 * 0t0 TCP localhost:commplex-main (LISTEN)
node 50355 mattia 29u IPv4 * 0t0 TCP localhost:commplex-main (LISTEN)
客户端
客户端略有不同。我使用相同的 .proto
文件。但是,grpc.options.ts
文件略有不同,因为它需要是一个客户端:
export const grpcOptions: ClientOptions = {
transport: Transport.GRPC,
options: {
package: 'KAFKA_GRPC_SERVICE',
url: 'localhost:5000',
protoPath: join(__dirname, 'grpc/proto/kafkagrpcservice.proto'),
},
}
如您所见,ClientOptions
用于客户端,但不用于服务器。
客户端,我有一个 GrpcClientModule
看起来像这样:
import { Module } from "@nestjs/common";
import { ClientsModule } from "@nestjs/microservices";
import { grpcOptions } from "../grpc.options";
import { GrpcClientController } from "./grpcclient.controller";
@Module({
imports: [
ClientsModule.register([
{
name: 'KAFKA_GRPC_SERVICE',
...grpcOptions,
}
])
],
controllers: [
GrpcClientController,
],
})
export class GrpcClientModule {}
而 GrpClientController
是这样的:
import { Metadata } from "@grpc/grpc-js";
import { Body, Controller, Get, Inject, OnModuleInit, Post } from "@nestjs/common";
import { Client, ClientGrpc } from "@nestjs/microservices";
import { Observable } from "rxjs";
import { grpcOptions } from "src/grpc.options";
import { KAFKA_GRPC_SERVICE } from "./interfaces/kafkagrpcservice";
@Controller()
export class GrpcClientController implements OnModuleInit {
constructor(@Inject('KAFKA_GRPC_SERVICE') private client: ClientGrpc) {}
private grpcService: KAFKA_GRPC_SERVICE.KafkaGrpcService;
onModuleInit(): void {
this.grpcService = this.client.getService<KAFKA_GRPC_SERVICE.KafkaGrpcService>('KafkaRpcService')
}
@Post('produce')
async produce(data: KAFKA_GRPC_SERVICE.ProducerRequest): Promise<Observable<KAFKA_GRPC_SERVICE.ProducerResponse>> {
const metadata = new Metadata();
metadata.add('Set-Cookie', 'my_cookie=in_milk');
return this.grpcService.produce( { topic: data.topic, messages: data.messages }, metadata );
}
}
我按如下方式启动我的客户端:
import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice(AppModule, grpcOptions);
await app.listen();
}
bootstrap();
任何帮助都会很棒!
所以,我找到了答案。似乎不可能将单个微服务启动为 gRPC 服务器。确实,每当我将微服务变成混合应用程序时,如下所示:
import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { grpcOptions } from './grpc.options';
import { KafkaGrpcServerModule } from './kafkagrpcserver.module';
async function bootstrap() {
const logger = new Logger('Main')
const app = await NestFactory.create(KafkaGrpcServerModule);
const grpcServer = await NestFactory.createMicroservice(KafkaGrpcServerModule, grpcOptions);
await app.startAllMicroservices();
await app.listen(3000);
}
bootstrap();
我可以连接客户端。