Event Hub Throttling with the error: request was terminated because the entity is being throttled. Error code : 50002. Sub error : 102

I am using the Databricks Labs Data Generator to send synthetic data to Event Hubs.

Everything appears to work for about two minutes, but then the streaming stops with the following error:

The request was terminated because the entity is being throttled. Error code : 50002. Sub error : 102.

Can anyone tell me how to adjust the throttling?

The code I am using to send data to Event Hubs is below:

import dbldatagen as dg
import pyspark.sql.functions as F
from pyspark.sql.functions import window

# num_rows and num_partitions are assumed to be defined earlier in the notebook
delay_reasons = ["Air Carrier", "Extreme Weather", "National Aviation System", "Security", "Late Aircraft"]


flightdata_defn = (dg.DataGenerator(spark, name="flight_delay_data", rows=num_rows, partitions=num_partitions)
                 #.withColumn("body",StringType(), False)
                 .withColumn("flightNumber", "int", minValue=1000, uniqueValues=10000, random=True)
                 .withColumn("airline", "string", minValue=1, maxValue=500,  prefix="airline", random=True, distribution="normal")
                 .withColumn("original_departure", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", interval="1 minute", random=True)
                 .withColumn("delay_minutes", "int", minValue=20, maxValue=600, distribution=dg.distributions.Gamma(1.0, 2.0))
                 .withColumn("delayed_departure",  "timestamp", expr="cast(original_departure as bigint) +  (delay_minutes * 60) ", baseColumn=["original_departure", "delay_minutes"])
                 .withColumn("reason", "string", values=delay_reasons, random=True)
                )

df_flight_data = flightdata_defn.build(withStreaming=True, options={'rowsPerSecond': 100})


streamingDelays = (
  df_flight_data
    .groupBy(
      #df_flight_data.body,
      df_flight_data.flightNumber,
      df_flight_data.airline,
      df_flight_data.original_departure,
      df_flight_data.delay_minutes,
      df_flight_data.delayed_departure,
      df_flight_data.reason,
      window(df_flight_data.original_departure, "1 hour")
    )
    .count()
)

writeConnectionString = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
checkpointLocation = "///checkpoint"

# ehWriteConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
# ehWriteConf = {
#   'eventhubs.connectionString' : writeConnectionString
# }

ehWriteConf = {
  'eventhubs.connectionString' : writeConnectionString
}

Write the body data from the DataFrame to the Event Hub. Events are distributed across partitions using a round-robin model.

ds = streamingDelays \
  .select(F.to_json(F.struct("*")).alias("body")) \
  .writeStream.format("eventhubs") \
  .options(**ehWriteConf) \
  .outputMode("complete") \
  .option("checkpointLocation", "...") \
  .start()

I forgot to mention that I have 1 TU.

This is due to the regular throughput limits of Event Hubs. Please review the limits for 1 TU at https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas. You can increase the number of TUs to 2 and go from there. If you believe this throttling is unexpected, please open a support ticket for the issue.
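For reference, a single TU allows roughly 1 MB/s or 1,000 events/s of ingress. If you would rather stay on 1 TU than scale up, one option is to slow down the write side. Below is a minimal sketch under that assumption; the 30-second trigger interval is an illustrative value, not a tested recommendation, and it reuses streamingDelays, ehWriteConf and checkpointLocation from the code above.

# Sketch only: keep ingress under the 1 TU limits (~1 MB/s or ~1,000 events/s).
# Lowering 'rowsPerSecond' in flightdata_defn.build(...) (e.g. 100 -> 50)
# reduces ingress as well.
ds = (streamingDelays
      .select(F.to_json(F.struct("*")).alias("body"))
      .writeStream.format("eventhubs")
      .options(**ehWriteConf)
      .outputMode("complete")
      .trigger(processingTime="30 seconds")   # one batch every 30 s instead of back-to-back micro-batches
      .option("checkpointLocation", checkpointLocation)
      .start())

Alternatively, if you prefer to scale the namespace rather than the job, Standard namespaces support Auto-Inflate, which raises the TU count automatically up to a maximum you configure; details are on the quotas page linked above.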