从 Azure EventHubs Capture 生成的 Azure Data Lake Gen1 使用 Databricks 读取 avro 数据失败

Reading avro data with Databricks from Azure Data Lake Gen1 generated by Azure EventHubs Capture fails

我正在尝试从 Azure Data Lake Gen1 读取 avro 数据,这些数据是从 Azure EventHubs 生成的,在 Azure Databricks 中使用 pyspark 启用了 Azure Event Hubs Capture:

inputdata = "evenhubscapturepath/*/*"
rawData = spark.read.format("avro").load(inputdata)

以下语句失败

rawData.count()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 162 in stage 48.0 failed 4 times, most recent failure: Lost task 162.3 in stage 48.0 (TID 2807, 10.3.2.4, executor 1): java.io.IOException: Not an Avro data file

EventHub-Capture 是否正在写入非 Avro 数据?使用 Spark 读取 EventHub 捕获的数据是否有任何最佳实践?

确保输入数据是“.avro”文件。

由于 spark-avro 模块是外部的,所以 DataFrameReader 或 DataFrameWriter 中没有 .avro API。

要load/saveAvro格式的数据,需要指定数据源选项格式为avro(或org.apache.spark.sql.avro)。

示例:

Python
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")

#storage->avro
avroDf = spark.read.format("com.databricks.spark.avro").load(in_path)

更多详情,请参考以下链接:

https://spark.apache.org/docs/latest/sql-data-sources-avro.html

http://blog.itaysk.com/2017/01/14/processing-event-hub-capture-files-using-spark

https://medium.com/@caiomsouza/processing-event-hubs-capture-files-avro-format-using-spark-azure-databricks-save-to-parquet-95259001d85f

希望这对您有所帮助。

一种实现冷摄取路径的模式是使用 Event Hubs Capture. EventHubs capturing writes one file per partition as defined with the windowing parameters。数据以 avro 格式编写,可以使用 Apache Spark 进行分析。

那么使用此功能的最佳做法是什么?

1.不要过度分区

我经常看到人们使用默认配置,最终往往会产生许多小文件。如果您想使用通过 EventHubs Capture with Spark 摄取的数据,请记住 file sizes in Azure Data Lake Store and partitions with Spark 的最佳实践。文件大小应为 ~256 MB,分区应在 10 到 50 GB 之间。所以最后配置取决于您正在使用的消息的数量和大小。在大多数情况下,您只需按摄取日期对数据进行分区就可以了。

2。检查 "Do not emit empty files option"

你应该检查 "Do not emit empty files option"。如果要使用 Spark 消费数据,可以节省不必要的文件操作。

3。在文件路径中使用数据来源

使用流式架构,您的 EventHub 就是 Landing Zone 在面向批处理的架构方法中的样子。因此,您将摄取原始数据层中的数据。好的做法是在目录路径中使用数据源而不是 EventHub 的名称。因此,例如,如果您正在从工厂中的机器人获取遥测数据,这可能是目录路径 /raw/robots/

存储命名需要使用所有属性,如 {Namesapce}、{PartitionId}。所以最后一个好的捕获文件格式定义具有明确定义的路径、每日分区和使用 Azure Data Lake Gen 2 中文件名的剩余属性可能如下所示:

 /raw/robots/ingest_date={Year}-{Month}-{Day}/{Hour}{Minute}{Second}-{Namespace}-{EventHub}-{PartitionId}

4。想想压缩作业

捕获的数据未压缩,在您的用例中也可能最终变成小文件(因为最低写入频率为 15 分钟)。因此,如果有必要,每天写一次压缩作业 运行 。像

df.repartition(5).write.format("avro").save(targetpath)

会做这份工作。

那么现在读取捕获数据的最佳实践是什么?

5.忽略读取数据的非 avro 文件

Azure EventHubs Capture 将临时数据写入 Azure Data Lake Gen1。最佳实践是只读取带有 avro-extension 的数据。您可以通过 spark 配置轻松实现此目的:

spark.conf.set("avro.mapred.ignore.inputs.without.extension", "true")

6.只读相关分区

考虑只读取相关分区,例如。 G。过滤当前摄取日期。

7.使用共享元数据

读取捕获的数据与直接从 Azure EventHubs 读取数据的工作方式类似。 所以你必须有一个模式。假设您还有作业直接使用 Spark Structured Streaming 读取数据,一个好的模式是存储元数据并共享它。您可以将此元数据存储在 Data Lake Store json 文件中:

[{"MeasurementTS":"timestamp","Location":"string", "Temperature":"double"}]

并用 :

阅读
# parse the metadata to get the schema
from collections import OrderedDict 
from pyspark.sql.types import *
import json

ds = dbutils.fs.head (metadata)                                                 # read metadata file

items = (json
  .JSONDecoder(object_pairs_hook=OrderedDict)
  .decode(ds)[0].items())

#Schema mapping 
mapping = {"string": StringType, "integer": IntegerType, "double" : DoubleType, "timestamp" : TimestampType, "boolean" : BooleanType}

schema = StructType([
    StructField(k, mapping.get(v.lower())(), True) for (k, v) in items])

因此您可以重复使用您的模式:

from pyspark.sql.functions import *

parsedData = spark.read.format("avro").load(rawpath). \
  selectExpr("EnqueuedTimeUtc", "cast(Body as string) as json") \
 .select("EnqueuedTimeUtc", from_json("json", schema=Schema).alias("data")) \
 .select("EnqueuedTimeUtc", "data.*")