如何使用结构化流优化 Kafka 主题的分区策略?
How to optimize partition strategy of Kafka topic for consumption with Structured Streaming?
我是 kafka 的新手,正在尝试将数据写入主题并从同一主题读取(我们现在作为源团队来摄取数据。因此我们正在执行写入 Kafk 主题的两个操作为同一主题消费)。
我在 spark-shell 上写了下面的代码来将数据写入 Kafka 主题。
pyspark --packages io.delta:delta-core_2.11:0.6.1,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,io.strimzi:kafka-oauth-client:0.5.0
from pyspark.sql.functions import col
from pyspark.sql.functions import from_json
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, TimestampType, DateType
tn = "topic_name"
kafka_broker = "brokerurl:9500"
endpoint_uri = "endpoint_uri"
client_id = "clientid"
client_secret = "secret_key"
jaas_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
oauth_client = " oauth.client.id='{0}'".format(client_id)
oauth_secret = " oauth.client.secret='{0}'".format(client_secret)
oauth_token_endpoint_uri = " oauth.token.endpoint.uri='{0}'".format(endpoint_uri)
oauth_config = jaas_config + oauth_client + oauth_secret + oauth_token_endpoint_uri + " oauth.max.token.expiry.seconds='30000' ;"
df = spark.sql("select * from dbname.tablename where geography in ('ASIA', 'LATIN_AMERICA') and geo_year in (2020, 2021)").select(F.to_json(F.struct(F.col("*"))).alias("value"))
# WRITE TO TOPIC
df.write.format("kafka")\
.option("kafka.bootstrap.servers", kafka_broker)\
.option("kafka.batch.size", 51200)\
.option("retries", 3)\
.option("kafka.max.request.size", 500000)\
.option("kafka.max.block.ms", 120000)\
.option("kafka.metadata.max.age.ms", 120000)\
.option("kafka.request.timeout.ms", 120000)\
.option("kafka.linger.ms", 0)\
.option("kafka.delivery.timeout.ms", 130000)\
.option("acks", "1")\
.option("kafka.compression.type", "snappy")\
.option("kafka.security.protocol", "SASL_SSL")\
.option("kafka.sasl.jaas.config", oauth_config)\
.option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
.option("kafka.sasl.mechanism", "OAUTHBEARER")\
.option("topic", tn)\
.save()
后来我才知道,一个Kafka主题可以包含分区中的数据。所以我删除并重新创建了相同的主题,但这次有 3 个分区。
我所有的 spark 经验都在批处理中,分区的概念甚至存在于我们在读取 table 或使用
的文件时对数据进行分区的地方
df = spark.read.format('jdbc').option('', '')
...
...
.option('partitionColumn', 'partitionColumn_name')
.load()
批处理中使用的这个分区列通常是一个具有高基数的列&我们还可以指定我们想要将数据拆分成的分区数
df = spark.read.format('jdbc').option('', '')
...
...
.option('partitionColumn', 'partitionColumn_name').option('numPartitions', INTEGER_VALUE_OF_PARTITIONS)
.load()
我在纯 Kafka 代码中看到了自定义分区程序 类,但我正在使用 spark-streaming,甚至不确定如何集成它。
我对 Kafka 主题分区的困惑在于以下几点:
- 如何选择每个主题的分区数?我是
使用 Kafka 实现 Spark Streaming。
- 有没有一种方法可以使用 Spark 流管理分区数据
?
- 如果没有,有没有办法确保数据在
主题的分区。
我已经阅读了 this 官方文档。
但是在那里找不到任何关于分区策略的信息。
任何人都可以让我清楚地了解如何将数据写入主题的特定分区,还是将其留给 Kafka 更好。
编辑 1:
我刚刚经历了这个 link 并且提到了一个公式来根据吞吐量计算所需的分区数。
这是我们可以用来确定每个主题的分区数的方法吗?
任何澄清对我来说都很有价值。
这是一个相当广泛的话题,其中的问题需要一些彻底的答案。无论如何,最重要的是:
- 通常,Kafka 会随着主题中分区的数量进行扩展
- Spark 随着工作节点数量和可用数量的增加而扩展 cores/slots
- Kafka 主题的每个分区只能由单个 Spark 任务使用(parallelsim 然后取决于 Spark wcores 的数量)
- 如果你有多个 Spark worker 但只有一个 Kafka 主题分区,则只有一个核心可以消费数据
- 同样,如果您有多个 Kafka 主题分区但只有一个具有单核的工作节点,则“并行度”为 1
- 请记住,公式通常代表一种理论,为了简单起见,省略了细节。您引用的公式是一个很好的起点,但最终它取决于您的环境,例如:对延迟或吞吐量的要求、网络 bandwith/traffic、可用硬件、成本等。
也就是说,只有您可以进行优化测试。
附带说明一下,当从 Spark Structured Streaming 写入 Kafka 时,如果您的 Dataframe 包含“分区”列,它将用于将记录发送到相应的分区(从 0 开始)。您还可以在数据框中包含“主题”列,这样您就可以将记录发送到某个主题。
Spark Structured Streaming 会将每条记录单独发送到 Kafka。
我是 kafka 的新手,正在尝试将数据写入主题并从同一主题读取(我们现在作为源团队来摄取数据。因此我们正在执行写入 Kafk 主题的两个操作为同一主题消费)。 我在 spark-shell 上写了下面的代码来将数据写入 Kafka 主题。
pyspark --packages io.delta:delta-core_2.11:0.6.1,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,io.strimzi:kafka-oauth-client:0.5.0
from pyspark.sql.functions import col
from pyspark.sql.functions import from_json
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, TimestampType, DateType
tn = "topic_name"
kafka_broker = "brokerurl:9500"
endpoint_uri = "endpoint_uri"
client_id = "clientid"
client_secret = "secret_key"
jaas_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
oauth_client = " oauth.client.id='{0}'".format(client_id)
oauth_secret = " oauth.client.secret='{0}'".format(client_secret)
oauth_token_endpoint_uri = " oauth.token.endpoint.uri='{0}'".format(endpoint_uri)
oauth_config = jaas_config + oauth_client + oauth_secret + oauth_token_endpoint_uri + " oauth.max.token.expiry.seconds='30000' ;"
df = spark.sql("select * from dbname.tablename where geography in ('ASIA', 'LATIN_AMERICA') and geo_year in (2020, 2021)").select(F.to_json(F.struct(F.col("*"))).alias("value"))
# WRITE TO TOPIC
df.write.format("kafka")\
.option("kafka.bootstrap.servers", kafka_broker)\
.option("kafka.batch.size", 51200)\
.option("retries", 3)\
.option("kafka.max.request.size", 500000)\
.option("kafka.max.block.ms", 120000)\
.option("kafka.metadata.max.age.ms", 120000)\
.option("kafka.request.timeout.ms", 120000)\
.option("kafka.linger.ms", 0)\
.option("kafka.delivery.timeout.ms", 130000)\
.option("acks", "1")\
.option("kafka.compression.type", "snappy")\
.option("kafka.security.protocol", "SASL_SSL")\
.option("kafka.sasl.jaas.config", oauth_config)\
.option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
.option("kafka.sasl.mechanism", "OAUTHBEARER")\
.option("topic", tn)\
.save()
后来我才知道,一个Kafka主题可以包含分区中的数据。所以我删除并重新创建了相同的主题,但这次有 3 个分区。
我所有的 spark 经验都在批处理中,分区的概念甚至存在于我们在读取 table 或使用
的文件时对数据进行分区的地方df = spark.read.format('jdbc').option('', '')
...
...
.option('partitionColumn', 'partitionColumn_name')
.load()
批处理中使用的这个分区列通常是一个具有高基数的列&我们还可以指定我们想要将数据拆分成的分区数
df = spark.read.format('jdbc').option('', '')
...
...
.option('partitionColumn', 'partitionColumn_name').option('numPartitions', INTEGER_VALUE_OF_PARTITIONS)
.load()
我在纯 Kafka 代码中看到了自定义分区程序 类,但我正在使用 spark-streaming,甚至不确定如何集成它。 我对 Kafka 主题分区的困惑在于以下几点:
- 如何选择每个主题的分区数?我是 使用 Kafka 实现 Spark Streaming。
- 有没有一种方法可以使用 Spark 流管理分区数据 ?
- 如果没有,有没有办法确保数据在 主题的分区。
我已经阅读了 this 官方文档。
但是在那里找不到任何关于分区策略的信息。 任何人都可以让我清楚地了解如何将数据写入主题的特定分区,还是将其留给 Kafka 更好。
编辑 1: 我刚刚经历了这个 link 并且提到了一个公式来根据吞吐量计算所需的分区数。 这是我们可以用来确定每个主题的分区数的方法吗?
任何澄清对我来说都很有价值。
这是一个相当广泛的话题,其中的问题需要一些彻底的答案。无论如何,最重要的是:
- 通常,Kafka 会随着主题中分区的数量进行扩展
- Spark 随着工作节点数量和可用数量的增加而扩展 cores/slots
- Kafka 主题的每个分区只能由单个 Spark 任务使用(parallelsim 然后取决于 Spark wcores 的数量)
- 如果你有多个 Spark worker 但只有一个 Kafka 主题分区,则只有一个核心可以消费数据
- 同样,如果您有多个 Kafka 主题分区但只有一个具有单核的工作节点,则“并行度”为 1
- 请记住,公式通常代表一种理论,为了简单起见,省略了细节。您引用的公式是一个很好的起点,但最终它取决于您的环境,例如:对延迟或吞吐量的要求、网络 bandwith/traffic、可用硬件、成本等。 也就是说,只有您可以进行优化测试。
附带说明一下,当从 Spark Structured Streaming 写入 Kafka 时,如果您的 Dataframe 包含“分区”列,它将用于将记录发送到相应的分区(从 0 开始)。您还可以在数据框中包含“主题”列,这样您就可以将记录发送到某个主题。
Spark Structured Streaming 会将每条记录单独发送到 Kafka。