从 foreach 内部调用时,Pyspark 保存不起作用
Pyspark saving is not working when called from inside a foreach
我正在构建一个从 Azure EventHub 接收消息并保存到数据块增量 tables 中的管道。
我对静态数据的所有测试都很顺利,请参见下面的代码:
body = 'A|B|C|D\n"False"|"253435564"|"14"|"2019-06-25 04:56:21.713"\n"True"|"253435564"|"13"|"2019-06-25 04:56:21.713"\n"
tableLocation = "/delta/tables/myTableName"
spark = SparkSession.builder.appName("CSV converter").getOrCreate()
csvData = spark.sparkContext.parallelize(body.split('\n'))
df = spark.read \
.option("header", True) \
.option("delimiter","|") \
.option("quote", "\"") \
.option("nullValue", "\N") \
.option("inferShema", "true") \
.option("mergeSchema", "true") \
.csv(csvData)
df.write.format("delta").mode("append").save(tableLocation)
但在我的例子中,每条 eventhub 消息都是一个 CSV 字符串,它们可能来自许多来源。所以每条消息必须单独处理,因为每条消息最终可能保存在不同的增量 tables.
当我尝试在 foreach 语句中执行相同的代码时,它不起作用。日志中没有显示任何错误,我找不到任何已保存的 table。
所以也许我在调用 foreach 时做错了什么。请看下面的代码:
def SaveData(row):
...
The same code above
dfEventHubCSV.rdd.foreach(SaveData)
我尝试在流式上下文中执行此操作,但不幸的是我遇到了同样的问题。
foreach 中有什么使其行为不同?
完整代码下方我是运行:
import pyspark.sql.types as t
from pyspark.sql import SQLContext
--row contains the fields Body and SdIds
--Body: CSV string
--SdIds: A string ID
def SaveData(row):
--Each row data that is going to be added to different tables
rowInfo = GetDestinationTableData(row['SdIds']).collect()
table = rowInfo[0][4]
schema = rowInfo[0][3]
database = rowInfo[0][2]
body = row['Body']
tableLocation = "/delta/" + database + '/' + schema + '/' + table
checkpointLocation = "/delta/" + database + '/' + schema + "/_checkpoints/" + table
spark = SparkSession.builder.appName("CSV").getOrCreate()
csvData = spark.sparkContext.parallelize(body.split('\n'))
df = spark.read \
.option("header", True) \
.option("delimiter","|") \
.option("quote", "\"") \
.option("nullValue", "\N") \
.option("inferShema", "true") \
.option("mergeSchema", "true") \
.csv(csvData)
df.write.format("delta").mode("append").save(tableLocation)
dfEventHubCSV.rdd.foreach(SaveData)
好吧,总而言之,一如既往,它非常简单,但我根本没看到。
基本上,当您执行 foreach 并且要保存的数据帧构建在循环内时。 worker 不像 driver,不会在保存时自动设置“/dbfs/”路径,所以如果你不手动添加“/dbfs/”,它会在 worker 本地保存数据,你永远不会找到保存的数据。
这就是我的循环不工作的原因。
我正在构建一个从 Azure EventHub 接收消息并保存到数据块增量 tables 中的管道。
我对静态数据的所有测试都很顺利,请参见下面的代码:
body = 'A|B|C|D\n"False"|"253435564"|"14"|"2019-06-25 04:56:21.713"\n"True"|"253435564"|"13"|"2019-06-25 04:56:21.713"\n"
tableLocation = "/delta/tables/myTableName"
spark = SparkSession.builder.appName("CSV converter").getOrCreate()
csvData = spark.sparkContext.parallelize(body.split('\n'))
df = spark.read \
.option("header", True) \
.option("delimiter","|") \
.option("quote", "\"") \
.option("nullValue", "\N") \
.option("inferShema", "true") \
.option("mergeSchema", "true") \
.csv(csvData)
df.write.format("delta").mode("append").save(tableLocation)
但在我的例子中,每条 eventhub 消息都是一个 CSV 字符串,它们可能来自许多来源。所以每条消息必须单独处理,因为每条消息最终可能保存在不同的增量 tables.
当我尝试在 foreach 语句中执行相同的代码时,它不起作用。日志中没有显示任何错误,我找不到任何已保存的 table。
所以也许我在调用 foreach 时做错了什么。请看下面的代码:
def SaveData(row):
...
The same code above
dfEventHubCSV.rdd.foreach(SaveData)
我尝试在流式上下文中执行此操作,但不幸的是我遇到了同样的问题。
foreach 中有什么使其行为不同?
完整代码下方我是运行:
import pyspark.sql.types as t
from pyspark.sql import SQLContext
--row contains the fields Body and SdIds
--Body: CSV string
--SdIds: A string ID
def SaveData(row):
--Each row data that is going to be added to different tables
rowInfo = GetDestinationTableData(row['SdIds']).collect()
table = rowInfo[0][4]
schema = rowInfo[0][3]
database = rowInfo[0][2]
body = row['Body']
tableLocation = "/delta/" + database + '/' + schema + '/' + table
checkpointLocation = "/delta/" + database + '/' + schema + "/_checkpoints/" + table
spark = SparkSession.builder.appName("CSV").getOrCreate()
csvData = spark.sparkContext.parallelize(body.split('\n'))
df = spark.read \
.option("header", True) \
.option("delimiter","|") \
.option("quote", "\"") \
.option("nullValue", "\N") \
.option("inferShema", "true") \
.option("mergeSchema", "true") \
.csv(csvData)
df.write.format("delta").mode("append").save(tableLocation)
dfEventHubCSV.rdd.foreach(SaveData)
好吧,总而言之,一如既往,它非常简单,但我根本没看到。
基本上,当您执行 foreach 并且要保存的数据帧构建在循环内时。 worker 不像 driver,不会在保存时自动设置“/dbfs/”路径,所以如果你不手动添加“/dbfs/”,它会在 worker 本地保存数据,你永远不会找到保存的数据。
这就是我的循环不工作的原因。