如何使用不断增长的文件作为 Apache Kafka 生产者并只读取新附加的数据
How to use a growing file as Apache Kafka producer and read only the newly appended data
我正在尝试使用 file 作为我的 kafka 生产者。源文件不断增长(比如每秒 20 records/lines)。下面是一个 post 类似于我的问题:
但在这种情况下,每次向文件中插入新行时,都会读取整个文件并将其添加到 Kafka 主题中。我只希望将新附加的行发送到主题(即,如果文件已经包含 10 行并且再附加 4 行,则只需要将这 4 行发送到主题)。
有办法实现吗??
尝试过的其他解决方案:
Apache flume 通过使用 源类型 作为 'spooldir'。但它没有用,因为它从添加到目录的新文件中读取数据,而不是在数据附加到已读取的文件时读取数据。
我们还尝试使用 flume source type 作为 'exec' 和 command 作为'tail –F /path/file-name'。这似乎也行不通。
也欢迎使用任何其他工具的建议,因为 我的 objective 是实时从文件中读取数据(即我需要数据作为一旦它被插入到文件中)。
根据您的具体需要,您可以查看几个选项。
卡夫卡连接
正如 Chin Huang above the FileSource Connector from Kafka Connect should be able to do what you want without installing additional software. Check out the Connect Quickstart 所说的有关如何启动它的指导和 运行,他们实际上有一个将文件读入 Kafka 的示例。
Logstash
Logstash 是类似这样的东西的经典选项,它的 Kafka 输出会为一个或多个文件执行您想要它执行的操作。下面的配置应该可以大致满足你的需求。
input {
file {
path => "/path/to/your/file"
}
output {
kafka {
bootstrap_servers => "127.0.0.1:9092"
topic_id => "topicname"
}
}
Filebeat
Filebeat 与 Logstash 非常相似,如果您想对从文件读取的数据执行额外处理,它只提供较少的功能。此外,它是用 go 而不是 java 编写的,因此 运行 在机器上的占用空间应该更小。
以下应该是帮助您入门的最小配置(根据记忆,如果它们是必需的,您可能需要添加一两个参数):
filebeat.prospectors:
- type: log
paths:
- /path/to/your/file
output.kafka:
hosts: ["127.0.0.1:9092"]
topic: 'topicname'
Flume
如果您想重新访问 Flume 选项,请查看 TaildirSource,我没有使用过它,但听起来它应该非常适合您的用例。
我正在尝试使用 file 作为我的 kafka 生产者。源文件不断增长(比如每秒 20 records/lines)。下面是一个 post 类似于我的问题:
但在这种情况下,每次向文件中插入新行时,都会读取整个文件并将其添加到 Kafka 主题中。我只希望将新附加的行发送到主题(即,如果文件已经包含 10 行并且再附加 4 行,则只需要将这 4 行发送到主题)。
有办法实现吗??
尝试过的其他解决方案:
Apache flume 通过使用 源类型 作为 'spooldir'。但它没有用,因为它从添加到目录的新文件中读取数据,而不是在数据附加到已读取的文件时读取数据。
我们还尝试使用 flume source type 作为 'exec' 和 command 作为'tail –F /path/file-name'。这似乎也行不通。
也欢迎使用任何其他工具的建议,因为 我的 objective 是实时从文件中读取数据(即我需要数据作为一旦它被插入到文件中)。
根据您的具体需要,您可以查看几个选项。
卡夫卡连接
正如 Chin Huang above the FileSource Connector from Kafka Connect should be able to do what you want without installing additional software. Check out the Connect Quickstart 所说的有关如何启动它的指导和 运行,他们实际上有一个将文件读入 Kafka 的示例。
Logstash
Logstash 是类似这样的东西的经典选项,它的 Kafka 输出会为一个或多个文件执行您想要它执行的操作。下面的配置应该可以大致满足你的需求。
input {
file {
path => "/path/to/your/file"
}
output {
kafka {
bootstrap_servers => "127.0.0.1:9092"
topic_id => "topicname"
}
}
Filebeat
Filebeat 与 Logstash 非常相似,如果您想对从文件读取的数据执行额外处理,它只提供较少的功能。此外,它是用 go 而不是 java 编写的,因此 运行 在机器上的占用空间应该更小。 以下应该是帮助您入门的最小配置(根据记忆,如果它们是必需的,您可能需要添加一两个参数):
filebeat.prospectors:
- type: log
paths:
- /path/to/your/file
output.kafka:
hosts: ["127.0.0.1:9092"]
topic: 'topicname'
Flume
如果您想重新访问 Flume 选项,请查看 TaildirSource,我没有使用过它,但听起来它应该非常适合您的用例。