使用 sparklyr 添加额外的 kafka 消费者设置
Add additional kafka consumer settings with sparklyr
我正在尝试使用 sparklyr 连接到安全的 Kafka 服务器。然而,要访问它,您需要指定正确的安全设置(协议、密码等)。但是当在 read_options 中指定时,它们不会传递给消费者配置。这里的 R 代码:
library(sparklyr)
config <- spark_config()
config$sparklyr.shell.packages <- "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0"
sc <- spark_connect(master = "local",config=config, version="2.4.0")
read_options <- list(
kafka.bootstrap.servers='test.server',
group.id="name",
security.protocol='SSL',
ssl.key.password="password",
ssl.keystore.location="C:/Users/...",
ssl.keystore.password="password",
ssl.truststore.location="C:/Users/...",
ssl.truststore.password="password",
subscribe = "topic")
stream <- stream_read_kafka(sc, options = read_options)
如果我们看一下 spark 的日志,消费者配置中只列出了服务器:(简化版)
INFO ConsumerConfig: ConsumerConfig values:
bootstrap.servers = [test.server]
....
group.id = spark-kafka-source-7bb43fe7-56b2-4e19-9162-371e4db2075a-1047255113-driver-2
....
security.protocol = PLAINTEXT
...
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
..
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
有没有possibility/workaround给消费者添加必要的设置?
更新
见用户1278798的回答
对于遇到同样问题的人,重要的是要补充一点,spark 不支持所有设置(例如 group.id 或 auto.offset.reset)。看看user1278798给的link就知道了。
中清楚解释的那样
Kafka’s own configurations can be set via DataStreamReader.option
with kafka.
prefix, e.g, stream.option("kafka.bootstrap.servers", "host:port")
.
您的选项缺少前缀。
我正在尝试使用 sparklyr 连接到安全的 Kafka 服务器。然而,要访问它,您需要指定正确的安全设置(协议、密码等)。但是当在 read_options 中指定时,它们不会传递给消费者配置。这里的 R 代码:
library(sparklyr)
config <- spark_config()
config$sparklyr.shell.packages <- "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0"
sc <- spark_connect(master = "local",config=config, version="2.4.0")
read_options <- list(
kafka.bootstrap.servers='test.server',
group.id="name",
security.protocol='SSL',
ssl.key.password="password",
ssl.keystore.location="C:/Users/...",
ssl.keystore.password="password",
ssl.truststore.location="C:/Users/...",
ssl.truststore.password="password",
subscribe = "topic")
stream <- stream_read_kafka(sc, options = read_options)
如果我们看一下 spark 的日志,消费者配置中只列出了服务器:(简化版)
INFO ConsumerConfig: ConsumerConfig values:
bootstrap.servers = [test.server]
....
group.id = spark-kafka-source-7bb43fe7-56b2-4e19-9162-371e4db2075a-1047255113-driver-2
....
security.protocol = PLAINTEXT
...
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
..
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
有没有possibility/workaround给消费者添加必要的设置?
更新
见用户1278798的回答
对于遇到同样问题的人,重要的是要补充一点,spark 不支持所有设置(例如 group.id 或 auto.offset.reset)。看看user1278798给的link就知道了。
Kafka’s own configurations can be set via
DataStreamReader.option
withkafka.
prefix, e.g,stream.option("kafka.bootstrap.servers", "host:port")
.
您的选项缺少前缀。