nest bull 队列的单独进程和 api
nest bull separate process for queues and api
我有一个 nestjs
应用程序公开了一些 REST API
。 API
之一触发了一个处理某些任务的作业。问题是,当作业被触发时,应用程序停止为 REST 请求提供服务,这会导致负载均衡器的健康检查失败。我按照README末尾给出的方法启动了一个单独的子进程来处理作业。但是,作业不会在子进程中启动,并且 API
请求停止。
这是我的工作:
import {
BullQueueEvents,
OnQueueActive,
OnQueueEvent,
Process,
Processor,
} from 'nest-bull';
import { Job } from 'bull';
import { Logger } from '@nestjs/common';
import { AService } from './a-service';
import { AJobInterface } from '../AJobInterface';
@Processor({ name: 'a_queue' })
export class AJob {
private readonly logger = new Logger('AQueue');
constructor(private readonly service: AService) {}
@Process({
name: 'app',
concurrency: 1
})
processApp(job: Job<AJobInterface>) {
console.log('CHILD: ', process.pid);
const { jobId } = job.data;
return this.service.process(jobId);
}
@OnQueueActive()
onActive(job: Job) {
this.logger.log(
`Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(
job.data,
)}...`,
);
}
@OnQueueEvent(BullQueueEvents.COMPLETED)
onCompleted(job: Job) {
this.logger.log(
`Completed job ${job.id} of type ${job.name} with result ${job.returnvalue}`,
);
}
}
这是我的 app.module.ts:
import { Module, OnModuleInit } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { DatabaseModule } from './db/module';
import { BullModule } from 'nest-bull';
import { AJob } from './worker/a-job';
import { AService } from './worker/a-service';
import { join } from 'path';
@Module({
imports: [
TypeOrmModule.forRoot(),
DatabaseModule,
BullModule.register({
name: 'a_queue',
processors: [ join(__dirname, 'worker/a-job.js') ],
options: {
redis: {
host: process.env.REDIS_URL || '127.0.0.1',
port: 6379,
showFriendlyErrorStack: true,
},
settings: {
lockDuration: 300000,
stalledInterval: 300000
},
},
}),
],
controllers: [AppController],
providers: [AppService, AJob, AService],
})
export class AppModule implements OnModuleInit {
onModuleInit() {
console.log('MAIN: ', process.pid);
}
}
我做错了什么吗?
抱歉这么晚才发布答案。事实证明,在子进程中设置 worker 是不可能的。我最终有一个单独的 worker.module.ts
和一个单独的 worker.ts
并为 API 和 worker.
创建了两个单独的进程
worker.module.ts
:
import { Module, OnModuleInit } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AppService } from '../app.service';
import { DatabaseModule } from '../db/module';
import { BullModule } from 'nest-bull';
import { AJob } from './a-job';
import { AService } from './a-service';
import { join } from 'path';
import { Job, DoneCallback } from 'bull';
@Module({
imports: [
TypeOrmModule.forRoot(),
DatabaseModule,
BullModule.register({
name: 'a_queue',
processors: [ (job: Job, done: DoneCallback) => { done(null, job.data); } ],
options: {
redis: {
host: process.env.REDIS_URL || '127.0.0.1',
port: 6379,
password: process.env.REDIS_PWD,
showFriendlyErrorStack: true,
},
settings: {
lockDuration: 300000,
stalledInterval: 300000
},
},
}),
],
providers: [AppService, AJob, AService],
})
export class WorkerModule implements OnModuleInit {
onModuleInit() {
console.log('WORKER: ', process.pid);
}
}
worker.ts
:
import { NestFactory } from '@nestjs/core';
import { WorkerModule } from './worker/worker.module';
async function bootstrap() {
const app = await NestFactory.create(WorkerModule);
app.init();
}
bootstrap();
虽然 app.module.ts
现在看起来像这样:
//...imports
@Module({
imports: [
TypeOrmModule.forRoot(),
DatabaseModule,
BullModule.register({
name: 'a_queue',
processors: [ ],
options: {
redis: {
host: process.env.REDIS_URL || '127.0.0.1',
port: 6379,
showFriendlyErrorStack: true,
},
},
}),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule implements OnModuleInit {
onModuleInit() {
console.log('MAIN: ', process.pid);
}
}
和对应的app.ts
:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { port } from './config';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.enableCors();
await app.listen(port);
}
bootstrap();
我有一个 nestjs
应用程序公开了一些 REST API
。 API
之一触发了一个处理某些任务的作业。问题是,当作业被触发时,应用程序停止为 REST 请求提供服务,这会导致负载均衡器的健康检查失败。我按照README末尾给出的方法启动了一个单独的子进程来处理作业。但是,作业不会在子进程中启动,并且 API
请求停止。
这是我的工作:
import {
BullQueueEvents,
OnQueueActive,
OnQueueEvent,
Process,
Processor,
} from 'nest-bull';
import { Job } from 'bull';
import { Logger } from '@nestjs/common';
import { AService } from './a-service';
import { AJobInterface } from '../AJobInterface';
@Processor({ name: 'a_queue' })
export class AJob {
private readonly logger = new Logger('AQueue');
constructor(private readonly service: AService) {}
@Process({
name: 'app',
concurrency: 1
})
processApp(job: Job<AJobInterface>) {
console.log('CHILD: ', process.pid);
const { jobId } = job.data;
return this.service.process(jobId);
}
@OnQueueActive()
onActive(job: Job) {
this.logger.log(
`Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(
job.data,
)}...`,
);
}
@OnQueueEvent(BullQueueEvents.COMPLETED)
onCompleted(job: Job) {
this.logger.log(
`Completed job ${job.id} of type ${job.name} with result ${job.returnvalue}`,
);
}
}
这是我的 app.module.ts:
import { Module, OnModuleInit } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { DatabaseModule } from './db/module';
import { BullModule } from 'nest-bull';
import { AJob } from './worker/a-job';
import { AService } from './worker/a-service';
import { join } from 'path';
@Module({
imports: [
TypeOrmModule.forRoot(),
DatabaseModule,
BullModule.register({
name: 'a_queue',
processors: [ join(__dirname, 'worker/a-job.js') ],
options: {
redis: {
host: process.env.REDIS_URL || '127.0.0.1',
port: 6379,
showFriendlyErrorStack: true,
},
settings: {
lockDuration: 300000,
stalledInterval: 300000
},
},
}),
],
controllers: [AppController],
providers: [AppService, AJob, AService],
})
export class AppModule implements OnModuleInit {
onModuleInit() {
console.log('MAIN: ', process.pid);
}
}
我做错了什么吗?
抱歉这么晚才发布答案。事实证明,在子进程中设置 worker 是不可能的。我最终有一个单独的 worker.module.ts
和一个单独的 worker.ts
并为 API 和 worker.
worker.module.ts
:
import { Module, OnModuleInit } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AppService } from '../app.service';
import { DatabaseModule } from '../db/module';
import { BullModule } from 'nest-bull';
import { AJob } from './a-job';
import { AService } from './a-service';
import { join } from 'path';
import { Job, DoneCallback } from 'bull';
@Module({
imports: [
TypeOrmModule.forRoot(),
DatabaseModule,
BullModule.register({
name: 'a_queue',
processors: [ (job: Job, done: DoneCallback) => { done(null, job.data); } ],
options: {
redis: {
host: process.env.REDIS_URL || '127.0.0.1',
port: 6379,
password: process.env.REDIS_PWD,
showFriendlyErrorStack: true,
},
settings: {
lockDuration: 300000,
stalledInterval: 300000
},
},
}),
],
providers: [AppService, AJob, AService],
})
export class WorkerModule implements OnModuleInit {
onModuleInit() {
console.log('WORKER: ', process.pid);
}
}
worker.ts
:
import { NestFactory } from '@nestjs/core';
import { WorkerModule } from './worker/worker.module';
async function bootstrap() {
const app = await NestFactory.create(WorkerModule);
app.init();
}
bootstrap();
虽然 app.module.ts
现在看起来像这样:
//...imports
@Module({
imports: [
TypeOrmModule.forRoot(),
DatabaseModule,
BullModule.register({
name: 'a_queue',
processors: [ ],
options: {
redis: {
host: process.env.REDIS_URL || '127.0.0.1',
port: 6379,
showFriendlyErrorStack: true,
},
},
}),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule implements OnModuleInit {
onModuleInit() {
console.log('MAIN: ', process.pid);
}
}
和对应的app.ts
:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { port } from './config';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.enableCors();
await app.listen(port);
}
bootstrap();