Link Kafka and HDFS with Docker containers
Hi everyone, I am trying to connect Kafka and HDFS with Kafka Connect, but I keep running into a problem I cannot solve.
I am following this example: https://clubhouse.io/developer-how-to/how-to-set-up-a-hadoop-cluster-in-docker/
I first start HDFS with: docker-compose up -d
Then I start Zookeeper, Kafka and MySQL using the images from the Debezium site: https://debezium.io/documentation/reference/1.0/tutorial.html
docker run -it --rm --name zookeeper --network docker-hadoop-master_default -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.0
docker run -it --rm --name kafka --network docker-hadoop-master_default -e ZOOKEEPER_CONNECT=zookeeper -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.0
docker run -it --rm --name mysql --network docker-hadoop-master_default -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.0
I pass --network on these runs because when I tried changing the HDFS network in the docker-compose.yml instead, the resource manager went down and, no matter what I tried, I could not find a way to bring it back up and keep it stable. So I add Zookeeper, Kafka and MySQL to that network directly with these containers.
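As a side note, an already-running container can also be attached to the Compose network after the fact; a minimal sketch, assuming the network really is named docker-hadoop-master_default (Compose derives it from the project directory):

```shell
# Confirm the name of the network docker-compose created
# (Compose prefixes it with the project/directory name).
docker network ls --filter name=docker-hadoop-master_default

# Attach an already-running container (here: zookeeper) to that
# network without recreating it with --network.
docker network connect docker-hadoop-master_default zookeeper
```

This only helps for containers that are already up; containers started fresh can keep using the --network flag as above.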
Then comes the trickiest part, Kafka Connect. I used the same network here too, which makes sense.
docker run -it --rm --name connect --network docker-hadoop-master_default -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses -e BOOTSTRAP_SERVERS="172.18.0.10:9092" -e CORE_CONF_fs_defaultFS=hdfs://172.18.0.2:9000 --link namenode:namenode --link zookeeper:zookeeper --link mysql:mysql debezium/connect:1.0
To link the source (MySQL) to Kafka I use the connector from the Debezium tutorial, as follows.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
I tested whether Kafka receives any events from the source, and that works fine.
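For reference, the Debezium tutorial checks this with a throwaway consumer container; something along these lines (the watch-topic helper ships with the debezium/kafka image, and the topic name is one of the CDC topics created above):

```shell
# Tail one of the change-data-capture topics with the watch-topic
# helper from the debezium/kafka image:
#   -a  read the topic from the beginning
#   -k  print message keys as well as values
docker run -it --rm --name watcher \
  --network docker-hadoop-master_default \
  --link zookeeper:zookeeper --link kafka:kafka \
  debezium/kafka:1.0 watch-topic -a -k dbserver1.inventory.customers
```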
With that set up, I started installing the plugin. I downloaded it from the Confluent website to my local Linux machine, installed Confluent Hub, and then installed the plugin locally. Then I created the user kafka and changed ownership of everything in the plugin directory to kafka:kafka.
After all that, I copied it into the Kafka Connect container using docker cp :/kafka/connect.
Then I checked that it was there and restarted Kafka Connect so the plugin gets installed.
We can check whether it is installed with: curl -i -X GET -H "Accept:application/json" localhost:8083/connector-plugins
You should see this somewhere in the output: [{"class":"io.confluent.connect.hdfs.HdfsSinkConnector","type":"sink","version":"5.4.0"},...
After this step is where I think my problem lies:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"hdfs-sink","config":{"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector","tasks.max":1,"topics":"dbserver1,dbserver1.inventory.products,dbserver1.inventory.products_on_hand,dbserver1.inventory.customers,dbserver1.inventory.orders,dbserver1.inventory.geom,dbserver1.inventory.addresses","hdfs.url":"hdfs://172.18.0.2:9000","flush.size":3,"logs.dir":"logs","topics.dir":"kafka","format.class":"io.confluent.connect.hdfs.parquet.ParquetFormat","partitioner.class":"io.confluent.connect.hdfs.partitioner.DefaultPartitioner","partition.field.name":"day"}}'
I have no idea how to convince Kafka Connect that I want the specific IP address of the namenode; it just keeps throwing messages that it resolved a different host, when what I expect is hdfs://namenode:9000.
I also added -e CORE_CONF_fs_defaultFS=hdfs://172.18.0.2:9000 to the docker run that sets up Kafka Connect, and when I POST the curl for hdfs-sink it gives me the following messages.
Logs from Kafka Connect:
2020-01-21 15:22:09,597 INFO || Creating connector hdfs-sink of type io.confluent.connect.hdfs.HdfsSinkConnector [org.apache.kafka.connect.runtime.Worker]
2020-01-21 15:22:09,597 INFO || Instantiated connector hdfs-sink with version 5.4.0 of type class io.confluent.connect.hdfs.HdfsSinkConnector [org.apache.kafka.connect.runtime.Worker]
2020-01-21 15:22:09,598 INFO || HdfsSinkConnectorConfig values:
avro.codec = null
connect.hdfs.keytab =
connect.hdfs.principal =
connect.meta.data = true
enhanced.avro.schema.support = false
filename.offset.zero.pad.width = 10
flush.size = 3
format.class = class io.confluent.connect.hdfs.parquet.ParquetFormat
hadoop.conf.dir =
hadoop.home =
hdfs.authentication.kerberos = false
hdfs.namenode.principal =
hdfs.url = hdfs://172.18.0.2:9000
kerberos.ticket.renew.period.ms = 3600000
logs.dir = logs
retry.backoff.ms = 5000
rotate.interval.ms = -1
rotate.schedule.interval.ms = -1
schema.cache.size = 1000
schema.compatibility = NONE
shutdown.timeout.ms = 3000
[io.confluent.connect.hdfs.HdfsSinkConnectorConfig]
2020-01-21 15:22:09,599 INFO || StorageCommonConfig values:
directory.delim = /
file.delim = +
storage.class = class io.confluent.connect.hdfs.storage.HdfsStorage
store.url = null
topics.dir = kafka
[io.confluent.connect.storage.common.StorageCommonConfig]
2020-01-21 15:22:09,599 INFO || HiveConfig values:
hive.conf.dir =
hive.database = default
hive.home =
hive.integration = false
hive.metastore.uris =
[io.confluent.connect.storage.hive.HiveConfig]
2020-01-21 15:22:09,600 INFO || PartitionerConfig values:
locale =
partition.duration.ms = -1
partition.field.name = [day]
partitioner.class = class io.confluent.connect.hdfs.partitioner.DefaultPartitioner
path.format =
timestamp.extractor = Wallclock
timestamp.field = timestamp
timezone =
[io.confluent.connect.storage.partitioner.PartitionerConfig]
2020-01-21 15:22:09,601 INFO || Finished creating connector hdfs-sink [org.apache.kafka.connect.runtime.Worker]
2020-01-21 15:22:09,601 INFO || SinkConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = hdfs-sink
tasks.max = 1
topics = [dbserver1, dbserver1.inventory.products, dbserver1.inventory.products_on_hand, dbserver1.inventory.customers, dbserver1.inventory.orders, dbserver1.inventory.geom, dbserver1.inventory.addresses]
topics.regex =
transforms = []
value.converter = null
[org.apache.kafka.connect.runtime.SinkConnectorConfig]
2020-01-21 15:22:09,602 INFO || EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = hdfs-sink
tasks.max = 1
topics = [dbserver1, dbserver1.inventory.products, dbserver1.inventory.products_on_hand, dbserver1.inventory.customers, dbserver1.inventory.orders, dbserver1.inventory.geom, dbserver1.inventory.addresses]
topics.regex =
transforms = []
value.converter = null
[org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig]
2020-01-21 15:22:09,604 INFO || [Worker clientId=connect-1, groupId=1] Starting task hdfs-sink-0 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-01-21 15:22:09,605 INFO || Creating task hdfs-sink-0 [org.apache.kafka.connect.runtime.Worker]
2020-01-21 15:22:09,606 INFO || ConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = hdfs-sink
tasks.max = 1
transforms = []
value.converter = null
[org.apache.kafka.connect.runtime.ConnectorConfig]
2020-01-21 15:22:09,607 INFO || EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = hdfs-sink
tasks.max = 1
transforms = []
value.converter = null
[org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig]
2020-01-21 15:22:09,608 INFO || TaskConfig values:
task.class = class io.confluent.connect.hdfs.HdfsSinkTask
[org.apache.kafka.connect.runtime.TaskConfig]
2020-01-21 15:22:09,608 INFO || Instantiated task hdfs-sink-0 with version 5.4.0 of type io.confluent.connect.hdfs.HdfsSinkTask [org.apache.kafka.connect.runtime.Worker]
2020-01-21 15:22:09,609 INFO || JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
[org.apache.kafka.connect.json.JsonConverterConfig]
2020-01-21 15:22:09,610 INFO || Set up the key converter class org.apache.kafka.connect.json.JsonConverter for task hdfs-sink-0 using the worker config [org.apache.kafka.connect.runtime.Worker]
2020-01-21 15:22:09,610 INFO || JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
[org.apache.kafka.connect.json.JsonConverterConfig]
2020-01-21 15:22:09,611 INFO || Set up the value converter class org.apache.kafka.connect.json.JsonConverter for task hdfs-sink-0 using the worker config [org.apache.kafka.connect.runtime.Worker]
2020-01-21 15:22:09,611 INFO || Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task hdfs-sink-0 using the worker config [org.apache.kafka.connect.runtime.Worker]
2020-01-21 15:22:09,613 INFO || Initializing: org.apache.kafka.connect.runtime.TransformationChain{} [org.apache.kafka.connect.runtime.Worker]
2020-01-21 15:22:09,614 INFO || SinkConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = hdfs-sink
tasks.max = 1
topics = [dbserver1, dbserver1.inventory.products, dbserver1.inventory.products_on_hand, dbserver1.inventory.customers, dbserver1.inventory.orders, dbserver1.inventory.geom, dbserver1.inventory.addresses]
topics.regex =
transforms = []
value.converter = null
[org.apache.kafka.connect.runtime.SinkConnectorConfig]
2020-01-21 15:22:09,618 INFO || EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.hdfs.HdfsSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = hdfs-sink
tasks.max = 1
topics = [dbserver1, dbserver1.inventory.products, dbserver1.inventory.products_on_hand, dbserver1.inventory.customers, dbserver1.inventory.orders, dbserver1.inventory.geom, dbserver1.inventory.addresses]
topics.regex =
transforms = []
value.converter = null
[org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig]
2020-01-21 15:22:09,622 INFO || ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [172.18.0.10:9092]
check.crcs = true
client.dns.lookup = default
client.id = connector-consumer-hdfs-sink-0
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-hdfs-sink
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
[org.apache.kafka.clients.consumer.ConsumerConfig]
2020-01-21 15:22:09,653 INFO || Kafka version: 2.4.0 [org.apache.kafka.common.utils.AppInfoParser]
2020-01-21 15:22:09,653 INFO || Kafka commitId: 77a89fcf8d7fa018 [org.apache.kafka.common.utils.AppInfoParser]
2020-01-21 15:22:09,654 INFO || Kafka startTimeMs: 1579620129652 [org.apache.kafka.common.utils.AppInfoParser]
2020-01-21 15:22:09,659 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-01-21 15:22:09,677 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Subscribed to topic(s): dbserver1, dbserver1.inventory.products, dbserver1.inventory.products_on_hand, dbserver1.inventory.customers, dbserver1.inventory.orders, dbserver1.inventory.geom, dbserver1.inventory.addresses [org.apache.kafka.clients.consumer.KafkaConsumer]
2020-01-21 15:22:09,678 INFO || HdfsSinkConnectorConfig values:
avro.codec = null
connect.hdfs.keytab =
connect.hdfs.principal =
connect.meta.data = true
enhanced.avro.schema.support = false
filename.offset.zero.pad.width = 10
flush.size = 3
format.class = class io.confluent.connect.hdfs.parquet.ParquetFormat
hadoop.conf.dir =
hadoop.home =
hdfs.authentication.kerberos = false
hdfs.namenode.principal =
hdfs.url = hdfs://172.18.0.2:9000
kerberos.ticket.renew.period.ms = 3600000
logs.dir = logs
retry.backoff.ms = 5000
rotate.interval.ms = -1
rotate.schedule.interval.ms = -1
schema.cache.size = 1000
schema.compatibility = NONE
shutdown.timeout.ms = 3000
[io.confluent.connect.hdfs.HdfsSinkConnectorConfig]
2020-01-21 15:22:09,679 INFO || StorageCommonConfig values:
directory.delim = /
file.delim = +
storage.class = class io.confluent.connect.hdfs.storage.HdfsStorage
store.url = null
topics.dir = kafka
[io.confluent.connect.storage.common.StorageCommonConfig]
2020-01-21 15:22:09,679 INFO || HiveConfig values:
hive.conf.dir =
hive.database = default
hive.home =
hive.integration = false
hive.metastore.uris =
[io.confluent.connect.storage.hive.HiveConfig]
2020-01-21 15:22:09,680 INFO || PartitionerConfig values:
locale =
partition.duration.ms = -1
partition.field.name = [day]
partitioner.class = class io.confluent.connect.hdfs.partitioner.DefaultPartitioner
path.format =
timestamp.extractor = Wallclock
timestamp.field = timestamp
timezone =
[io.confluent.connect.storage.partitioner.PartitionerConfig]
2020-01-21 15:22:09,681 INFO || AvroDataConfig values:
connect.meta.data = true
enhanced.avro.schema.support = false
schemas.cache.config = 1000
[io.confluent.connect.avro.AvroDataConfig]
2020-01-21 15:22:09,681 INFO || Hadoop configuration directory [io.confluent.connect.hdfs.DataWriter]
2020-01-21 15:22:09,757 ERROR || WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
java.lang.IllegalArgumentException: java.net.URISyntaxException: Illegal character in hostname at index 36: hdfs://namenode.docker-hadoop-master_default:9000
at org.apache.hadoop.net.NetUtils.getCanonicalUri(NetUtils.java:274)
at org.apache.hadoop.hdfs.DistributedFileSystem.canonicalizeUri(DistributedFileSystem.java:1577)
at org.apache.hadoop.fs.FileSystem.getCanonicalUri(FileSystem.java:235)
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:623)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194)
at org.apache.hadoop.hdfs.DistributedFileSystem.access[=12=]0(DistributedFileSystem.java:106)
at org.apache.hadoop.hdfs.DistributedFileSystem.doCall(DistributedFileSystem.java:1305)
at org.apache.hadoop.hdfs.DistributedFileSystem.doCall(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
at io.confluent.connect.hdfs.storage.HdfsStorage.exists(HdfsStorage.java:149)
at io.confluent.connect.hdfs.DataWriter.createDir(DataWriter.java:548)
at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:222)
at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:102)
at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:84)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.URISyntaxException: Illegal character in hostname at index 36: hdfs://namenode.docker-hadoop-master_default:9000
at java.base/java.net.URI$Parser.fail(URI.java:2913)
at java.base/java.net.URI$Parser.parseHostname(URI.java:3448)
at java.base/java.net.URI$Parser.parseServer(URI.java:3297)
at java.base/java.net.URI$Parser.parseAuthority(URI.java:3216)
at java.base/java.net.URI$Parser.parseHierarchical(URI.java:3158)
at java.base/java.net.URI$Parser.parse(URI.java:3114)
at java.base/java.net.URI.<init>(URI.java:685)
at org.apache.hadoop.net.NetUtils.getCanonicalUri(NetUtils.java:272)
... 24 more
2020-01-21 15:22:09,759 ERROR || WorkerSinkTask{id=hdfs-sink-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
IP addresses:
namenode: 172.18.0.2
Kafka: 172.18.0.10
I think this may be network related.
There are more errors in the log; I hope someone can help me.
Thanks in advance!
By default, Docker Compose prefixes the network name with an underscore and the name of the directory you ran the command in, and underscores are not allowed in hostnames. Hadoop prefers hostnames by default, per its hdfs-site.xml configuration file.
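The "Illegal character in hostname" in the stack trace is exactly this rule: RFC 1123 hostname labels may contain only letters, digits and hyphens. A quick check over the two names from the log makes the culprit obvious:

```shell
# RFC 1123: each dot-separated label is alphanumeric, with hyphens
# allowed only in the middle of a label. Underscores are not permitted.
hostname_re='^([A-Za-z0-9]([A-Za-z0-9-]*[A-Za-z0-9])?\.)*[A-Za-z0-9]([A-Za-z0-9-]*[A-Za-z0-9])?$'

check_hostname() {
  if [[ $1 =~ $hostname_re ]]; then echo "valid"; else echo "invalid"; fi
}

check_hostname "namenode"                               # → valid
check_hostname "namenode.docker-hadoop-master_default"  # → invalid (underscore)
```

One way around it, assuming your Compose file version supports naming networks (3.5+), is to give the default network an underscore-free name (e.g. `networks: default: name: hadoopnet` in the compose file), or to create such a network yourself with `docker network create` and pass it via `--network`.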
"I have no idea how to convince Kafka Connect that I want a specific IP address for the namenode; it just keeps throwing messages that it found a different host when the expected one is hdfs://namenode:9000"
Ideally, you would not use IPs inside Docker anyway; you would use the service names and exposed ports.
For the HDFS connector, you also need to 1) define the HADOOP_CONF_DIR env-var, 2) mount your XML configs as a volume so that remote clients (such as Connect) can interact with the Hadoop cluster, and 3) define hadoop.conf.dir in the connector properties.
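A minimal sketch of those three steps, assuming the Hadoop client XMLs (core-site.xml, hdfs-site.xml) live in ./hadoop-conf on the host; the /etc/hadoop mount point and the single topic are illustrative choices, not taken from the setup above:

```shell
# 1) point HADOOP_CONF_DIR at a config directory inside the container
# 2) mount the Hadoop client XMLs from the host into that directory
docker run -it --rm --name connect \
  --network docker-hadoop-master_default \
  -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  -e BOOTSTRAP_SERVERS="kafka:9092" \
  -e HADOOP_CONF_DIR=/etc/hadoop \
  -v "$(pwd)/hadoop-conf:/etc/hadoop" \
  debezium/connect:1.0

# 3) reference the same directory from the connector config, and use
#    the service name (not an IP) in hdfs.url
curl -i -X POST -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": 1,
    "topics": "dbserver1.inventory.customers",
    "hdfs.url": "hdfs://namenode:9000",
    "hadoop.conf.dir": "/etc/hadoop",
    "flush.size": 3
  }
}'
```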
2020-01-21 15:22:09,653 INFO || Kafka version: 2.4.0 [org.apache.kafka.common.utils.AppInfoParser]
2020-01-21 15:22:09,653 INFO || Kafka commitId: 77a89fcf8d7fa018 [org.apache.kafka.common.utils.AppInfoParser]
2020-01-21 15:22:09,654 INFO || Kafka startTimeMs: 1579620129652 [org.apache.kafka.common.utils.AppInfoParser]
2020-01-21 15:22:09,659 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-01-21 15:22:09,677 INFO || [Consumer clientId=connector-consumer-hdfs-sink-0, groupId=connect-hdfs-sink] Subscribed to topic(s): dbserver1, dbserver1.inventory.products, dbserver1.inventory.products_on_hand, dbserver1.inventory.customers, dbserver1.inventory.orders, dbserver1.inventory.geom, dbserver1.inventory.addresses [org.apache.kafka.clients.consumer.KafkaConsumer]
2020-01-21 15:22:09,678 INFO || HdfsSinkConnectorConfig values:
avro.codec = null
connect.hdfs.keytab =
connect.hdfs.principal =
connect.meta.data = true
enhanced.avro.schema.support = false
filename.offset.zero.pad.width = 10
flush.size = 3
format.class = class io.confluent.connect.hdfs.parquet.ParquetFormat
hadoop.conf.dir =
hadoop.home =
hdfs.authentication.kerberos = false
hdfs.namenode.principal =
hdfs.url = hdfs://172.18.0.2:9000
kerberos.ticket.renew.period.ms = 3600000
logs.dir = logs
retry.backoff.ms = 5000
rotate.interval.ms = -1
rotate.schedule.interval.ms = -1
schema.cache.size = 1000
schema.compatibility = NONE
shutdown.timeout.ms = 3000
[io.confluent.connect.hdfs.HdfsSinkConnectorConfig]
2020-01-21 15:22:09,679 INFO || StorageCommonConfig values:
directory.delim = /
file.delim = +
storage.class = class io.confluent.connect.hdfs.storage.HdfsStorage
store.url = null
topics.dir = kafka
[io.confluent.connect.storage.common.StorageCommonConfig]
2020-01-21 15:22:09,679 INFO || HiveConfig values:
hive.conf.dir =
hive.database = default
hive.home =
hive.integration = false
hive.metastore.uris =
[io.confluent.connect.storage.hive.HiveConfig]
2020-01-21 15:22:09,680 INFO || PartitionerConfig values:
locale =
partition.duration.ms = -1
partition.field.name = [day]
partitioner.class = class io.confluent.connect.hdfs.partitioner.DefaultPartitioner
path.format =
timestamp.extractor = Wallclock
timestamp.field = timestamp
timezone =
[io.confluent.connect.storage.partitioner.PartitionerConfig]
2020-01-21 15:22:09,681 INFO || AvroDataConfig values:
connect.meta.data = true
enhanced.avro.schema.support = false
schemas.cache.config = 1000
[io.confluent.connect.avro.AvroDataConfig]
2020-01-21 15:22:09,681 INFO || Hadoop configuration directory [io.confluent.connect.hdfs.DataWriter]
2020-01-21 15:22:09,757 ERROR || WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
java.lang.IllegalArgumentException: java.net.URISyntaxException: Illegal character in hostname at index 36: hdfs://namenode.docker-hadoop-master_default:9000
at org.apache.hadoop.net.NetUtils.getCanonicalUri(NetUtils.java:274)
at org.apache.hadoop.hdfs.DistributedFileSystem.canonicalizeUri(DistributedFileSystem.java:1577)
at org.apache.hadoop.fs.FileSystem.getCanonicalUri(FileSystem.java:235)
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:623)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
at org.apache.hadoop.hdfs.DistributedFileSystem.doCall(DistributedFileSystem.java:1305)
at org.apache.hadoop.hdfs.DistributedFileSystem.doCall(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
at io.confluent.connect.hdfs.storage.HdfsStorage.exists(HdfsStorage.java:149)
at io.confluent.connect.hdfs.DataWriter.createDir(DataWriter.java:548)
at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:222)
at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:102)
at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:84)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.URISyntaxException: Illegal character in hostname at index 36: hdfs://namenode.docker-hadoop-master_default:9000
at java.base/java.net.URI$Parser.fail(URI.java:2913)
at java.base/java.net.URI$Parser.parseHostname(URI.java:3448)
at java.base/java.net.URI$Parser.parseServer(URI.java:3297)
at java.base/java.net.URI$Parser.parseAuthority(URI.java:3216)
at java.base/java.net.URI$Parser.parseHierarchical(URI.java:3158)
at java.base/java.net.URI$Parser.parse(URI.java:3114)
at java.base/java.net.URI.<init>(URI.java:685)
at org.apache.hadoop.net.NetUtils.getCanonicalUri(NetUtils.java:272)
... 24 more
2020-01-21 15:22:09,759 ERROR || WorkerSinkTask{id=hdfs-sink-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
IP addresses: namenode: 172.18.0.2, Kafka: 172.18.0.10
I think this may be network-related.
There are more errors below this one; I hope someone can help me. Thanks in advance!
By default, Docker Compose builds the network name from the directory you run the command in plus an underscore (here, docker-hadoop-master_default), and underscores are not allowed in hostnames. Hadoop prefers hostnames by default, as configured in the hdfs-site.xml configuration file.
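One way around the underscore problem is to give the Compose network an explicit, underscore-free name instead of letting Compose derive it from the directory. A minimal sketch for the docker-hadoop compose file (the name `hadoop-net` is an arbitrary example, not from the original setup):

```yaml
# Override the auto-generated "docker-hadoop-master_default" network name
# with one that is a legal hostname component (no underscores).
networks:
  default:
    name: hadoop-net
```

The containers started with `docker run` would then join it via `--network hadoop-net`, and Hadoop-generated URIs like `hdfs://namenode.hadoop-net:9000` stay parseable.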
I have no idea how to convince Kafka Connect to use a specific IP address for the namenode; it just keeps throwing messages that it found a different IP than the expected hdfs://namenode:9000.
Ideally, you wouldn't use IPs inside Docker at all; you would use the service names and exposed ports.
For the HDFS connector, you also need to 1) define the HADOOP_CONF_DIR env var, 2) mount your XML configs as a volume so that remote clients (such as Connect) can interact with the Hadoop cluster, and 3) set hadoop.conf.dir in the connector properties.
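Putting those three steps together, a sketch of the Connect container and sink config might look like this (assumptions: the Hadoop client XMLs, core-site.xml and hdfs-site.xml, are in ./hadoop-conf on the host; image names, topics, and network are taken from the question; /etc/hadoop/conf is an arbitrary mount point):

```shell
# 1) + 2): mount the Hadoop client configs and point HADOOP_CONF_DIR at them
docker run -it --rm --name connect \
  --network docker-hadoop-master_default \
  -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  -e BOOTSTRAP_SERVERS=kafka:9092 \
  -e HADOOP_CONF_DIR=/etc/hadoop/conf \
  -v "$(pwd)/hadoop-conf:/etc/hadoop/conf" \
  debezium/connect:1.0

# 3): reference the same directory in the connector properties, and use the
# service name (not an IP) in hdfs.url
curl -i -X POST -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "hdfs.url": "hdfs://namenode:9000",
    "hadoop.conf.dir": "/etc/hadoop/conf",
    "topics": "dbserver1.inventory.customers",
    "flush.size": "3"
  }
}'
```

With the configs mounted, the HDFS client resolves the namenode from fs.defaultFS in core-site.xml rather than reverse-resolving the container IP into the illegal `namenode.docker-hadoop-master_default` hostname.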