使用 sparklyr 添加额外的 kafka 消费者设置

Add additional kafka consumer settings with sparklyr

我正在尝试使用 sparklyr 连接到安全的 Kafka 服务器。然而,要访问它,您需要指定正确的安全设置(协议、密码等)。但是当在 read_options 中指定时,它们不会传递给消费者配置。这里的 R 代码:

library(sparklyr)
config <- spark_config()
config$sparklyr.shell.packages <- "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0"
sc <- spark_connect(master = "local",config=config, version="2.4.0")

read_options <- list(
  kafka.bootstrap.servers='test.server',
  group.id="name",
  security.protocol='SSL',
  ssl.key.password="password",
  ssl.keystore.location="C:/Users/...",
  ssl.keystore.password="password",
  ssl.truststore.location="C:/Users/...",
  ssl.truststore.password="password",
  subscribe = "topic")

stream <- stream_read_kafka(sc, options = read_options) 

如果我们看一下 spark 的日志,消费者配置中只列出了服务器:(简化版)

INFO ConsumerConfig: ConsumerConfig values: 
    bootstrap.servers = [test.server]
    ....
    group.id = spark-kafka-source-7bb43fe7-56b2-4e19-9162-371e4db2075a-1047255113-driver-2
    ....
    security.protocol = PLAINTEXT
    ...
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ..
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

有没有possibility/workaround给消费者添加必要的设置?

更新

见用户1278798的回答

对于遇到同样问题的人,重要的是要补充一点,spark 不支持所有设置(例如 group.id 或 auto.offset.reset)。看看user1278798给的link就知道了。

正如 the official documentation

中清楚解释的那样

Kafka’s own configurations can be set via DataStreamReader.option with kafka. prefix, e.g, stream.option("kafka.bootstrap.servers", "host:port").

您的选项缺少前缀。