Kafka Connector HDFS Sink 5.3.1 无法生成所有 JSON 记录
Kafka Connector HDFS Sink 5.3.1 not able to produce all JSON records
用例
我正在阅读一个已经创建的 Kafka 主题,其中一个单独的集群正在生成一些键和值。我的最终目标是以 JSON 格式写入 HDFS,为此我已经尝试使用 Kafka HDFS Sink 5.3 一段时间了。
我面临的问题是我无法将主题中的所有记录摄取并写入 HDFS。截至目前,如果我的主题包含数百万条记录的每小时数据,我只能根据 100K 条记录来编写。
以下是我用于 kafka-connect-standalone.properties 和我的 HDFS quickstart-hdfs.properties[ 的配置=37=]
kafka-connect-standalone.properties
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
schema.enable=false
offset.flush.interval.ms=10000
group.id=x-hdfs-consumer-group
consumer.session.timeout.ms=10000
consumer.heartbeat.interval.ms=3000
consumer.request.timeout.ms=1810000
consumer.max.poll.interval.ms=1800000
快速入门-hdfs.properties
name=hdfs-sink-mapr
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=10
topics=topic_name
hdfs.url=maprfs:///hive/x/poc_kafka_connect/
flush.size=20000
errors.tolerance=all
format.class=io.confluent.connect.hdfs.json.JsonFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
timestamp.extractor=RecordField
timestamp.field=timestamp
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en
timezone=UTC
如果我不使用
errors.tolerance=all 属性 然后我只生成了 ~500 条记录。
就工作日志而言,我没有收到任何错误,所以我不确定我遗漏了什么。
由于我对 Kafka 连接器比较陌生并且已经尝试了一段时间,如果有人可以提供一些关于我做错了什么的见解,我将不胜感激。
跟进问题
kafka 连接器也在 2 天内死亡。也就是说,它可以正常工作近 2 天,但一段时间后它停止读取数据并且不产生任何结果。我是 运行 它在独立模式下,这可能是原因吗?我试着描述消费者群体,好像所有的消费者都死了。
kafka/kafka_2.12-2.3.0/bin/kafka-consumer-groups.sh --bootstrap-server <server>:9092 --describe --group connect-ajay-hdfs-sink-mapr
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-ajay-hdfs-sink-mapr topic_name 21 1186755480 1187487551 732071 - - -
connect-ajay-hdfs-sink-mapr topic_name 12 957021804 957736810 715006 - - -
connect-ajay-hdfs-sink-mapr topic_name 17 957031965 957746941 714976 - - -
connect-ajay-hdfs-sink-mapr topic_name 24 957496491 958212413 715922 - - -
connect-ajay-hdfs-sink-mapr topic_name 0 956991807 957716202 724395 - - -
connect-ajay-hdfs-sink-mapr topic_name 28 956940273 957668689 728416 - - -
connect-ajay-hdfs-sink-mapr topic_name 5 957182822 957899308 716486 - - -
connect-ajay-hdfs-sink-mapr topic_name 3 956974180 957695189 721009 - - -
connect-ajay-hdfs-sink-mapr topic_name 19 956878365 957590196 711831 - - -
connect-ajay-hdfs-sink-mapr topic_name 2 956968023 957685835 717812 - - -
connect-ajay-hdfs-sink-mapr topic_name 16 957010175 957726139 715964 - - -
connect-ajay-hdfs-sink-mapr topic_name 7 956900190 957624746 724556 - - -
connect-ajay-hdfs-sink-mapr topic_name 8 957020325 957739604 719279 - - -
connect-ajay-hdfs-sink-mapr topic_name 22 957064283 957788487 724204 - - -
connect-ajay-hdfs-sink-mapr topic_name 29 957026931 957744496 717565 - - -
connect-ajay-hdfs-sink-mapr topic_name 13 957400623 958129555 728932 - - -
connect-ajay-hdfs-sink-mapr topic_name 6 956892063 957618485 726422 - - -
connect-ajay-hdfs-sink-mapr topic_name 11 957117685 957841645 723960 - - -
connect-ajay-hdfs-sink-mapr topic_name 1 957003873 957734649 730776 - - -
connect-ajay-hdfs-sink-mapr topic_name 18 957007813 957734011 726198 - - -
connect-ajay-hdfs-sink-mapr topic_name 27 957047658 957766131 718473 - - -
connect-ajay-hdfs-sink-mapr topic_name 10 956975729 957689182 713453 - - -
connect-ajay-hdfs-sink-mapr topic_name 15 957046441 957775251 728810 - - -
connect-ajay-hdfs-sink-mapr topic_name 23 957011972 957727996 716024 - - -
connect-ajay-hdfs-sink-mapr topic_name 14 957151628 957881644 730016 - - -
connect-ajay-hdfs-sink-mapr topic_name 4 957118644 957845399 726755 - - -
connect-ajay-hdfs-sink-mapr topic_name 9 957109152 957838497 729345 - - -
connect-ajay-hdfs-sink-mapr topic_name 25 956923833 957646070 722237 - - -
connect-ajay-hdfs-sink-mapr topic_name 26 957026885 957742112 715227 - - -
connect-ajay-hdfs-sink-mapr topic_name 20 957010071 957733605 723534 - - -
为了将主题中的所有现有记录获取到接收器连接器,将其添加到工作属性并重新启动连接
consumer.auto.offset.reset=earliest
如果您已经启动了连接器,则需要重置其使用者组,或更改配置中的名称以创建新组
用例
我正在阅读一个已经创建的 Kafka 主题,其中一个单独的集群正在生成一些键和值。我的最终目标是以 JSON 格式写入 HDFS,为此我已经尝试使用 Kafka HDFS Sink 5.3 一段时间了。 我面临的问题是我无法将主题中的所有记录摄取并写入 HDFS。截至目前,如果我的主题包含数百万条记录的每小时数据,我只能根据 100K 条记录来编写。
以下是我用于 kafka-connect-standalone.properties 和我的 HDFS quickstart-hdfs.properties[ 的配置=37=]
kafka-connect-standalone.properties
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
schema.enable=false
offset.flush.interval.ms=10000
group.id=x-hdfs-consumer-group
consumer.session.timeout.ms=10000
consumer.heartbeat.interval.ms=3000
consumer.request.timeout.ms=1810000
consumer.max.poll.interval.ms=1800000
快速入门-hdfs.properties
name=hdfs-sink-mapr
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=10
topics=topic_name
hdfs.url=maprfs:///hive/x/poc_kafka_connect/
flush.size=20000
errors.tolerance=all
format.class=io.confluent.connect.hdfs.json.JsonFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
timestamp.extractor=RecordField
timestamp.field=timestamp
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en
timezone=UTC
如果我不使用 errors.tolerance=all 属性 然后我只生成了 ~500 条记录。
就工作日志而言,我没有收到任何错误,所以我不确定我遗漏了什么。
由于我对 Kafka 连接器比较陌生并且已经尝试了一段时间,如果有人可以提供一些关于我做错了什么的见解,我将不胜感激。
跟进问题
kafka 连接器也在 2 天内死亡。也就是说,它可以正常工作近 2 天,但一段时间后它停止读取数据并且不产生任何结果。我是 运行 它在独立模式下,这可能是原因吗?我试着描述消费者群体,好像所有的消费者都死了。
kafka/kafka_2.12-2.3.0/bin/kafka-consumer-groups.sh --bootstrap-server <server>:9092 --describe --group connect-ajay-hdfs-sink-mapr
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-ajay-hdfs-sink-mapr topic_name 21 1186755480 1187487551 732071 - - -
connect-ajay-hdfs-sink-mapr topic_name 12 957021804 957736810 715006 - - -
connect-ajay-hdfs-sink-mapr topic_name 17 957031965 957746941 714976 - - -
connect-ajay-hdfs-sink-mapr topic_name 24 957496491 958212413 715922 - - -
connect-ajay-hdfs-sink-mapr topic_name 0 956991807 957716202 724395 - - -
connect-ajay-hdfs-sink-mapr topic_name 28 956940273 957668689 728416 - - -
connect-ajay-hdfs-sink-mapr topic_name 5 957182822 957899308 716486 - - -
connect-ajay-hdfs-sink-mapr topic_name 3 956974180 957695189 721009 - - -
connect-ajay-hdfs-sink-mapr topic_name 19 956878365 957590196 711831 - - -
connect-ajay-hdfs-sink-mapr topic_name 2 956968023 957685835 717812 - - -
connect-ajay-hdfs-sink-mapr topic_name 16 957010175 957726139 715964 - - -
connect-ajay-hdfs-sink-mapr topic_name 7 956900190 957624746 724556 - - -
connect-ajay-hdfs-sink-mapr topic_name 8 957020325 957739604 719279 - - -
connect-ajay-hdfs-sink-mapr topic_name 22 957064283 957788487 724204 - - -
connect-ajay-hdfs-sink-mapr topic_name 29 957026931 957744496 717565 - - -
connect-ajay-hdfs-sink-mapr topic_name 13 957400623 958129555 728932 - - -
connect-ajay-hdfs-sink-mapr topic_name 6 956892063 957618485 726422 - - -
connect-ajay-hdfs-sink-mapr topic_name 11 957117685 957841645 723960 - - -
connect-ajay-hdfs-sink-mapr topic_name 1 957003873 957734649 730776 - - -
connect-ajay-hdfs-sink-mapr topic_name 18 957007813 957734011 726198 - - -
connect-ajay-hdfs-sink-mapr topic_name 27 957047658 957766131 718473 - - -
connect-ajay-hdfs-sink-mapr topic_name 10 956975729 957689182 713453 - - -
connect-ajay-hdfs-sink-mapr topic_name 15 957046441 957775251 728810 - - -
connect-ajay-hdfs-sink-mapr topic_name 23 957011972 957727996 716024 - - -
connect-ajay-hdfs-sink-mapr topic_name 14 957151628 957881644 730016 - - -
connect-ajay-hdfs-sink-mapr topic_name 4 957118644 957845399 726755 - - -
connect-ajay-hdfs-sink-mapr topic_name 9 957109152 957838497 729345 - - -
connect-ajay-hdfs-sink-mapr topic_name 25 956923833 957646070 722237 - - -
connect-ajay-hdfs-sink-mapr topic_name 26 957026885 957742112 715227 - - -
connect-ajay-hdfs-sink-mapr topic_name 20 957010071 957733605 723534 - - -
为了将主题中的所有现有记录获取到接收器连接器,将其添加到工作属性并重新启动连接
consumer.auto.offset.reset=earliest
如果您已经启动了连接器,则需要重置其使用者组,或更改配置中的名称以创建新组