循环遍历 table 的列表并保存到名称为 table 的 csv

loop through a list of tables and save to csv with table name

我有一个 120 table 的列表,我想将每个 table 的前 1000 行和后 1000 行的样本大小保存到每个 table 的单独 csv 文件中。

如何在代码回购或代码创作中完成此操作。

以下代码允许将一个 table 保存到 csv,任何人都可以帮助循环遍历项目文件夹中的 table 列表并为每个 [=27] 创建单独的 csv 文件=]?

@transform(
    my_input = Input('/path/to/input/dataset'),
    my_output = Output('/path/to/output/dataset')
)
def compute_function(my_input, my_output):
    my_output.write_dataframe(
        my_input.dataframe(),
        output_format = "csv",
        options = {
            "compression": "gzip"
        }
    )

伪代码

list_of_tables = [table1,table2,table3,...table120]
for tables in list_of_tables:
    table = table.limit(1000)
    table.write_dataframe(table.dataframe(),output_format = "csv",
        options = {
            "compression": "gzip"
        })

我能够让它为一个 table 工作,我怎样才能循环遍历 table 的列表并生成它? 一个 table

的代码
# to get the first and last rows 
from transforms.api import transform_df, Input, Output 
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col


table_name = 'stock'
@transform_df(
    output=Output(f"foundry/sample/{table_name}_sample"),
    my_input=Input(f"foundry/input/{table_name}"),
)
def compute_first_last_1000(my_input):
    first_stock_df = my_input.withColumn("index", monotonically_increasing_id())
    first_stock_df = first_stock_df.orderBy("index").filter(col("index") < 1000).drop("index")
    last_stock_df = my_input.withColumn("index", monotonically_increasing_id())
    last_stock_df = last_stock_df.orderBy("index").filter(col("index") < 1000).drop("index")
    stock_df = first_stock_df.unionByName(last_stock_df)
    return stock_df

# code to save as csv file
table_name = 'stock'

@transform(
        output=Output(f"foundry/sample/{table_name}_sample_csv"),
        my_input=Input(f"foundry/sample/{table_name}_sample"),
)

def my_compute_function(my_input, output):
    df = my_input.dataframe()
    with output.filesystem().open('stock.csv', 'w') as stream:
        csv_writer = csv.writer(stream)
        csv_writer.writerow(df.schema.names)
        csv_writer.writerows(df.collect())

如果您需要读取 table 名称而不是对它们进行硬编码,那么您可以使用 os.listdiros.walk 方法。

我认为之前的答案遗漏了关于仅导出前 N 行和最后 N 行的部分。如果 table 转换为数据帧 df,则

dfoutput = df.head(N).append(df.tail(N)])

dfoutput = df[:N].append(df[-N:])

您最好的策略是以编程方式生成您的转换,如果您不想创建 1000 个转换,您也可以进行多输出转换。像这样的东西(现场写在答案框中,未经测试的代码某些语法可能是错误的):

# you can generate this programatically
my_inputs = [
   '/path/to/input/dataset1',
   '/path/to/input/dataset2',
   '/path/to/input/dataset3',
   # ...
]

for table_path in my_inputs:
   @transform_df(
      Output(table_path + '_out'),
      df=Input(table_path))
   def transform(df):
       # your logic here
       return df