如何将 Kafkajs 与 Socket.io 连接
How do I connect Kafkajs with Socket.io
我一直在研究 kafkajs 和 socket.io,我对它们还很陌生,有些地方始终没弄明白。
我创建了一个聊天应用程序:打开浏览器(客户端)后,您可以键入消息,消息会显示在聊天窗口(chat-window)中。
我找到了一个让kafka打印“this message + i”的教程。
我希望发送到主题并打印出来的不是 message+i,而是人们在聊天中输入的内容,但我不确定该怎么做。
这是我的consumer.js:
// import the `Kafka` client class from the kafkajs library
const { Kafka } = require("kafkajs")
// client ID handed to the Kafka client; also reused below as the consumer group ID
const clientId = "my-app"
// list of brokers handed to the Kafka client
const brokers = ["localhost:9092"]
// the topic this consumer subscribes to
const topic = "message-log"
// initialize a new kafka client from the client ID and broker list
const kafka = new Kafka({ clientId, brokers })
// create a new consumer from the kafka client, and set its group ID
// the group ID helps Kafka keep track of the messages that this client
// is yet to receive
const consumer = kafka.consumer({ groupId: clientId })
/**
 * Connects the consumer, subscribes it to `topic`, and starts its run loop.
 * Every message handed to the run loop is written to standard output.
 *
 * @returns {Promise<void>} settles once `consumer.run` has settled
 */
const consume = async () => {
  // called by the consumer each time it gets a new message
  const logIncoming = (payload) => {
    const { value } = payload.message;
    console.log(`received message: ${value}`);
  };

  // connect first, then subscribe, then start consuming
  await consumer.connect();
  await consumer.subscribe({ topic: topic });
  await consumer.run({ eachMessage: logIncoming });
};
module.exports = consume
这是我的producer.js:
// import the `Kafka` client class from the kafkajs library
const { Kafka } = require("kafkajs")
// the client ID lets kafka know who's producing the messages
const clientId = "my-app"
// we can define the list of brokers in the cluster
const brokers = ["localhost:9092"]
// this is the topic to which we want to write messages
const topic = "message-log"
// initialize a new kafka client from the client ID and broker list,
// then create a producer from that client
const kafka = new Kafka({ clientId, brokers })
const producer = kafka.producer()
// we define an async function that writes a new message each second
/**
 * Connects the producer and then schedules a timer that writes one numbered
 * message to `topic` every 1000 ms.
 *
 * @returns {Promise<void>} settles once the producer is connected and the
 *   timer has been scheduled
 */
const produce = async () => {
  await producer.connect();

  // sequence number; advanced only after a successful write
  let counter = 0;

  // one timer tick: build the record from the current counter and send it
  const sendNext = async () => {
    const record = {
      topic,
      messages: [{ key: String(counter), value: `this is message ${counter}` }],
    };
    try {
      await producer.send(record);
      console.log("writes: ", counter);
      counter += 1;
    } catch (err) {
      // a failed write is reported; the same number is retried on the next tick
      console.error("could not write message " + err);
    }
  };

  setInterval(sendNext, 1000);
};
module.exports = produce
我知道我应该以某种方式将主题(topic)、代理(broker)和客户端(client)与 socket.io 连接起来,但我不确定该怎么做。
这是我的 chat.js:
/*
 * Browser-side chat client.
 * Opens the connection to the server as before; the global `io` is available
 * because of the Socket.IO client library loaded by the page.
 */
const socket = io.connect('http://localhost:8000');

// Link variables to the elements of index.html.
const message = document.getElementById('message');
const username = document.getElementById('username');
const btn = document.getElementById('send');
const output = document.getElementById('output');
const feedback = document.getElementById('feedback');

// Emit events triggered by the user and send them to the server.
btn.addEventListener('click', function () {
  socket.emit('chat', {
    message: message.value,
    username: username.value,
  });
  message.value = "";
});

message.addEventListener('keypress', function () {
  socket.emit('typing', username.value);
});

// Listen for data pushed by the server.
socket.on('chat', function (data) {
  feedback.textContent = '';
  // Usernames and messages come from other users, so they are inserted as
  // text nodes rather than concatenated into innerHTML; markup typed into
  // the chat is displayed literally instead of being executed.
  const line = document.createElement('p');
  const author = document.createElement('strong');
  author.textContent = data.username + ': ';
  line.appendChild(author);
  line.appendChild(document.createTextNode(data.message));
  output.appendChild(line);
});

socket.on('typing', function (data) {
  // Same reasoning as above: the username is untrusted text.
  const notice = document.createElement('em');
  notice.textContent = data + ' is typing a message...';
  const wrapper = document.createElement('p');
  wrapper.appendChild(notice);
  feedback.textContent = '';
  feedback.appendChild(wrapper);
});
您需要 socket.io 服务器。
示例:
// Socket.IO server that bridges browser clients and Kafka.
// Local modules need a relative path; a bare 'consumer.js' would be looked
// up in node_modules.
const consume = require('./consumer.js');
const produce = require('./producer.js');
const { Server } = require("socket.io");

// NOTE(review): if the chat page is served from a different origin, a `cors`
// option probably has to be passed to `new Server(...)` — confirm for your setup.
const io = new Server();

// Every message read from Kafka is broadcast to all connected sockets.
consume(({ from, to, message }) => {
  io.sockets.emit('newMessage', { from, to, message });
}).catch((err) => {
  console.error("could not start consumer " + err);
});

io.on('connection', function (socket) {
  socket.emit('Hi!', { message: 'Chat connected', id: socket.id });
  // What a client sends is written to Kafka; it reaches the other clients
  // through the consumer callback above.
  socket.on('sendMessage', ({ message, to } = {}) => {
    produce({ from: socket.id, to, message }).catch((err) => {
      console.error("could not write message " + err);
    });
  });
});

// Start accepting connections; 8000 is the port the chat.js client connects to.
io.listen(8000);
您还需要修改您的消费者和生产者以接受参数和回调。
消费者示例:
...
/**
 * Connects the consumer, subscribes it to `topic`, and passes every chat
 * message read from the topic to `cb`.
 *
 * The handler receives the Kafka message under the `message` key, and its
 * `value` holds the serialized payload. The chat fields are therefore decoded
 * from `message.value` (expected to be the JSON text of
 * `{ from, to, message }`).
 *
 * @param {(chat: { from: *, to: *, message: * }) => void} cb - called once
 *   per successfully decoded message
 * @returns {Promise<void>} settles once `consumer.run` has settled
 */
const consume = async (cb) => {
  // first, we wait for the client to connect and subscribe to the given topic
  await consumer.connect();
  await consumer.subscribe({ topic });
  await consumer.run({
    // this function is called every time the consumer gets a new message
    eachMessage: async ({ message: kafkaMessage }) => {
      let chat;
      try {
        const { from, to, message } = JSON.parse(kafkaMessage.value.toString());
        chat = { from, to, message };
      } catch (err) {
        // a value that is empty or not valid JSON is reported and skipped,
        // so one bad record does not stop the consumer
        console.error("skipping malformed message " + err);
        return;
      }
      // called outside the try block so errors thrown by `cb` are not
      // mistaken for malformed input
      cb(chat);
    },
  });
};
生产者示例:
// Promise of the pending or completed producer connection; shared by all
// calls so the producer is connected only once.
let producerConnection = null;

/**
 * Writes one chat message to `topic`.
 *
 * `producer.send` takes a single record of the form `{ topic, messages }`,
 * so the chat fields are serialized to JSON and sent as the message value.
 *
 * @param {{ from: *, to: *, message: * }} chat - sender, recipient and text
 * @returns {Promise<*>} whatever `producer.send` resolves with; rejects if
 *   connecting or sending fails
 */
const produce = async ({ from, to, message }) => {
  if (!producerConnection) {
    producerConnection = producer.connect().catch((err) => {
      // forget the failed attempt so the next call retries the connection
      producerConnection = null;
      throw err;
    });
  }
  await producerConnection;

  return producer.send({
    topic,
    messages: [{ value: JSON.stringify({ from, to, message }) }],
  });
};
别忘了在客户端修改你的chat.js
所有这些都可以优化,只是一个简单的例子
我一直在研究 kafkajs 和 socket.io,我对它们还很陌生,有些地方始终没弄明白。 我创建了一个聊天应用程序:打开浏览器(客户端)后,您可以键入消息,消息会显示在聊天窗口(chat-window)中。 我找到了一个让kafka打印“this message + i”的教程。 我希望发送到主题并打印出来的不是 message+i,而是人们在聊天中输入的内容,但我不确定该怎么做。
这是我的consumer.js:
// import the `Kafka` client class from the kafkajs library
const { Kafka } = require("kafkajs")
// client ID handed to the Kafka client; also reused below as the consumer group ID
const clientId = "my-app"
// list of brokers handed to the Kafka client
const brokers = ["localhost:9092"]
// the topic this consumer subscribes to
const topic = "message-log"
// initialize a new kafka client from the client ID and broker list
const kafka = new Kafka({ clientId, brokers })
// create a new consumer from the kafka client, and set its group ID
// the group ID helps Kafka keep track of the messages that this client
// is yet to receive
const consumer = kafka.consumer({ groupId: clientId })
/**
 * Connects the consumer, subscribes it to `topic`, and starts its run loop.
 * Every message handed to the run loop is written to standard output.
 *
 * @returns {Promise<void>} settles once `consumer.run` has settled
 */
const consume = async () => {
  // called by the consumer each time it gets a new message
  const logIncoming = (payload) => {
    const { value } = payload.message;
    console.log(`received message: ${value}`);
  };

  // connect first, then subscribe, then start consuming
  await consumer.connect();
  await consumer.subscribe({ topic: topic });
  await consumer.run({ eachMessage: logIncoming });
};
module.exports = consume
这是我的producer.js:
// import the `Kafka` client class from the kafkajs library
const { Kafka } = require("kafkajs")
// the client ID lets kafka know who's producing the messages
const clientId = "my-app"
// we can define the list of brokers in the cluster
const brokers = ["localhost:9092"]
// this is the topic to which we want to write messages
const topic = "message-log"
// initialize a new kafka client from the client ID and broker list,
// then create a producer from that client
const kafka = new Kafka({ clientId, brokers })
const producer = kafka.producer()
// we define an async function that writes a new message each second
/**
 * Connects the producer and then schedules a timer that writes one numbered
 * message to `topic` every 1000 ms.
 *
 * @returns {Promise<void>} settles once the producer is connected and the
 *   timer has been scheduled
 */
const produce = async () => {
  await producer.connect();

  // sequence number; advanced only after a successful write
  let counter = 0;

  // one timer tick: build the record from the current counter and send it
  const sendNext = async () => {
    const record = {
      topic,
      messages: [{ key: String(counter), value: `this is message ${counter}` }],
    };
    try {
      await producer.send(record);
      console.log("writes: ", counter);
      counter += 1;
    } catch (err) {
      // a failed write is reported; the same number is retried on the next tick
      console.error("could not write message " + err);
    }
  };

  setInterval(sendNext, 1000);
};
module.exports = produce
我知道我应该以某种方式将主题(topic)、代理(broker)和客户端(client)与 socket.io 连接起来,但我不确定该怎么做。
这是我的 chat.js:
/*
 * Browser-side chat client.
 * Opens the connection to the server as before; the global `io` is available
 * because of the Socket.IO client library loaded by the page.
 */
const socket = io.connect('http://localhost:8000');

// Link variables to the elements of index.html.
const message = document.getElementById('message');
const username = document.getElementById('username');
const btn = document.getElementById('send');
const output = document.getElementById('output');
const feedback = document.getElementById('feedback');

// Emit events triggered by the user and send them to the server.
btn.addEventListener('click', function () {
  socket.emit('chat', {
    message: message.value,
    username: username.value,
  });
  message.value = "";
});

message.addEventListener('keypress', function () {
  socket.emit('typing', username.value);
});

// Listen for data pushed by the server.
socket.on('chat', function (data) {
  feedback.textContent = '';
  // Usernames and messages come from other users, so they are inserted as
  // text nodes rather than concatenated into innerHTML; markup typed into
  // the chat is displayed literally instead of being executed.
  const line = document.createElement('p');
  const author = document.createElement('strong');
  author.textContent = data.username + ': ';
  line.appendChild(author);
  line.appendChild(document.createTextNode(data.message));
  output.appendChild(line);
});

socket.on('typing', function (data) {
  // Same reasoning as above: the username is untrusted text.
  const notice = document.createElement('em');
  notice.textContent = data + ' is typing a message...';
  const wrapper = document.createElement('p');
  wrapper.appendChild(notice);
  feedback.textContent = '';
  feedback.appendChild(wrapper);
});
您需要 socket.io 服务器。
示例:
// Socket.IO server that bridges browser clients and Kafka.
// Local modules need a relative path; a bare 'consumer.js' would be looked
// up in node_modules.
const consume = require('./consumer.js');
const produce = require('./producer.js');
const { Server } = require("socket.io");

// NOTE(review): if the chat page is served from a different origin, a `cors`
// option probably has to be passed to `new Server(...)` — confirm for your setup.
const io = new Server();

// Every message read from Kafka is broadcast to all connected sockets.
consume(({ from, to, message }) => {
  io.sockets.emit('newMessage', { from, to, message });
}).catch((err) => {
  console.error("could not start consumer " + err);
});

io.on('connection', function (socket) {
  socket.emit('Hi!', { message: 'Chat connected', id: socket.id });
  // What a client sends is written to Kafka; it reaches the other clients
  // through the consumer callback above.
  socket.on('sendMessage', ({ message, to } = {}) => {
    produce({ from: socket.id, to, message }).catch((err) => {
      console.error("could not write message " + err);
    });
  });
});

// Start accepting connections; 8000 is the port the chat.js client connects to.
io.listen(8000);
您还需要修改您的消费者和生产者以接受参数和回调。
消费者示例:
...
/**
 * Connects the consumer, subscribes it to `topic`, and passes every chat
 * message read from the topic to `cb`.
 *
 * The handler receives the Kafka message under the `message` key, and its
 * `value` holds the serialized payload. The chat fields are therefore decoded
 * from `message.value` (expected to be the JSON text of
 * `{ from, to, message }`).
 *
 * @param {(chat: { from: *, to: *, message: * }) => void} cb - called once
 *   per successfully decoded message
 * @returns {Promise<void>} settles once `consumer.run` has settled
 */
const consume = async (cb) => {
  // first, we wait for the client to connect and subscribe to the given topic
  await consumer.connect();
  await consumer.subscribe({ topic });
  await consumer.run({
    // this function is called every time the consumer gets a new message
    eachMessage: async ({ message: kafkaMessage }) => {
      let chat;
      try {
        const { from, to, message } = JSON.parse(kafkaMessage.value.toString());
        chat = { from, to, message };
      } catch (err) {
        // a value that is empty or not valid JSON is reported and skipped,
        // so one bad record does not stop the consumer
        console.error("skipping malformed message " + err);
        return;
      }
      // called outside the try block so errors thrown by `cb` are not
      // mistaken for malformed input
      cb(chat);
    },
  });
};
生产者示例:
// Promise of the pending or completed producer connection; shared by all
// calls so the producer is connected only once.
let producerConnection = null;

/**
 * Writes one chat message to `topic`.
 *
 * `producer.send` takes a single record of the form `{ topic, messages }`,
 * so the chat fields are serialized to JSON and sent as the message value.
 *
 * @param {{ from: *, to: *, message: * }} chat - sender, recipient and text
 * @returns {Promise<*>} whatever `producer.send` resolves with; rejects if
 *   connecting or sending fails
 */
const produce = async ({ from, to, message }) => {
  if (!producerConnection) {
    producerConnection = producer.connect().catch((err) => {
      // forget the failed attempt so the next call retries the connection
      producerConnection = null;
      throw err;
    });
  }
  await producerConnection;

  return producer.send({
    topic,
    messages: [{ value: JSON.stringify({ from, to, message }) }],
  });
};
别忘了在客户端修改你的chat.js
所有这些都可以优化,只是一个简单的例子