如何使用 Azure 存储目录作为流数据源执行 PySpark Stream

How can I execute PySpark Stream with Azure Storage directory as a streaming data source

我想使用 Azure Blob 存储作为我的流的源来执行 Spark 流作业。我怎样才能使用 Python

我们可以 运行 使用 Azure 容器和 blob 服务在 Azure 批处理上激发作业。 Azure 批处理用于 运行 作业,因为它们成本低。

为此,我们需要一些必要的设置,例如存储帐户、容器注册表和 Azure 批处理 运行 个作业。

下面是 python 示例代码 运行 一个简单的 spark 作业:

import argparse  
  
from pyspark.sql import SparkSession  
  
import config  
  
  
def get_azure_spark_connection(storage_account_name, storage_account_key):  
    spark = (  
        SparkSession.builder  
            .config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:2.7.3')  
            .config('spark.hadoop.fs.azure', "org.apache.hadoop.fs.azure.NativeAzureFileSystem")  
            .config("spark.hadoop.fs.azure.account.key." + storage_account_name + ".blob.core.windows.net",  
                    storage_account_key)  
            .appName("AzureSparkDemo")  
            .getOrCreate())  
  
    (spark.sparkContext._jsc.hadoopConfiguration().set("fs.wasbs.impl",  
                                                       "org.apache.hadoop.fs.azure.NativeAzureFileSystem"))  
    return spark  
  
  
if __name__ == '__main__':  
    parser = argparse.ArgumentParser()  
    parser.add_argument("-i", "--input", help="input file to parse", type=str)  
    parser.add_argument("-o", "--output", help="result file to write", type=str)  
    args = parser.parse_args()  
    spark = get_azure_spark_connection(config.STORAGE_ACCOUNT_NAME, config.STORAGE_ACCOUNT_KEY)  
    df = (spark.read.option("header", "true")  
          .option("delimiter", ",")  
          .option("inferSchema", "true")  
          .csv(args.input))  
    df.registerTempTable("airlines")  
    result = spark.sql("""  
      select Year, Month, DayofMonth, _avg_(ArrDelay) as avg_ArrDelay, _avg_(DepDelay) as avg_DepDelay  
      from airlines   
      group by Year, Month, DayofMonth  
""")  
    result.repartition(1).write.mode("overwrite").parquet(args.output)

以下是使用的要求:

azure  
azure-storage  
azure-storage-blob  
pyspark==2.4.0

您可以参考这些 blogs 以了解更多关于使用 python 与 Azure 存储的 运行ning 作业。