将 Kafka 消息流式传输到 MySQL 数据库
Streaming Kafka Messages to MySQL Database
我想将 Kafka 消息写入 MySQL 数据库。 thislink中有例子。在该示例中,apache flume 用于使用消息并将其写入 MySQL。我使用相同的代码,当我 运行 时,flume-ng agent
和 event
总是变成 null
我的 flume.conf.properties
文件是:
agent.sources=kafkaSrc
agent.channels=channel1
agent.sinks=jdbcSink
agent.channels.channel1.type=org.apache.flume.channel.kafka.KafkaChannel
agent.channels.channel1.brokerList=localhost:9092
agent.channels.channel1.topic=kafkachannel
agent.channels.channel1.zookeeperConnect=localhost:2181
agent.channels.channel1.capacity=10000
agent.channels.channel1.transactionCapacity=1000
agent.channels.channel1.parseAsFlumeEvent=false
agent.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSrc.channels = channel1
agent.sources.kafkaSrc.zookeeperConnect = localhost:2181
agent.sources.kafkaSrc.topic = kafka-mysql
agent.sinks.jdbcSink.type = com.stratio.ingestion.sink.jdbc.JDBCSink
agent.sinks.jdbcSink.connectionString = jdbc:mysql://127.0.0.1:3306/test?useSSL=false
agent.sinks.jdbcSink.username=root
agent.sinks.jdbcSink.password=pass
agent.sinks.jdbcSink.batchSize = 10
agent.sinks.jdbcSink.channel =channel1
agent.sinks.jdbcSink.sqlDialect=MYSQL
agent.sinks.jdbcSink.driver=com.mysql.jdbc.Driver
agent.sinks.jdbcSink.sql=INSERT INTO kafkamsg(msg) VALUES(${body:varchar})
我哪里错了?
谢谢。
在我的参考示例中,flume 个听众 kafka kafka-mysql
个主题。但是此代码适用于 kafkachannel
主题。所以我们需要向 kafkachannel
主题生成消息,我不知道为什么。
我想将 Kafka 消息写入 MySQL 数据库。 thislink中有例子。在该示例中,apache flume 用于使用消息并将其写入 MySQL。我使用相同的代码,当我 运行 时,flume-ng agent
和 event
总是变成 null
我的 flume.conf.properties
文件是:
agent.sources=kafkaSrc
agent.channels=channel1
agent.sinks=jdbcSink
agent.channels.channel1.type=org.apache.flume.channel.kafka.KafkaChannel
agent.channels.channel1.brokerList=localhost:9092
agent.channels.channel1.topic=kafkachannel
agent.channels.channel1.zookeeperConnect=localhost:2181
agent.channels.channel1.capacity=10000
agent.channels.channel1.transactionCapacity=1000
agent.channels.channel1.parseAsFlumeEvent=false
agent.sources.kafkaSrc.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSrc.channels = channel1
agent.sources.kafkaSrc.zookeeperConnect = localhost:2181
agent.sources.kafkaSrc.topic = kafka-mysql
agent.sinks.jdbcSink.type = com.stratio.ingestion.sink.jdbc.JDBCSink
agent.sinks.jdbcSink.connectionString = jdbc:mysql://127.0.0.1:3306/test?useSSL=false
agent.sinks.jdbcSink.username=root
agent.sinks.jdbcSink.password=pass
agent.sinks.jdbcSink.batchSize = 10
agent.sinks.jdbcSink.channel =channel1
agent.sinks.jdbcSink.sqlDialect=MYSQL
agent.sinks.jdbcSink.driver=com.mysql.jdbc.Driver
agent.sinks.jdbcSink.sql=INSERT INTO kafkamsg(msg) VALUES(${body:varchar})
我哪里错了?
谢谢。
在我的参考示例中,flume 个听众 kafka kafka-mysql
个主题。但是此代码适用于 kafkachannel
主题。所以我们需要向 kafkachannel
主题生成消息,我不知道为什么。