如何防止使用 Spark DataFrames 处理文件两次

How to prevent processing files twice with Spark DataFrames

我正在使用 AWS Glue 将一些 S3 TSV 处理成 S3 Parquet。由于非 UTF-8 传入文件,我被迫使用 DataFrames 而不是 DynamicFrames 来处理我的数据(这是一个已知的问题,没有任何解决方法,即 DynamicFrames 在处理任何非 UTF8 字符时会完全失败)。这似乎也意味着我无法在 Glue 中使用作业书签来跟踪我已经处理了哪些 S3 TSV 文件。

我的代码如下所示:

# pylint: skip-file
# flake8: noqa
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import split
from awsglue.dynamicframe import DynamicFrame

# @params: [JOB_NAME, s3target]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3target', 's3source'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Define massive list of fields in the schema
fields = [
    StructField("accept_language", StringType(), True),
    StructField("browser", LongType(), True),
    .... huge list ...
    StructField("yearly_visitor", ShortType(), True),
    StructField("zip", StringType(), True)
]

schema = StructType(fields)

# Read in data using Spark DataFrame and not an AWS Glue DynamicFrame to avoid issues with non-UTF8 characters
df0 = spark.read.format("com.databricks.spark.csv").option("quote", "\"").option("delimiter", u'\u0009').option("charset", 'utf-8').schema(schema).load(args['s3source'] + "/*.tsv.gz")

# Remove all rows that are entirely nulls
df1 = df0.dropna(how = 'all')

# Generate a partitioning column
df2 = df1.withColumn('date', df1.date_time.cast('date'))

# Write out in parquet format, partitioned by date, to the S3 location specified in the arguments
ds2 = df2.write.format("parquet").partitionBy("date").mode("append").save(args['s3target'])

job.commit()

我的问题是 - 每次运行时都没有作业书签,它会一遍又一遍地处理相同的 s3 文件。如何将源 s3 存储桶中已处理的文件移动到子文件夹或其他内容,或者避免重复处理文件?

我不确定这里有什么技巧,Spark 是一个并行系统,甚至不知道文件是什么。我想我可以用 Python Shell 作业类型创建第二个 Glue 作业,然后立即删除传入的文件,但即便如此我也不确定要删除哪些文件等

谢谢,

克里斯

如果您不关心再次处理相同的源文件(相对于时间限制)并且您的用例是目标中没有重复数据,您可以考虑将保存模式更新为 "Overwrite" 写入数据框时

https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/DataFrameWriter.html

要将已处理的文件标记为输入源前缀之外,您必须使用 boto3(或直接使用 awscli)移动或删除文件。

要确定要处理的文件,您可以通过两种不同的方式进行:

  • 在使用 spark 之前,使用带有 s3client.list_objects() 的 boto3 解析文件 glob args['s3source'] + "/*.tsv.gz"。 您可以向 spark.read.load.
  • 提供一组解析文件而不是 glob
import boto3
client = boto3.client('s3')

# get all the available files
# Note: if you expect a lot of files, you need to iterate on the pages of results

response = client.list_objects_v2(Bucket=your_bucket_name,Prefix=your_path_prefix)
files=['s3://'+your_bucket_name+obj['Key'] for obj in response['Contents'] if obj.endswith('tsv.gz')]

 ... initialize your job as before ...

df0 = df0 = spark.read.format("com.databricks.spark.csv").option("quote", "\"").option("delimiter", u'\u0009').option("charset", 'utf-8').schema(schema).load(files)

 ... do your work as before ...
  • 使用 spark 跟踪所有输入文件的事实,以便 post- 在成功保存后处理它们:
 ... process your files with pyspark as before...

# retrieve the tracked files from the initial DataFrame
# you need to access the java RDD instances to get to the partitions information
# The file URIs will be in the following format: u's3://mybucket/mypath/myfile.tsv.gz'

files = [] 
for p in df0.rdd._jrdd.partitions(): 
    files.append([f.filePath() for f in p.files().array()])

获得文件列表后,删除、重命名或将它们添加到元数据存储以在下一个作业中过滤掉它们就非常简单了。

例如,要删除它们:

# initialize a S3 client if not already done
from urlparse import urlparse # python 2
import boto3
client = boto3.client('s3')

# do what you want with the uris, for example delete them
for uri in files:
   parsed = urlparse(uri)
   client.delete_object(Bucket=parsed.netloc, Key=parsed.path)

我用于通过 AWS glue 开发的 ETL 过程之一的解决方案是首先使用 boto3 API 列出 s3 中的文件并将其移动到 "WORK" 文件夹。此过程不会花费任何时间,因为您只是更改 s3 对象名称而不是任何物理移动。

完成上述步骤后,您可以使用 "WORK" 文件夹作为 SPARK dataFrame 的输入,同时可以将新文件不断推送到其他 s3 文件夹。

我不确定您的用例,但我们使用当前系统日期时间来创建 "WORK" 文件夹,以便我们可以在发现我们处理的流程或数据有任何问题时调查或重新运行任何文件几天后加载。

最终工作代码:

# pylint: skip-file
# flake8: noqa
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import split
import boto3
from urlparse import urlparse

# Read arguments
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3target', 's3source'])

# Initialise boto3
client = boto3.client('s3')

# Get all the available files
response = client.list_objects_v2(Bucket = "xxx")
files = [ "s3://xxx/" + obj['Key'] for obj in response['Contents'] if obj['Key'].endswith('.tsv.gz') ]

# Initialise the glue job
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Define massive list of fields in the schema
fields = [
    StructField("accept_language", StringType(), True),
    StructField("browser", LongType(), True),
    .... huge list ...
    StructField("yearly_visitor", ShortType(), True),
    StructField("zip", StringType(), True)
]

schema = StructType(fields)

# Read in data using Spark DataFrame and not an AWS Glue DynamicFrame to avoid issues with non-UTF8 characters
df0 = spark.read.format("com.databricks.spark.csv").option("quote", "\"").option("delimiter", u'\u0009').option("charset", 'utf-8').schema(schema).load(files)

# Remove all rows that are entirely nulls
df1 = df0.dropna(how = 'all')

# Generate a partitioning column
df2 = df1.withColumn('date', df1.date_time.cast('date'))

# Write out in parquet format, partitioned by date, to the S3 location specified in the arguments
ds2 = df2.write.format("parquet").partitionBy("date").mode("append").save(args['s3target'])

# retrieve the tracked files from the initial DataFrame
# you need to access the java RDD instances to get to the partitions information
# The file URIs will be in the following format: u's3://mybucket/mypath/myfile.tsv.gz'
files = []
for p in df0.rdd._jrdd.partitions():
    files.extend([f.filePath() for f in p.files().array()])

# Move files to the processed folder
for uri in files:
   parsed = urlparse(uri)
   client.copy_object(CopySource = {'Bucket': parsed.netloc, 'Key': parsed.path.lstrip('/')}, Bucket = parsed.netloc, Key = 'processed' + parsed.path)
   client.delete_object(Bucket = parsed.netloc, Key = parsed.path.lstrip('/'))

job.commit()