Event Hub Throttling with the error: request was terminated because the entity is being throttled. Error code : 50002. Sub error : 102
I'm using the Databricks Labs Data Generator to send synthetic data to Event Hubs.
Everything seems to run fine for about two minutes, but then the stream stops with the following error:
The request was terminated because the entity is being throttled. Error code : 50002. Sub error : 102.
Can anyone tell me how to deal with the throttling?
The code I use to send data to Event Hubs is below:
import dbldatagen as dg
import pyspark.sql.functions as F
from pyspark.sql.functions import window

delay_reasons = ["Air Carrier", "Extreme Weather", "National Aviation System", "Security", "Late Aircraft"]

flightdata_defn = (dg.DataGenerator(spark, name="flight_delay_data", rows=num_rows, partitions=num_partitions)
    #.withColumn("body", StringType(), False)
    .withColumn("flightNumber", "int", minValue=1000, uniqueValues=10000, random=True)
    .withColumn("airline", "string", minValue=1, maxValue=500, prefix="airline", random=True, distribution="normal")
    .withColumn("original_departure", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", interval="1 minute", random=True)
    .withColumn("delay_minutes", "int", minValue=20, maxValue=600, distribution=dg.distributions.Gamma(1.0, 2.0))
    .withColumn("delayed_departure", "timestamp", expr="cast(original_departure as bigint) + (delay_minutes * 60)", baseColumn=["original_departure", "delay_minutes"])
    .withColumn("reason", "string", values=delay_reasons, random=True)
    )

df_flight_data = flightdata_defn.build(withStreaming=True, options={'rowsPerSecond': 100})
streamingDelays = (
    df_flight_data
        .groupBy(
            #df_flight_data.body,
            df_flight_data.flightNumber,
            df_flight_data.airline,
            df_flight_data.original_departure,
            df_flight_data.delay_minutes,
            df_flight_data.delayed_departure,
            df_flight_data.reason,
            window(df_flight_data.original_departure, "1 hour")
        )
        .count()
)
# Encrypt the connection string for use by the Event Hubs Spark connector
writeConnectionString = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
checkpointLocation = "///checkpoint"

ehWriteConf = {
    'eventhubs.connectionString': writeConnectionString
}
Write the body data from the DataFrame to the Event Hub. Events are distributed across partitions using a round-robin model.
ds = streamingDelays \
    .select(F.to_json(F.struct("*")).alias("body")) \
    .writeStream.format("eventhubs") \
    .options(**ehWriteConf) \
    .outputMode("complete") \
    .option("checkpointLocation", checkpointLocation) \
    .start()
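A side note on the round-robin distribution mentioned above: that is the connector's default when the output DataFrame carries only a body column. If related events should land in the same partition instead, the Azure Event Hubs Spark connector also honors an optional partitionKey column. A minimal sketch, assuming your connector version supports it (using airline as the key and a separate checkpoint path are just illustrations):

ds_keyed = streamingDelays \
    .select(F.col("airline").cast("string").alias("partitionKey"),  # hypothetical routing key
            F.to_json(F.struct("*")).alias("body")) \
    .writeStream.format("eventhubs") \
    .options(**ehWriteConf) \
    .outputMode("complete") \
    .option("checkpointLocation", checkpointLocation + "_keyed") \
    .start()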
I forgot to mention that I only have 1 TU.
This is due to the regular traffic limits on Event Hubs. Check the limits for 1 TU at https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas; you can increase the number of TUs to 2 and go from there.
If you believe the throttling is unexpected, open a support ticket for the issue.
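To make the quota concrete: per the page linked above, a single TU allows up to 1 MB/s of ingress or 1,000 ingress events per second, whichever is reached first. A rough back-of-the-envelope check (the average event size below is an assumption; measure your actual JSON body size):

avg_event_bytes = 300        # assumed average size of one serialized JSON body
events_per_second = 100      # matches the generator's rowsPerSecond above

ingress_mb_per_sec = events_per_second * avg_event_bytes / 1_000_000
tus_needed = max(ingress_mb_per_sec / 1.0, events_per_second / 1000.0)
print(f"~{ingress_mb_per_sec:.2f} MB/s ingress -> at least {tus_needed:.2f} TU")

Also keep in mind that outputMode("complete") re-emits the entire aggregation result on every trigger, so the effective write rate can grow well past the 100 rows/s being generated; that would be consistent with throttling starting only after a couple of minutes.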