nestjs微服务中的动态kafka主题名称
Dynamic kafka topic name in nestjs microservice
在 Nestjs 中,我使用 kafka 作为消息代理并设置主题名称如下:
@MessagePattern('topic-name')
async getNewRequest(@Payload() message: any): Promise<void> {
// my code goes here
}
有没有办法从配置服务模块读取kafka主题名称?
我通过创建一个新的自定义装饰器来处理这个问题。
export function KafkaTopic(variable: string | keyof AppConfig): any {
return (
target: any,
key: string | symbol,
descriptor: PropertyDescriptor,
) => {
Reflect.defineMetadata(
KAFKA_TOPIC_METADATA,
variable,
descriptor.value,
);
return descriptor;
};
然后用 MessagePattern 动态替换它,并从 appConfig 设置主题名称:
@Injectable()
export class KafkaDecoratorProcessorService {
constructor(
private readonly LOG: Logger,
private readonly appConfig: AppConfig,
) {
}
processKafkaDecorators(types: any[]) {
for (const type of types) {
const propNames = Object.getOwnPropertyNames(type.prototype);
for (const prop of propNames) {
const propValue = Reflect.getMetadata(
KAFKA_TOPIC_METADATA,
Reflect.get(type.prototype, prop),
);
if (propValue) {
const topic = this.appConfig[propValue];
this.LOG.log(`Setting topic ${topic} for ${type.name}#${prop}`);
Reflect.decorate(
[MessagePattern(topic)],
type.prototype,
prop,
Reflect.getOwnPropertyDescriptor(type.prototype, prop),
);
}
}
}
}
}
这是 运行 在 main.ts 文件中处理 KafkaDecorators 的方法:
const app = await NestFactory.create(AppModule);
app
.get(KafkaDecoratorProcessorService)
.processKafkaDecorators([AppController]);
app.connectMicroservice({
transport: Transport.KAFKA,
...
})
请注意,在连接微服务之前,您必须 运行。
并像这样使用它:
@KafkaTopic('KAFKA_TOPIC_BOOK_UPDATE')
async processMessage(
@Payload() { value: payload }: { value: BookUpdateModel },
) {
...
}
您可以使用 process.env.VAR_NAME
,像这样:
@MessagePattern(process.env.MESSAGES_TOPIC)
需要注意的一件重要事情是 .env
文件将不起作用,您需要在应用程序启动之前设置一个环境变量,这是因为 ConfigService/dotenv 加载太晚了案件。
你可以用这个来实现 packag.json
:
"scripts": {
"start": "export MESSAGES_TOPIC=topic_name || SET \"MESSAGES_TOPIC=topic_name \" && nest start",
},
尽管 Ali 的回答可能有效,但我认为对于这么简单的事情来说太过分了,在我看来最好不要对这种特定情况使用 ConfigService
。
在 Nestjs 中,我使用 kafka 作为消息代理并设置主题名称如下:
@MessagePattern('topic-name')
async getNewRequest(@Payload() message: any): Promise<void> {
// my code goes here
}
有没有办法从配置服务模块读取kafka主题名称?
我通过创建一个新的自定义装饰器来处理这个问题。
export function KafkaTopic(variable: string | keyof AppConfig): any {
return (
target: any,
key: string | symbol,
descriptor: PropertyDescriptor,
) => {
Reflect.defineMetadata(
KAFKA_TOPIC_METADATA,
variable,
descriptor.value,
);
return descriptor;
};
然后用 MessagePattern 动态替换它,并从 appConfig 设置主题名称:
@Injectable()
export class KafkaDecoratorProcessorService {
constructor(
private readonly LOG: Logger,
private readonly appConfig: AppConfig,
) {
}
processKafkaDecorators(types: any[]) {
for (const type of types) {
const propNames = Object.getOwnPropertyNames(type.prototype);
for (const prop of propNames) {
const propValue = Reflect.getMetadata(
KAFKA_TOPIC_METADATA,
Reflect.get(type.prototype, prop),
);
if (propValue) {
const topic = this.appConfig[propValue];
this.LOG.log(`Setting topic ${topic} for ${type.name}#${prop}`);
Reflect.decorate(
[MessagePattern(topic)],
type.prototype,
prop,
Reflect.getOwnPropertyDescriptor(type.prototype, prop),
);
}
}
}
}
}
这是 运行 在 main.ts 文件中处理 KafkaDecorators 的方法:
const app = await NestFactory.create(AppModule);
app
.get(KafkaDecoratorProcessorService)
.processKafkaDecorators([AppController]);
app.connectMicroservice({
transport: Transport.KAFKA,
...
})
请注意,在连接微服务之前,您必须 运行。 并像这样使用它:
@KafkaTopic('KAFKA_TOPIC_BOOK_UPDATE')
async processMessage(
@Payload() { value: payload }: { value: BookUpdateModel },
) {
...
}
您可以使用 process.env.VAR_NAME
,像这样:
@MessagePattern(process.env.MESSAGES_TOPIC)
需要注意的一件重要事情是 .env
文件将不起作用,您需要在应用程序启动之前设置一个环境变量,这是因为 ConfigService/dotenv 加载太晚了案件。
你可以用这个来实现 packag.json
:
"scripts": {
"start": "export MESSAGES_TOPIC=topic_name || SET \"MESSAGES_TOPIC=topic_name \" && nest start",
},
尽管 Ali 的回答可能有效,但我认为对于这么简单的事情来说太过分了,在我看来最好不要对这种特定情况使用 ConfigService
。