How do I connect KafkaJS with Socket.io?

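I've been working with kafkajs and socket.io. I'm very new to both and can't seem to understand a few things. I've created a chat application: you open a browser (the client), type messages, and they are displayed in the chat window. I found a tutorial that makes Kafka print "this is message + i". Instead of sending "message + i" to the topic and printing it, I want to send and print what people type in the chat, and I'm not sure how I should do that.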

Here is my consumer.js:

const { Kafka } = require("kafkajs")
const clientId = "my-app"
const brokers = ["localhost:9092"]
const topic = "message-log"

const kafka = new Kafka({ clientId, brokers })
// create a new consumer from the kafka client, and set its group ID
// the group ID helps Kafka keep track of the messages that this client
// is yet to receive
const consumer = kafka.consumer({ groupId: clientId })

const consume = async () => {
    // first, we wait for the client to connect and subscribe to the given topic
    await consumer.connect()
    await consumer.subscribe({ topic })
    await consumer.run({
        // this function is called every time the consumer gets a new message
        eachMessage: ({ message }) => {
            // here, we just log the message to the standard output
            console.log(`received message: ${message.value}`)
        },
    })
}

module.exports = consume

Here is my producer.js:

// import the `Kafka` instance from the kafkajs library
const { Kafka } = require("kafkajs")

// the client ID lets kafka know who's producing the messages
const clientId = "my-app"
// we can define the list of brokers in the cluster
const brokers = ["localhost:9092"]
// this is the topic to which we want to write messages
const topic = "message-log"

// initialize a new kafka client and initialize a producer from it
const kafka = new Kafka({ clientId, brokers })
const producer = kafka.producer()

// we define an async function that writes a new message each second
const produce = async () => {
    await producer.connect()
    let i = 0

    // after the producer has connected, we start an interval timer
    setInterval(async () => {
        try {
            // send a message to the configured topic with
            // the key and value formed from the current value of `i`
            await producer.send({
                topic,
                messages: [
                    {
                        key: String(i),
                        value: "this is message " + i,
                    },
                ],
            })

            // if the message is written successfully, log it and increment `i`
            console.log("writes: ", i)
            i++
        } catch (err) {
            console.error("could not write message " + err)
        }
    }, 1000)
}

module.exports = produce

I know I should somehow connect the topic, brokers, and clients with socket.io, but I'm not sure how.

Here is my chat.js:

/* Make the connection to the server as before.
   `io` is available because of the library I got from the documentation */
var socket = io.connect('http://localhost:8000');

// linking variables from index.html
var message = document.getElementById('message');
var username = document.getElementById('username');
var btn = document.getElementById('send');
var output = document.getElementById('output');
var feedback = document.getElementById('feedback');

// Emit events triggered by the user and send them to the server
btn.addEventListener('click', function(){
    socket.emit('chat', {
        message: message.value,
        username: username.value
    });
    message.value = "";
});

message.addEventListener('keypress', function(){
    socket.emit('typing', username.value);
})

// Listen for events carrying data from the server
socket.on('chat', function(data){
    feedback.innerHTML = '';
    output.innerHTML += '<p><strong>' + data.username + ': </strong>' + data.message + '</p>';
});

socket.on('typing', function(data){
    feedback.innerHTML = '<p><em>' + data + ' is typing a message...</em></p>';
});

You need a socket.io server.

Example:

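const consume = require('./consumer.js');
const produce = require('./producer.js');
const { Server } = require("socket.io");
// listen on the port chat.js connects to; the cors option is only needed
// if the page is served from a different origin than the socket.io server
const io = new Server(8000, { cors: { origin: "*" } });

// forward every message read from Kafka to all connected browsers
consume(({ from, to, message }) => {
  io.sockets.emit('newMessage', { from, to, message });
}).catch(err => console.error('consumer error:', err));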

io.on('connection', function(socket) {
  socket.emit('Hi!', { message: 'Chat connected', id: socket.id });
  
  socket.on('sendMessage', ({ message, to }) => {
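     // write the chat message to Kafka; log the error instead of leaving the rejection unhandled
     produce({ from: socket.id, to, message }).catch(err => console.error('could not write message', err));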
  });
  
});

You will also need to modify your consumer and producer to accept arguments and callbacks.

Consumer example:

...
const consume = async cb => {
    // first, we wait for the client to connect and subscribe to the given topic
    await consumer.connect()
    await consumer.subscribe({ topic })
    await consumer.run({
        // this function is called every time the consumer gets a new message;
        // message.value is a Buffer, so decode it (assuming the producer wrote JSON)
        eachMessage: async ({ message }) => {
            cb(JSON.parse(message.value.toString()));
        },
    })
}

Producer example:

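const produce = async ({ from, to, message }) => {
    // assumes `await producer.connect()` was called once at startup;
    // kafkajs expects { topic, messages }, and each value must be a string or Buffer
    const value = JSON.stringify({ from, to, message })
    await producer.send({ topic, messages: [{ value }] })
}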
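
Since the producer and the consumer have to agree on how a chat message is stored in Kafka, it may help to keep the encoding in one place. The following is only a sketch: `encodeChat` and `decodeChat` are hypothetical helper names, and JSON is an assumed wire format. Note that the `message-log` topic may still hold the plain "this is message N" strings written by the tutorial producer, so the decoder returns null for anything that is not valid JSON instead of throwing.

```javascript
// Hypothetical helpers shared by producer.js and consumer.js (names are illustrative).

// Turn a chat payload into one entry for producer.send({ topic, messages: [...] }).
// Using the sender as the key is an assumption; messages with the same key
// go to the same partition, which keeps one sender's messages in order.
function encodeChat({ from, to, message }) {
    return { key: String(from), value: JSON.stringify({ from, to, message }) };
}

// Turn a consumed Kafka message back into a chat payload.
// Returns null when the value is missing or is not valid JSON.
function decodeChat(kafkaMessage) {
    if (!kafkaMessage || kafkaMessage.value == null) return null;
    try {
        return JSON.parse(kafkaMessage.value.toString());
    } catch (err) {
        return null;
    }
}
```

In the consumer, `eachMessage` could then call `decodeChat(message)` and skip the callback when the result is null; in the producer, `messages: [encodeChat(payload)]` would replace the inline `JSON.stringify`.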

Don't forget to modify your chat.js on the client side.
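
As a rough idea of what that client change could look like, here is a sketch that uses the event names from the server example (`sendMessage` going out, `newMessage` coming in). The helper names `escapeHtml`, `formatMessage`, and `wireChat` are made up for illustration, and `to: null` is an assumed way of saying "everyone", since the server example broadcasts to all sockets anyway. Escaping is added because the text ends up in `innerHTML`.

```javascript
// chat.js (client side), adapted to the server example's event names.

// Escape user-supplied text before it is inserted into innerHTML.
function escapeHtml(text) {
    return String(text)
        .replace(/&/g, '&amp;')
        .replace(/</g, '&lt;')
        .replace(/>/g, '&gt;')
        .replace(/"/g, '&quot;');
}

// Build the HTML for one chat line from a { from, message } payload.
function formatMessage({ from, message }) {
    return '<p><strong>' + escapeHtml(from) + ': </strong>' + escapeHtml(message) + '</p>';
}

// Connect a socket.io client socket to the page elements.
function wireChat(socket, { messageInput, sendButton, output }) {
    // send what the user typed to the server, which writes it to Kafka
    sendButton.addEventListener('click', function () {
        socket.emit('sendMessage', { message: messageInput.value, to: null });
        messageInput.value = '';
    });
    // show every message the server read back from Kafka
    socket.on('newMessage', function (data) {
        output.innerHTML += formatMessage(data);
    });
}

// Only runs in the browser, where `document` and the socket.io `io` global exist.
if (typeof document !== 'undefined' && typeof io !== 'undefined') {
    wireChat(io.connect('http://localhost:8000'), {
        messageInput: document.getElementById('message'),
        sendButton: document.getElementById('send'),
        output: document.getElementById('output'),
    });
}
```

With the server example as written, `from` is the sender's `socket.id`; to show a username instead, the client would have to send it along and the server would have to pass it through.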

All of this can be optimized; it is just a simple example.