如何将文件写入 Kafka Producer

How to write a file to Kafka Producer

我正在尝试在 Kafka 中加载一个简单的文本文件而不是标准输入。 下载 Kafka 后,我执行了以下步骤:

启动动物园管理员:

bin/zookeeper-server-start.sh config/zookeeper.properties

已启动服务器

bin/kafka-server-start.sh config/server.properties

创建了一个名为 "test" 的主题:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

运行 制片人:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
Test1
Test2

消费者收听:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2

我想将一个数据文件甚至一个简单的文本文件传递给消费者可以直接看到的生产者,而不是标准输入。任何帮助将不胜感激。谢谢!

您可以通过管道输入:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
--new-producer < my_file.txt

找到 here

从 0.9.0 开始:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt
$ kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt

在 Kafka-0.9.0 中为我工作

echo "Hello" | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

这里有一些更通用的方法,但对于简单文件来说可能有点过分了

尾巴

tail -n0 -F my_file.txt | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

说明

  1. tail 随着文件的增长或日志的不断添加,从文件末尾开始读取
  2. -n0表示输出最后0行所以只选择新行
  3. -F 按名称而不是描述符跟随文件,因此即使它被旋转也能正常工作

syslog-ng

options {                                                                                                                             
    flush_lines (0);                                                                                                                
    time_reopen (10);                                                                                                               
    log_fifo_size (1000);                                                                                                          
    long_hostnames (off);                                                                                                           
    use_dns (no);                                                                                                                   
    use_fqdn (no);                                                                                                                  
    create_dirs (no);                                                                                                               
    keep_hostname (no);                                                                                                             
};

source s_file {
    file("path to my-file.txt" flags(no-parse));
}


destination loghost {
    tcp("*.*.*.*" port(5140));
} 

消费

nc -k -l 5140 | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

说明(来自man nc

-k' Forces nc to stay listening for another connection after its current connection is completed. It is an error to use this option without the -l option.

-l' Used to specify that nc should listen for an incoming connection rather than initiate a connection to a remote host. It is an error to use this option in conjunction with the -p, -s, or -z options. Additionally, any timeouts specified with the -w option are ignored.

参考

Syslog-ng