Does Azure HDInsight support Auto Loader for new file detection?

I am referring to the following link: https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/auto-loader. It uses Spark Structured Streaming to process files incrementally in Azure Databricks. I would like to know whether an HDInsight cluster with Data Lake Storage Gen2 supports incremental file loading. I tried the example on an HDInsight Spark cluster and got the following error.

Sample code:

input_df = spark.readStream \
            .format("cloudFiles") \
            .option("cloudFiles.format","json") \
            .option("cloudFiles.connectionString", connection_string) \
            .option("cloudFiles.resourceGroup", resource_group) \
            .option("cloudFiles.subscriptionId", subscription_id) \
            .option("cloudFiles.tenantId", tenant_id) \
            .option("cloudFiles.clientId", client_id) \
            .option("cloudFiles.clientSecret", client_secret) \
            .option("cloudFiles.includeExistingFiles", "true") \
            .schema(schema) \
            .load(input_folder) 

Error:

  Traceback (most recent call last):
  File "<stdin>", line 12, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/streaming.py", line 398, in load
    return self._df(self._jreader.load(path))
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o95.load.
: java.lang.ClassNotFoundException: Failed to find data source: cloudFiles. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:225)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: cloudFiles.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$$anonfun$apply.apply(DataSource.scala:634)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$$anonfun$apply.apply(DataSource.scala:634)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun.apply(DataSource.scala:634)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun.apply(DataSource.scala:634)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)


Unfortunately, Azure HDInsight does not support Auto Loader for new file detection.

What is Auto Loader?

Auto Loader is a feature from Databricks that allows data to be ingested incrementally from a variety of data sources into Delta Lake. Auto Loader is a cloud file source optimized for Apache Spark that continuously and efficiently loads data from cloud storage as new data arrives. A network of partner-integrated data ingestion tools lets you ingest data from hundreds of data sources directly into Delta Lake.

Under the hood (in Azure Databricks), running Auto Loader automatically sets up Azure Event Grid and Queue Storage services. Through these services, Auto Loader uses the queue from Azure Storage to discover new files, pass them to Spark, and thus load the data with low latency and at low cost within your streaming or batch jobs. Auto Loader also logs which files were processed, guaranteeing exactly-once processing of the incoming data.
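The file-tracking idea behind that guarantee can be sketched in plain Python (a hypothetical helper for illustration, not Databricks code): a persistent log of already-seen files means each new file is handed downstream once, even across restarts.

```python
import json
from pathlib import Path


def discover_new_files(input_dir: str, checkpoint_file: str) -> list[str]:
    """Return files in input_dir that are not yet in the checkpoint log,
    then record them so a restart will not deliver them again."""
    checkpoint = Path(checkpoint_file)
    processed = set(json.loads(checkpoint.read_text())) if checkpoint.exists() else set()

    # List the directory and keep only files we have not seen before.
    current = {str(p) for p in Path(input_dir).glob("*.json")}
    new_files = sorted(current - processed)

    # Persist the updated log before handing files downstream,
    # so a crash-and-restart cannot re-deliver the same files.
    checkpoint.write_text(json.dumps(sorted(processed | current)))
    return new_files
```

Auto Loader does this bookkeeping inside the streaming checkpoint and, in file-notification mode, learns about new files from the storage queue instead of repeatedly listing the directory.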

Auto Loader incrementally and efficiently processes new data files as they arrive in cloud storage without any additional setup. Auto Loader provides a new Structured Streaming source called cloudFiles. Given an input directory path on cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing the existing files in that directory.

For details, see Load files from Azure Blob storage or Azure Data Lake Storage Gen2 using Auto Loader.
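On HDInsight, a partial substitute is Spark's built-in file streaming source, which also picks up new files dropped into a directory, though it discovers them by listing the directory rather than via storage-queue notifications, so it scales less well and offers none of the cloudFiles options. A minimal sketch, reusing `schema` and `input_folder` from the question (`output_folder` and `checkpoint_folder` are placeholder names):

    # Structured Streaming's native file source: discovers new files by
    # periodically listing input_folder; an explicit schema is required.
    input_df = spark.readStream \
                .format("json") \
                .option("maxFilesPerTrigger", 100) \
                .schema(schema) \
                .load(input_folder)

    query = input_df.writeStream \
                .format("parquet") \
                .option("path", output_folder) \
                .option("checkpointLocation", checkpoint_folder) \
                .start()

The checkpoint location tracks which files have already been read, which is what gives the file source its own exactly-once behavior across restarts.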