如何使用 Azure 存储目录作为流数据源执行 PySpark Stream
How can I execute PySpark Stream with Azure Storage directory as a streaming data source
我想使用 Azure Blob 存储作为我的流的源来执行 Spark 流作业。我怎样才能使用 Python
我们可以 运行 使用 Azure 容器和 blob 服务在 Azure 批处理上激发作业。 Azure 批处理用于 运行 作业,因为它们成本低。
为此,我们需要一些必要的设置,例如存储帐户、容器注册表和 Azure 批处理 运行 个作业。
下面是 python 示例代码 运行 一个简单的 spark 作业:
import argparse
from pyspark.sql import SparkSession
import config
def get_azure_spark_connection(storage_account_name, storage_account_key):
spark = (
SparkSession.builder
.config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:2.7.3')
.config('spark.hadoop.fs.azure', "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
.config("spark.hadoop.fs.azure.account.key." + storage_account_name + ".blob.core.windows.net",
storage_account_key)
.appName("AzureSparkDemo")
.getOrCreate())
(spark.sparkContext._jsc.hadoopConfiguration().set("fs.wasbs.impl",
"org.apache.hadoop.fs.azure.NativeAzureFileSystem"))
return spark
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", help="input file to parse", type=str)
parser.add_argument("-o", "--output", help="result file to write", type=str)
args = parser.parse_args()
spark = get_azure_spark_connection(config.STORAGE_ACCOUNT_NAME, config.STORAGE_ACCOUNT_KEY)
df = (spark.read.option("header", "true")
.option("delimiter", ",")
.option("inferSchema", "true")
.csv(args.input))
df.registerTempTable("airlines")
result = spark.sql("""
select Year, Month, DayofMonth, _avg_(ArrDelay) as avg_ArrDelay, _avg_(DepDelay) as avg_DepDelay
from airlines
group by Year, Month, DayofMonth
""")
result.repartition(1).write.mode("overwrite").parquet(args.output)
以下是使用的要求:
azure
azure-storage
azure-storage-blob
pyspark==2.4.0
您可以参考这些 blogs 以了解更多关于使用 python 与 Azure 存储的 运行ning 作业。
我想使用 Azure Blob 存储作为我的流的源来执行 Spark 流作业。我怎样才能使用 Python
我们可以 运行 使用 Azure 容器和 blob 服务在 Azure 批处理上激发作业。 Azure 批处理用于 运行 作业,因为它们成本低。
为此,我们需要一些必要的设置,例如存储帐户、容器注册表和 Azure 批处理 运行 个作业。
下面是 python 示例代码 运行 一个简单的 spark 作业:
import argparse
from pyspark.sql import SparkSession
import config
def get_azure_spark_connection(storage_account_name, storage_account_key):
spark = (
SparkSession.builder
.config('spark.jars.packages', 'org.apache.hadoop:hadoop-azure:2.7.3')
.config('spark.hadoop.fs.azure', "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
.config("spark.hadoop.fs.azure.account.key." + storage_account_name + ".blob.core.windows.net",
storage_account_key)
.appName("AzureSparkDemo")
.getOrCreate())
(spark.sparkContext._jsc.hadoopConfiguration().set("fs.wasbs.impl",
"org.apache.hadoop.fs.azure.NativeAzureFileSystem"))
return spark
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", help="input file to parse", type=str)
parser.add_argument("-o", "--output", help="result file to write", type=str)
args = parser.parse_args()
spark = get_azure_spark_connection(config.STORAGE_ACCOUNT_NAME, config.STORAGE_ACCOUNT_KEY)
df = (spark.read.option("header", "true")
.option("delimiter", ",")
.option("inferSchema", "true")
.csv(args.input))
df.registerTempTable("airlines")
result = spark.sql("""
select Year, Month, DayofMonth, _avg_(ArrDelay) as avg_ArrDelay, _avg_(DepDelay) as avg_DepDelay
from airlines
group by Year, Month, DayofMonth
""")
result.repartition(1).write.mode("overwrite").parquet(args.output)
以下是使用的要求:
azure
azure-storage
azure-storage-blob
pyspark==2.4.0
您可以参考这些 blogs 以了解更多关于使用 python 与 Azure 存储的 运行ning 作业。