How can I iterate over JSON files in Code Repositories and incrementally append to a dataset?
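I have imported a dataset of 100,000 raw JSON files, about 100 GB in total, into Foundry through Data Connection. I want to use the Python Transforms raw file access transformation to read the files and flatten the arrays of structs and the structs into a DataFrame, as an incremental update to the df.
I want to take the example below from the documentation, adapt it to *.json files, and convert it to an incremental update using the @incremental() decorator.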
>>> import csv
>>> from pyspark.sql import Row
>>> from transforms.api import transform, Input, Output
>>>
>>> @transform(
...     processed=Output('/examples/hair_eye_color_processed'),
...     hair_eye_color=Input('/examples/students_hair_eye_color_csv'),
... )
... def example_computation(hair_eye_color, processed):
...
...     def process_file(file_status):
...         with hair_eye_color.filesystem().open(file_status.path) as f:
...             r = csv.reader(f)
...
...             # Construct a pyspark.Row from our header row
...             header = next(r)
...             MyRow = Row(*header)
...
...             for row in csv.reader(f):
...                 yield MyRow(*row)
...
...     files_df = hair_eye_color.filesystem().files('**/*.csv')
...     processed_df = files_df.rdd.flatMap(process_file).toDF()
...     processed.write_dataframe(processed_df)
With the help of @Jeremy David Gamet, I was able to develop code that produces the dataset I want.
from transforms.api import transform, Input, Output
import json


@transform(
    out=Output('foundry/outputdataset'),
    inpt=Input('foundry/inputdataset'),
)
def update_set(ctx, inpt, out):
    spark = ctx.spark_session
    sc = spark.sparkContext
    # List every file in the input dataset
    filesystem = list(inpt.filesystem().ls())
    file_dates = []
    for files in filesystem:
        # utf-8-sig strips a leading byte order mark if one is present
        with inpt.filesystem().open(files.path, 'r', encoding='utf-8-sig') as fi:
            data = json.load(fi)
        file_dates.append(data)
    # Every parsed file is held in driver memory at this point
    json_object = json.dumps(file_dates)
    df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))
    # drop_duplicates() returns a new DataFrame, so keep the result
    df_2 = df_2.drop_duplicates()
    # this code to [Flatten array column][1]
    # flatten() is the helper from that linked code; it is not defined here
    df_2 = flatten(df_2)
    out.write_dataframe(df_2)
Code to flatten the df
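To make "flatten the arrays of structs and the structs" concrete, here is a minimal sketch in plain Python that works on one parsed JSON record at a time. It is not the linked `flatten` helper, which operates on a Spark DataFrame. The function name `flatten_record`, the underscore naming of nested keys, and the sample record are all assumptions made for illustration.

```python
from itertools import product


def flatten_record(record, prefix=""):
    """Return a list of flat dicts for one nested JSON record.

    Nested dicts (structs) become prefixed keys, and each list (array)
    is exploded so that every element produces its own row.
    """
    rows = [{}]
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            # Struct: recurse and prefix the child keys with the parent name
            parts = flatten_record(value, prefix=f"{name}_")
        elif isinstance(value, list):
            # Array: one partial row per element
            parts = []
            for item in value:
                if isinstance(item, dict):
                    parts.extend(flatten_record(item, prefix=f"{name}_"))
                else:
                    # Scalars (and nested lists) are kept as a single value
                    parts.append({name: item})
            if not parts:
                # Keep the row when the array is empty
                parts = [{name: None}]
        else:
            parts = [{name: value}]
        # Combine the rows built so far with the rows for this key
        rows = [{**left, **right} for left, right in product(rows, parts)]
    return rows


sample = {"id": 1, "owner": {"name": "a"}, "tags": ["x", "y"]}
flat_rows = flatten_record(sample)
```

A record with several arrays produces the cross product of their elements, so this sketch suits records with one main array rather than many independent ones.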
The code above works for a small number of files, but because there are more than 100,000 files I hit the following error:
Connection To Driver Lost
This error indicates that connection to the driver was lost unexpectedly, which is often caused by the driver being terminated due to running out of memory. Common reasons for driver out-of-memory (OOM) errors include functions that materialize data to the driver such as .collect(), broadcasted joins, and using Pandas dataframes.
Is there any way to solve this?
I gave an example of how to do this dynamically in an answer to another question.
Here is the link to that answer: How to union multiple dynamic inputs in Palantir Foundry? And here is a copy of the same code:
from transforms.api import Input, Output, transform
from pyspark.sql import functions as F
import json
import logging


def transform_generator():
    transforms = []
    transf_dict = {}  # enter your dynamic mappings here
    for value in transf_dict:
        @transform(
            out=Output(' path to your output here '.format(val=value)),
            inpt=Input(" path to input here ".format(val=value)),
        )
        def update_set(ctx, inpt, out):
            spark = ctx.spark_session
            sc = spark.sparkContext
            # List every file in the input dataset
            filesystem = list(inpt.filesystem().ls())
            file_dates = []
            for files in filesystem:
                with inpt.filesystem().open(files.path) as fi:
                    data = json.load(fi)
                file_dates.append(data)
            logging.info('info logs:')
            logging.info(file_dates)
            json_object = json.dumps(file_dates)
            df_2 = spark.read.option("multiline", "true").json(sc.parallelize([json_object]))
            df_2 = df_2.withColumn('upload_date', F.current_date())
            # drop_duplicates() returns a new DataFrame, so keep the result
            df_2 = df_2.drop_duplicates()
            out.write_dataframe(df_2)
        # Register the function defined above (the original appended update_logs, which does not exist)
        transforms.append(update_set)
    return transforms


TRANSFORMS = transform_generator()
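Both versions of `update_set` parse every file in a loop on the driver and collect the results in one Python list, which is a likely cause of the "Connection To Driver Lost" error at 100,000 files. A possible way around it is to parse each file inside a function passed to `flatMap`, as the documentation example in the question does for CSV. The sketch below shows only that per-file function in plain Python so it can be run anywhere. The names `json_lines_from_file`, `process_file`, `open_fake` and `FAKE_FILES` are hypothetical, and the in-memory files stand in for the dataset filesystem.

```python
import io
import json


def json_lines_from_file(handle):
    """Yield one compact JSON string per record in an open text file."""
    payload = json.load(handle)
    # A file may hold a single object or a list of objects
    records = payload if isinstance(payload, list) else [payload]
    for record in records:
        yield json.dumps(record)


def process_file(path, open_file):
    """Parse a single file; intended to be called once per file on an executor."""
    with open_file(path) as handle:
        yield from json_lines_from_file(handle)


# In-memory stand-in for the input dataset (assumption, for illustration only)
FAKE_FILES = {
    "a.json": b'\xef\xbb\xbf{"id": 1, "tags": ["x"]}',  # starts with a UTF-8 BOM
    "b.json": b'[{"id": 2}, {"id": 3}]',
}


def open_fake(path):
    # utf-8-sig drops the BOM, matching the encoding used in the question
    return io.TextIOWrapper(io.BytesIO(FAKE_FILES[path]), encoding="utf-8-sig")


lines = [line for path in FAKE_FILES for line in process_file(path, open_fake)]
```

Inside a transform, the same function could be applied with something like `inpt.filesystem().files('**/*.json').rdd.flatMap(...)`, opening each `file_status.path` with `inpt.filesystem().open(...)`, and the resulting RDD of JSON strings passed to `spark.read.json`, so that no file content is accumulated on the driver. If the transform is also decorated with `@incremental()`, my understanding is that the input filesystem lists only files added since the previous build, but that is worth confirming against the Foundry documentation.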
Let me know if there is anything I can clarify.