AWS Glue script, error for Delimiter Not Found
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
import boto3
from botocore.exceptions import ClientError
from datetime import datetime
import os
from pyspark.sql.functions import current_timestamp
import sys
def move_file(bucket, key, filename):
    s3_resource = boto3.resource('s3')
    dest_key = 'dcgs_abv/upload_archive_files'
    # Copy object A as object B
    s3_resource.Object(bucket, dest_key + filename).copy_from(CopySource=key + filename, ACL='public-read')
    s3_resource.Object(bucket, filename).delete()
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
schema = "dcgs"
table_bcs_output = "bcs_output_test"
redshift_schema_table_bcs_output = schema + "." + table_bcs_output
bucket = 'bucket'
prefix = 'key/upload_files/'
s3 = boto3.client("s3")
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for object in response['Contents']:
    key = object['Key']
    if '.txt' in key:
        file = os.path.basename(object['Key'])
        abv_data = glueContext.create_dynamic_frame_from_options("s3", \
            {'paths': ["s3://{}/{}".format(bucket, prefix)], \
             "recurse": True, 'groupFiles': 'inPartition'}, format="csv", delimiter='\t')
        abv_df = abv_data.toDF().withColumn("snapshot_day", current_timestamp())
        conparms_r = glueContext.extract_jdbc_conf("reporting", catalog_id=None)
        abv_df.write\
            .format("com.databricks.spark.redshift")\
            .option("url", "jdbc:redshift://rs-reporting.cy2mjiqdtk9k.us-east-1.redshift.amazonaws.com:8192/rptg")\
            .option("dbtable", redshift_schema_table_bcs_output)\
            .option("user", conparms_r['user'])\
            .option("password", conparms_r['password'])\
            .option("aws_iam_role", "arn:aws:iam::123456789:role/redshift_admin_role")\
            .option("tempdir", args["TempDir"])\
            .option("delimiter", '\t')\
            .option("quote", "'")\
            .mode("append")\
            .save()
        move_file(bucket, key, file)
job.commit()
I have a job in AWS Glue running Spark 2.4 with Python 3 (Glue version 1.0). I have tried everything I can find to set the delimiter for the Redshift load, including .option("sep", '\t'), .option("separator", '\t') and a few others.
The files in question are tab-delimited text files; I have opened them in OpenOffice using tab as the delimiter and ' as the text qualifier.
Can anyone tell me where I am going wrong?
The answer, which I had to dig for because it is not particularly obvious in the documentation here (https://docs.databricks.com/data/data-sources/aws/amazon-redshift.html), is the following:
.option("extracopyoptions", "delimiter '\t'")
This sets the delimiter correctly for the COPY command that is being executed.
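For reference, a minimal sketch of the question's write block with that option added; the JDBC URL, IAM role ARN, table variable, and connection parameters are simply the placeholders already used in the question:

# Same Redshift write as above, with extracopyoptions added per the answer.
# The spark-redshift connector appends this text to the COPY statement it runs,
# which is where the tab delimiter actually needs to be set.
abv_df.write\
    .format("com.databricks.spark.redshift")\
    .option("url", "jdbc:redshift://rs-reporting.cy2mjiqdtk9k.us-east-1.redshift.amazonaws.com:8192/rptg")\
    .option("dbtable", redshift_schema_table_bcs_output)\
    .option("user", conparms_r['user'])\
    .option("password", conparms_r['password'])\
    .option("aws_iam_role", "arn:aws:iam::123456789:role/redshift_admin_role")\
    .option("tempdir", args["TempDir"])\
    .option("extracopyoptions", "delimiter '\t'")\
    .mode("append")\
    .save()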