读取 Kafka 主题,并通过 REST API 公开数据以供 Prometheus 抓取(Node.js)

我使用 kafkajs 从 Kafka 主题读取数据,并希望通过 HTTP 端点把这些数据公开出来,供 Prometheus 抓取。但我无法把来自 Kafka 主题的数据公开出去。我编写的生产者和消费者代码如下:


 // import the `Kafka` client class from the kafkajs library
const { Kafka } = require("kafkajs")
const fs = require("fs");
const path = require("path");

// the client ID lets kafka know who's producing the messages
const clientId = "my-app"
// we can define the list of brokers in the cluster
const brokers = ["localhost:9092"]
// this is the topic to which we want to write messages
const topic = "message-log"

// initialize a new kafka client and initialize a producer from it
const kafka = new Kafka({
    clientId,
    brokers,
    // logLevel: logLevel.INFO
})
const producer = kafka.producer({})

/**
 * Publishes the contents of `metrics.txt` (located next to this module)
 * as a single message on the configured topic.
 *
 * Failures while reading the file or sending the message are logged and
 * not rethrown; a failure to connect or disconnect rejects the returned
 * promise.
 *
 * @returns {Promise<void>} resolves once the producer has disconnected
 */
const produce = async () => {
    await producer.connect()

    try {
        // send one message whose value is the current metrics file content
        await producer.send({
            topic,
            acks: 1,
            messages: [{
                key: "metrics on premise",
                value: fs.readFileSync(path.join(__dirname, 'metrics.txt'), 'utf8'),
            }],
        })

        // if the message is written successfully, log it
        console.log("writes:  #####################")
    } catch (err) {
        console.error("could not write message " + err)
    } finally {
        // only one message is sent per call, so release the broker connection
        await producer.disconnect()
    }
}

module.exports = produce


const produce = require("./produce")
const consume = require("./consume")
const fs = require("fs");
const path = require("path");

const express = require('express')
const app = express()
const port = 3003

// Scrape endpoint: `consume` receives the response object and is expected
// to write the metrics payload to it.
app.get('/metrics', async (req, res) => {
    try {
        await consume(res)
    } catch (err) {
        console.error("Error in consumer: ", err)
        // without a reply the scraper would hang until its own timeout
        if (!res.headersSent) {
            res.status(500).send("could not read metrics from kafka")
        }
    }
})

app.listen(port, () => {
    console.log(`Example app listening at http://localhost:${port}`)
})

// call the `produce` function and log an error if it occurs
produce().catch((err) => {
    console.error("error in producer: ", err)
})

下面是消费者 Consumer.js

 const { Kafka } = require("kafkajs")
const fs = require("fs");
const path = require("path");
const clientId = "my-app"
const brokers = ["localhost:9092"]
const topic = "message-log"

const kafka = new Kafka({
    clientId,
    brokers,
    // logCreator: customLogger,
    // logLevel: logLevel.DEBUG,
})
const consumer = kafka.consumer({
    groupId: clientId,
    minBytes: 5,
    maxBytes: 1e6,
    // wait for at most 3 seconds before receiving new data
    maxWaitTimeInMs: 3000,
})

// how long to collect messages before answering the HTTP request
const collectWindowMs = 2000

/**
 * Reads messages from the topic for a short, fixed window and writes their
 * concatenated values to the given HTTP response as plain text.
 *
 * NOTE(review): a single module-level consumer is shared by all calls, so
 * overlapping requests would interfere with each other — confirm that
 * scrapes never overlap, or create one consumer per request.
 *
 * @param {object} res - Express response object that receives the payload
 * @returns {Promise<void>} resolves after the response has been sent;
 *   rejects if connecting, subscribing, consuming or disconnecting fails
 */
const consume = async (res) => {
    let myString = "";

    // first, we wait for the client to connect and subscribe to the given topic
    await consumer.connect()
    try {
        await consumer.subscribe({
            topic,
            fromBeginning: true
        })
        await consumer.run({
            // this function is called every time the consumer gets a new message
            eachMessage: async ({ message }) => {
                console.log("Message received ###############################################################################");
                // a message may carry a null value; skip those
                if (message.value !== null && message.value !== undefined) {
                    myString += message.value.toString()
                }
            },
        })

        // `run` returns as soon as consumption has started, so give the
        // handler time to receive messages before replying
        await new Promise((resolve) => setTimeout(resolve, collectWindowMs))
    } finally {
        await consumer.disconnect()
    }

    res.set('Content-Type', 'text/plain')
    res.send(myString)
}

module.exports = consume

当我请求该 API 时,无法把消费到的消息作为响应返回给调用方。

除非您通过流式 HTTP 响应或 WebSocket 来提供数据(这段代码中并没有这样做),否则我不确定这是一个好方法。

如果您确实想把 Kafka 记录送入 Prometheus,请在消费者中通过 PushGateway 推送它们,而不是使用同步的 HTTP 抓取。