TimeoutException when trying to run a Pulsar source connector

I am trying to run the Pulsar DebeziumPostgresSource connector.

This is the command I am running:

bin/pulsar-admin \
  --admin-url https://localhost:8443 \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params file:///pulsar/tokens/broker/token \
  --tls-allow-insecure \
  source localrun \
    --broker-service-url pulsar+ssl://my-pulsar-server:6651 \
    --client-auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
    --client-auth-params file:///pulsar/tokens/broker/token \
    --tls-allow-insecure \
    --source-config-file /pulsar/debezium-config/my-source-config.yaml

This is the /pulsar/debezium-config/my-source-config.yaml file:

tenant: my-tenant
namespace: my-namespace
name: my-source
topicName: my-topic
archive: connectors/pulsar-io-debezium-postgres-2.6.0-SNAPSHOT.nar
parallelism: 1

configs:
  plugin.name: pgoutput
  database.hostname: my-db-server
  database.port: "5432"
  database.user: my-db-user
  database.password: my-db-password
  database.dbname: my-db
  database.server.name: my-db-server-name
  table.whitelist: my_schema.my_table
  pulsar.service.url: pulsar+ssl://my-pulsar-server:6651/

This is the output of the command above:

11:47:29.924 [main] INFO  org.apache.pulsar.functions.runtime.RuntimeSpawner - my-tenant/my-namespace/my-source-0 RuntimeSpawner starting function
11:47:29.925 [main] INFO  org.apache.pulsar.functions.runtime.thread.ThreadRuntime - ThreadContainer starting function with instance config InstanceConfig(instanceId=0, functionId=4073a1d9-1312-4570-981b-6723626e394a, functionVersion=01d5a3a7-c6d7-4f79-8717-403ad1371411, functionDetails=tenant: "my-tenant"
namespace: "my-namespace"
name: "my-source"
className: "org.apache.pulsar.functions.api.utils.IdentityFunction"
autoAck: true
parallelism: 1
source {
  className: "org.apache.pulsar.io.debezium.postgres.DebeziumPostgresSource"
  configs: "{\"database.user\":\"my-db-user\",\"database.dbname\":\"my-db\",\"database.hostname\":\"my-db-server\",\"database.password\":\"my-db-password\",\"database.server.name\":\"my-db-server-name\",\"plugin.name\":\"pgoutput\",\"database.port\":\"5432\",\"pulsar.service.url\":\"pulsar+ssl://my-pulsar-server:6651/\",\"table.whitelist\":\"my_schema.my_table\"}"
  typeClassName: "org.apache.pulsar.common.schema.KeyValue"
}
sink {
  topic: "my-topic"
  typeClassName: "org.apache.pulsar.common.schema.KeyValue"
}
resources {
  cpu: 1.0
  ram: 1073741824
  disk: 10737418240
}
componentType: SOURCE
, maxBufferedTuples=1024, functionAuthenticationSpec=null, port=39135, clusterName=local, maxPendingAsyncRequests=1000)
11:47:32.552 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xf8ffbf24, L:/redacted-ip-l:43802 - R:my-pulsar-server/redacted-ip-r:6651]] Connected to server
11:47:33.240 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {
  "topicName" : "my-topic",
  "producerName" : null,
  "sendTimeoutMs" : 0,
  "blockIfQueueFull" : true,
  "maxPendingMessages" : 1000,
  "maxPendingMessagesAcrossPartitions" : 50000,
  "messageRoutingMode" : "CustomPartition",
  "hashingScheme" : "Murmur3_32Hash",
  "cryptoFailureAction" : "FAIL",
  "batchingMaxPublishDelayMicros" : 10000,
  "batchingPartitionSwitchFrequencyByPublishDelay" : 10,
  "batchingMaxMessages" : 1000,
  "batchingMaxBytes" : 131072,
  "batchingEnabled" : true,
  "chunkingEnabled" : false,
  "compressionType" : "LZ4",
  "initialSequenceId" : null,
  "autoUpdatePartitions" : true,
  "multiSchema" : true,
  "properties" : {
    "application" : "pulsar-source",
    "id" : "my-tenant/my-namespace/my-source",
    "instance_id" : "0"
  }
}
11:47:33.259 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Pulsar client config: {
  "serviceUrl" : "pulsar+ssl://my-pulsar-server:6651",
  "authPluginClassName" : "org.apache.pulsar.client.impl.auth.AuthenticationToken",
  "authParams" : "file:///pulsar/tokens/broker/token",
  "authParamMap" : null,
  "operationTimeoutMs" : 30000,
  "statsIntervalSeconds" : 60,
  "numIoThreads" : 1,
  "numListenerThreads" : 1,
  "connectionsPerBroker" : 1,
  "useTcpNoDelay" : true,
  "useTls" : true,
  "tlsTrustCertsFilePath" : null,
  "tlsAllowInsecureConnection" : true,
  "tlsHostnameVerificationEnable" : false,
  "concurrentLookupRequest" : 5000,
  "maxLookupRequest" : 50000,
  "maxLookupRedirects" : 20,
  "maxNumberOfRejectedRequestPerConnection" : 50,
  "keepAliveIntervalSeconds" : 30,
  "connectionTimeoutMs" : 10000,
  "requestTimeoutMs" : 60000,
  "initialBackoffIntervalNanos" : 100000000,
  "maxBackoffIntervalNanos" : 60000000000,
  "listenerName" : null,
  "useKeyStoreTls" : false,
  "sslProvider" : null,
  "tlsTrustStoreType" : "JKS",
  "tlsTrustStorePath" : null,
  "tlsTrustStorePassword" : null,
  "tlsCiphers" : [ ],
  "tlsProtocols" : [ ],
  "proxyServiceUrl" : null,
  "proxyProtocol" : null
}
11:47:33.418 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xab39f703, L:/redacted-ip-l:43806 - R:my-pulsar-server/redacted-ip-r:6651]] Connected to server
11:47:33.422 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0xab39f703, L:/redacted-ip-l:43806 - R:my-pulsar-server/redacted-ip-r:6651] Connected through proxy to target broker at my-broker:6651
11:47:33.484 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [null] Creating producer on cnx [id: 0xab39f703, L:/redacted-ip-l:43806 - R:my-pulsar-server/redacted-ip-r:6651]
11:48:33.434 [pulsar-client-io-1-1] ERROR org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [null] Failed to create producer: 3 lookup request timedout after ms 30000
11:48:33.438 [pulsar-client-io-1-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xab39f703, L:/redacted-ip-l:43806 - R:my-pulsar-server/redacted-ip-r:6651] request 3 timed out after 30000 ms
11:48:33.629 [main] INFO  org.apache.pulsar.functions.LocalRunner - RuntimeSpawner quit because of
java.lang.RuntimeException: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 3 lookup request timedout after ms 30000
    at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtMostOnceProcessor.<init>(PulsarSink.java:177) ~[org.apache.pulsar-pulsar-functions-instance-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtLeastOnceProcessor.<init>(PulsarSink.java:206) ~[org.apache.pulsar-pulsar-functions-instance-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.sink.PulsarSink.open(PulsarSink.java:284) ~[org.apache.pulsar-pulsar-functions-instance-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:819) ~[org.apache.pulsar-pulsar-functions-instance-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:224) ~[org.apache.pulsar-pulsar-functions-instance-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) ~[org.apache.pulsar-pulsar-functions-instance-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 3 lookup request timedout after ms 30000
    at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:821) ~[org.apache.pulsar-pulsar-client-api-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:93) ~[org.apache.pulsar-pulsar-client-original-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.createProducer(PulsarSink.java:106) ~[org.apache.pulsar-pulsar-functions-instance-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
    at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtMostOnceProcessor.<init>(PulsarSink.java:174) ~[org.apache.pulsar-pulsar-functions-instance-2.6.0-SNAPSHOT.jar:2.6.0-SNAPSHOT]
    ... 6 more
11:48:59.956 [function-timer-thread-5-1] ERROR org.apache.pulsar.functions.runtime.RuntimeSpawner - my-tenant/my-namespace/my-source-java.lang.RuntimeException: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 3 lookup request timedout after ms 30000 Function Container is dead with exception.. restarting

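As you can see, creating the producer fails with a TimeoutException. What are the possible causes of this error, and what is the best way to investigate it further?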

Additional information:

I also tried the --tls-trust-cert-path /my/ca-certificates.crt option instead of --tls-allow-insecure, but got the same error.
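One way to rule the CA file in or out is to check whether any certificate in it has expired. The sketch below assumes the openssl CLI is installed; the function name check_bundle is hypothetical. It splits the bundle first, because `openssl x509` only reads the first certificate in a file:

```shell
#!/usr/bin/env bash
# Sketch (hypothetical helper): list every certificate in a PEM bundle, such as
# the file passed to --tls-trust-cert-path, and flag the expired ones.
# `openssl x509` reads only the FIRST certificate in a file, so split the bundle first.

# check_bundle <pem-bundle> [seconds]
# Prints one line per certificate. Returns 1 if any certificate is expired
# (or expires within <seconds>, default 0), otherwise 0.
check_bundle() {
  local bundle="$1" window="${2:-0}" dir f subject bad=0
  dir="$(mktemp -d)"
  # Write each BEGIN..END CERTIFICATE block to its own numbered file.
  awk -v d="$dir" '
    /-----BEGIN CERTIFICATE-----/ { n++; inside = 1 }
    inside { print >> (d "/cert-" n ".pem") }
    /-----END CERTIFICATE-----/ { close(d "/cert-" n ".pem"); inside = 0 }
  ' "$bundle"
  for f in "$dir"/cert-*.pem; do
    [ -e "$f" ] || continue  # the bundle contained no certificates
    subject="$(openssl x509 -in "$f" -noout -subject)"
    # -checkend N exits 0 if the certificate will NOT expire within N seconds.
    if openssl x509 -in "$f" -noout -checkend "$window" >/dev/null; then
      echo "OK       $subject"
    else
      echo "EXPIRED  $subject"
      bad=1
    fi
  done
  rm -rf "$dir"
  return "$bad"
}
```

For example, `check_bundle /my/ca-certificates.crt` prints one OK/EXPIRED line per certificate in the file used above.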

I can list the tenants:

bin/pulsar-admin \
  --admin-url https://localhost:8443 \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params file:///pulsar/tokens/broker/token \
  tenants list
# Output:
# "public"
# "pulsar"
# "my-topic"

But I cannot get an OK result from the broker health check:

bin/pulsar-admin \
  --admin-url https://localhost:8443 \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params file:///pulsar/tokens/broker/token \
  brokers healthcheck
# Output:
# null
# Reason: java.util.concurrent.TimeoutException

bin/pulsar-admin \
  --admin-url https://localhost:8443 \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params file:///pulsar/tokens/broker/token \
  --tls-allow-insecure \
  brokers healthcheck
# Output:
# HTTP 500 Internal Server Error
# Reason: HTTP 500 Internal Server Error

In my case, the root cause was an expired TLS certificate.
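One way to confirm this kind of diagnosis is to look at the validity dates of the certificates the client actually receives. Below is a minimal sketch using only the openssl CLI; the function names cert_status and remote_cert_status are hypothetical, and the host names and ports are the placeholders from the question:

```shell
#!/usr/bin/env bash
# Sketch (hypothetical helpers): check whether a TLS certificate has expired.

# cert_status <pem-file> [seconds]
# Prints the certificate's notAfter date, then "OK" if it is still valid
# <seconds> from now (default 0, i.e. right now), otherwise "EXPIRED ...".
cert_status() {
  local pem="$1" window="${2:-0}"
  openssl x509 -in "$pem" -noout -enddate
  # -checkend N exits 0 if the certificate will NOT expire within N seconds.
  if openssl x509 -in "$pem" -noout -checkend "$window" >/dev/null; then
    echo "OK"
  else
    echo "EXPIRED (or expiring within ${window}s)"
  fi
}

# remote_cert_status <host> <port> [seconds]
# Fetches the leaf certificate a TLS server presents and runs cert_status on it.
remote_cert_status() {
  local host="$1" port="$2" window="${3:-0}" tmp
  tmp="$(mktemp)"
  # s_client reports verification errors but still completes the handshake,
  # so an expired certificate is still printed; </dev/null makes it exit.
  openssl s_client -connect "${host}:${port}" -servername "$host" </dev/null 2>/dev/null \
    | openssl x509 -outform PEM >"$tmp" 2>/dev/null
  if [ ! -s "$tmp" ]; then
    echo "no certificate received from ${host}:${port}"
    rm -f "$tmp"
    return 2
  fi
  cert_status "$tmp" "$window"
  rm -f "$tmp"
}
```

For example, `remote_cert_status my-pulsar-server 6651` checks the certificate on the broker service URL, and `remote_cert_status localhost 8443` checks the one behind --admin-url. The log shows the client "Connected through proxy to target broker at my-broker:6651", so the certificate on that proxy-to-broker hop is not visible from the client side; it may be worth running the same check against my-broker:6651 from a host that can reach it.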