Kafka 连接器未按预期运行

Kafka connector not behaving as expected

我正在尝试使用 Kafka 文件脉冲连接器 (https://github.com/streamthoughts/kafka-connect-file-pulse) 将文件中的数据读入 Kafka 主题。

我使用以下方式启动连接器:

../bin/connect-standalone filepulse.properties connect-standalone.properties

filepulse.properties的内容:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/opt/kafka-tmp/connect.offsets
bootstrap.servers=localhost:9092
plugin.path=/Users/user1234/connectors/

连接内容-standalone.properties:

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

name=CsvSchemaSpoolDir
halt.on.error=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/opt/kafka-tmp/connect.offsets
plugin.path=/Users/plugins/connectors/
connector.class= io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
fs.cleanup.policy.class= io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
fs.scanner.class= io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker
fs.scan.directory.path=/opt/kafka-tmp/dir-to-process
fs.scan.filters= io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
file.filter.regex.pattern=test.csv
fs.scan.interval.ms= 3000
internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id= connect-file-pulse-log4j-quickstart
internal.kafka.reporter.topic= connect-file-pulse-status
offset.strategy=name
read.max.wait.ms=5000
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader
topic= logs-kafka-connect
tasks.max= 1

当我将数据添加到 test.csv 时,数据不会根据配置发送到主题 logs-kafka-connect。

../bin/connect-standalone filepulse.properties connect-standalone.properties 的启动看起来不错,因为文件 test.csv 是 detected:

[2021-01-01 21:01:26,800] INFO Waiting 2821 ms to scan for new files. (io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread:87)
[2021-01-01 21:01:29,625] INFO Scanning local file system directory '/opt/kafka-tmp/dir-to-process' (io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner:222)
[2021-01-01 21:01:29,629] INFO Completed scanned, number of files detected '1'  (io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner:224)
[2021-01-01 21:01:29,815] INFO Finished lookup for new files : '0' files selected (io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner:229)
[2021-01-01 21:01:29,815] INFO Waiting 2810 ms to scan for new files. (io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread:87)

我已经明确测试了在不使用插件的情况下生成和订阅主题 logs-kafka-connect 的消息,它的行为符合预期,添加了消息并且可以从主题中读取消息。因此,看来我没有正确配置插件。

我用插件配置的主题:logs-kafka-connect 是由插件创建的,但是将消息添加到它配置为侦听的文件 (test.csv) 不会将消息发送到话题。如何配置插件,以便将添加到 test.csv 的项目发送到主题 logs-kafka-connect

更新:

现在在我看来,Kafka 并不适用于将文件更新流式传输到 Kafka 主题的用例。我将使用 filebeat 来实现我的目标,整合多个日志文件,以便更轻松地检查日志文件。

Kafka Connect 可用于使用 FilePulse、Spooldir 等连接器将文件中的数据记录流式传输到 Kafka。

但是,如果您需要摄取日志文件(例如 log4j 应用程序文件),那么 Logstash 或 Filebeat 可能更可取,即使您也可以为此目的使用 FilePulse。

关于您的问题,我认为问题出在配置 属性 : file.filter.regex.pattern=test.csv 接受正则表达式。

您应该使用这个值: "file.filter.regex.pattern":".*\.csv$"

免责声明:我是 Kafka Connect FilePulse 的作者