Storing CSV data into dynamic paths using an Athena query (StartQueryExecution)
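I am trying to take JSON data that has been dumped into an input S3 bucket and, using Athena's StartQueryExecution, convert the files to CSV in another S3 output bucket location.
I am using one large query whose result is inserted into a temporary table (using INSERT INTO).
That table is partitioned by year, month, day, and hour.
Using AWS Glue I was able to set the storage.location.template table property for the query (see screenshot):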
s3://prod-cog-kahala-test-output/data/landing/olo/baja/year=${year}/month=${month}/day=${day}/hour=${hour}
I am also using partition projection on year, month, day, and hour for this table, configured through AWS Glue (see screenshot).
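To make the template concrete, here is a minimal sketch of how the ${...} placeholders resolve to one partition location. It only mimics the substitution locally with Python's string.Template and says nothing about how Athena implements projection; the helper name resolve_location and the sample partition values are assumptions for illustration.

```python
from string import Template

# Template copied from the table property above.
LOCATION_TEMPLATE = (
    "s3://prod-cog-kahala-test-output/data/landing/olo/baja/"
    "year=${year}/month=${month}/day=${day}/hour=${hour}"
)


def resolve_location(year: str, month: str, day: str, hour: str) -> str:
    """Substitute partition values into the template (hypothetical helper)."""
    return Template(LOCATION_TEMPLATE).substitute(
        year=year, month=month, day=day, hour=hour
    )


# Sample values are made up for the example.
print(resolve_location("2021", "09", "14", "07"))
```

The values are passed as zero-padded strings so that the resolved path matches the year=/month=/day=/hour= layout used elsewhere in this question.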
This output path is created dynamically from the date and time at which the event fired. It stores the CSV files that the Athena query produces from the JSON created during that event time. The output path should look like the following screenshot:
I am using a Python Lambda to extract the eventDate value from the event record, and then an Athena query to write the CSV files to the dynamic output path.
Note: I have only been able to run this successfully with a static S3 path, not with a dynamic S3 path, which is a requirement.
When a JSON file is ingested into the input S3 bucket and Athena runs the query with the dynamic S3 path, I get the following error:
(<class 'botocore.errorfactory.InvalidRequestException'>, InvalidRequestException("An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 45:64: mismatched input '{'. Expecting: '*', <expression>, <identifier>"), <traceback object at 0x7fac83b93300>).
Please help me identify what I am doing wrong. Thank you very much.
Here is the table property
Here is the lambda code:
import os
import sys

import boto3

# athena_client, database, today_year, kahala_coldstone and kahala_baja are
# defined elsewhere in the module (not shown in the question).


def lambda_handler(event, context):
    try:
        print('<< IN Handler >>')
        print(event)
        print("File size is", event['Records'][0]['s3']['object']['size'])
        if event['Records'][0]['s3']['object']['size'] > 0:
            file_path = event['Records'][0]['s3']['object']['key']
            print('<<FILEPATH >>' + file_path)
            bucket_name = os.environ['BUCKETBAJA']
            file_name = file_path.split("/")[-1]
            print(file_name)
            # Process the input file time stamp to extract the year, month, day
            parse_filename = file_name.split(today_year)
            # Get date and hour from the event time, e.g. 2021-09-14T07:23:45.123Z
            parse_date = event['Records'][0]['eventTime']
            dateStr = parse_date.split("-")
            print('<< PROCESS DATE >>' + dateStr[2])
            fyear = dateStr[0]
            fmonth = dateStr[1]
            # dateStr[2] looks like "14T07:23:45.123Z": split day from time
            fday = dateStr[2].split("T")
            parse_more = fday
            hour = fday[1].split(":")
            fhour = hour[0]
            fday = fday[0]
            print('<< DAY >>' + fday)
            print('<< HOUR >>' + fhour)
            print('<< MONTH >>' + fmonth)
            process_date = fyear + '-' + fmonth + '-' + fday
            start_date = fyear + '-' + fmonth + '-' + fday
            end_date = fyear + '-' + fmonth + '-' + fday
            output_prefix = "all_dates/year=/" + fyear + "/month=" + fmonth + '/day=' + fday + '/hour=' + fhour
            print('<< OUT_PREFIX >>' + output_prefix)
            clients3 = boto3.client('s3')
            result = clients3.list_objects(Bucket=bucket_name, Prefix=output_prefix)
            exists = False
            athena_out_path = 's3://' + bucket_name + '/all-dates/year=' + fyear + '/month=' + fmonth + '/day=' + fday + '/hour=' + fhour
            print('<<< INSIDE BAJA TABLE DUMP NEXT ... >>>>')
            # Pick the query template and output bucket based on the file name
            if 'coldstone' in file_name:
                query = kahala_coldstone
                bucket_name = os.environ['BUCKETCOLDSTONE']
                athena_out_path = 's3://athena-' + bucket_name + '/all-dates/year=' + fyear + '/month=' + fmonth + '/day=' + fday + '/hour=' + fhour
            else:
                query = kahala_baja
            print('<<QUERY >>')
            response = athena_client.start_query_execution(
                QueryString=query,
                QueryExecutionContext={
                    'Database': database
                },
                ResultConfiguration={
                    'OutputLocation': athena_out_path,
                }
            )
        else:
            print("Empty File, exiting")
    except Exception:
        print("Unknown Error")
        print(sys.exc_info())
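As an aside on the string splitting above, the same year/month/day/hour values can be pulled out with datetime. This is only a sketch: it assumes eventTime arrives in the form 2021-09-14T07:23:45.123Z, and the helper names partition_parts and build_output_path, as well as the bucket name in the usage line, are made up for the example.

```python
from datetime import datetime


def partition_parts(event_time: str) -> dict:
    """Split an S3 event timestamp into zero-padded partition values."""
    # Assumed format: UTC with fractional seconds and a trailing "Z".
    parsed = datetime.strptime(event_time, "%Y-%m-%dT%H:%M:%S.%fZ")
    return {
        "fyear": f"{parsed.year:04d}",
        "fmonth": f"{parsed.month:02d}",
        "fday": f"{parsed.day:02d}",
        "fhour": f"{parsed.hour:02d}",
    }


def build_output_path(bucket_name: str, parts: dict) -> str:
    """Build the dynamic Athena output location from the partition values."""
    return (
        "s3://{bucket}/all-dates/year={fyear}/month={fmonth}"
        "/day={fday}/hour={fhour}"
    ).format(bucket=bucket_name, **parts)


# Hypothetical bucket name and timestamp, for illustration only.
parts = partition_parts("2021-09-14T07:23:45.123Z")
print(build_output_path("my-output-bucket", parts))
```

Parsing the timestamp once keeps the four values consistent and raises a ValueError on a malformed timestamp instead of producing a wrong path.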
Here is the Query code:
"""insert into olo_baja_insert_to_csv
select cast(first_name as varchar) first_name,
cast(contact_number as varchar) contact_number,
cast(membership_number as varchar) membership_number,
cast(olo_customer_id as varchar) olo_customer_id,
cast(login_providers as varchar) login_providers,
cast(external_reference as varchar) external_reference,
cast(olo_email_address as varchar) olo_email_address,
cast(last_name as varchar) last_name,
cast(loyalty_scheme as varchar) loyalty_scheme,
cast(product_id as varchar) product_id,
cast(modifier_detail['modifierid'] as varchar) modifier_detail_modifier_id,
cast(modifier_detail['description'] as varchar) modifier_detail_description,
cast(modifier_detail['vendorspecificmodifierid'] as varchar) modifier_detail_vendor_specific_modifier_id,
cast(modifier_detail['modifiers'] as varchar) modifier_detail_modifiers,
cast(modifier_detail['modifierquantity'] as varchar)
modifier_detail_modifier_quantity,
cast(modifier_detail['customfields'] as varchar) modifier_detail_custom_fields,
cast(modifier_quantity as varchar) modifier_quantity,
cast(modifier_custom_fields as varchar) modifier_custom_fields,
cast(delivery as varchar) delivery,
cast(total as varchar) total,
cast(subtotal as varchar) subtotal,
cast(discount as varchar) discount,
cast(tip as varchar) tip,
cast(sales_tax as varchar) sales_tax,
cast(customer_delivery as varchar) customer_delivery,
cast(payment_amount as varchar) payment_amount,
cast(payment_description as varchar) payment_description,
cast(payment_type as varchar) payment_type,
cast(location_lat as varchar) location_lat,
cast(location_long as varchar) location_long,
cast(location_name as varchar) location_name,
cast(location_logo as varchar) location_logo,
cast(ordering_provider_name as varchar) ordering_provider_name,
cast(ordering_provider_slug as varchar) ordering_provider_slug,year,month,day, hour
from(
select cast(first_name as varchar) first_name,
cast(contact_number as varchar) contact_number,
cast(membership_number as varchar) membership_number,
cast(olo_customer_id as varchar) olo_customer_id,
cast(login_providers as varchar) login_providers,
cast(external_reference as varchar) external_reference,
cast(olo_email_address as varchar) olo_email_address,
cast(last_name as varchar) last_name,
cast(loyalty_scheme as varchar) loyalty_scheme,
cast(product_id as varchar) product_id,
cast(special_instructions as varchar) special_instructions,
cast(quantity as varchar) quantity,
cast(recipient_name as varchar) recipient_name,
cast(custom_values as varchar) custom_values,
cast(item_description as varchar) item_description,
cast(item_selling_price as varchar) item_selling_price,
cast(modifier['sellingprice'] as varchar) pre_modifier_selling_price,
cast(modifier['modifierid'] as varchar) modifier_id,
cast(modifier['description'] as varchar) modifier_description,
cast(modifier['vendorspecificmodifierid'] as varchar) vendor_specific_modifierid,
cast(modifier['modifiers'] as varchar) modifier_details,
cast(modifier['modifierquantity'] as varchar) modifier_quantity,
cast(modifier['customfields'] as varchar) modifier_custom_fields,
cast(delivery as varchar),cast(total as varchar),cast(subtotal as varchar),
cast(discount as varchar),cast(tip as varchar),cast(sales_tax as varchar),cast(customer_delivery as varchar), cast(payment_amount as varchar), cast(payment_description as varchar),
cast(payment_type as varchar),cast(location_lat as varchar),cast(location_long as varchar), cast(location_name as varchar),
cast(location_logo as varchar),
cast(ordering_provider_name as varchar), cast(ordering_provider_slug as varchar), year,month,day,hour
from(
select
cast(json_extract(customer, '$.firstname') as varchar) as first_name,
cast(json_extract(customer, '$.contactnumber') as varchar) as contact_number,
cast(json_extract(customer, '$.membershipnumber') as varchar) as membership_number,
cast(json_extract(customer, '$.customerid') as varchar) as olo_customer_id,
cast(json_extract(customer, '$.loginproviders') as array<map<varchar,varchar>>) as login_providers,
cast(json_extract(customer, '$.externalreference') as varchar) as external_reference,
cast(json_extract(customer, '$.email') as varchar) as olo_email_address,
cast(json_extract(customer, '$.lastname') as varchar) as last_name,
cast(json_extract(customer, '$.loyaltyscheme') as varchar) as loyalty_scheme,
try_cast(json_extract("item", '$.productid') as varchar) product_id,
try_cast(json_extract("item", '$.specialinstructions') as varchar) special_instructions,
try_cast(json_extract("item", '$.quantity') as varchar) quantity,
try_cast(json_extract("item", '$.recipientname') as varchar) recipient_name,
try_cast(json_extract("item", '$.customvalues') as varchar) custom_values,
try_cast(json_extract("item", '$.description') as varchar) item_description,
try_cast(json_extract("item", '$.sellingprice') as varchar) item_selling_price,
cast(json_extract("item", '$.modifiers') as array<map<varchar,json>>) modifiers,
cast(json_extract(totals, '$.delivery') as varchar) delivery,
cast(json_extract(totals, '$.total') as varchar) total,
cast(json_extract(totals, '$.subtotal') as varchar) subtotal,
cast(json_extract(totals, '$.discount') as varchar) discount,
cast(json_extract(totals, '$.tip') as varchar) tip,
cast(json_extract(totals, '$.salestax') as varchar) sales_tax,
cast(json_extract(totals, '$.customerdelivery') as varchar) customer_delivery,
payment['amount'] payment_amount,
payment['description'] payment_description,
payment['type'] payment_type,
cast(json_extract("location", '$.latitude') as varchar) location_lat,
cast(json_extract("location", '$.longitude') as varchar) location_long,
cast(json_extract("location", '$.name') as varchar) location_name,
cast(json_extract("location", '$.logo') as varchar) location_logo,
cast(json_extract("orderingprovider", '$.name') as varchar) ordering_provider_name,
cast(json_extract("orderingprovider", '$.slug') as varchar) ordering_provider_slug,hour
from sandbox_twilliams.olo_baja_raw_john_testing
cross join unnest (payments, "items") as t (payment, item)
--cross join unnest ("items") as t (item)
--cross join unnest (payments) as t (payment)
where year = {fyear}
and month = {fmonth}
and day = {fday}
and hour = {fhour}
).format(year={fyear},month={fmonth},day={fday}, hour={fhour})
CROSS JOIN UNNEST (modifiers) as t (modifier)
)
CROSS JOIN UNNEST (cast(modifier_details as array<map<varchar,json>>)) as t (modifier_detail)
group by 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45;"""
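There is a syntax error in your query, but you did not include the query in the question, so it is hard to work out what is wrong. It looks like you print the query SQL in your logging, so I suggest taking the SQL from the logs, running it manually in the Athena console, and seeing whether the error message helps you pinpoint the problem.
On another note, converting to CSV is better done with UNLOAD. Athena's query results are CSV, but Athena also writes a binary metadata file alongside the CSV file, which can be confusing if you want the output directory to contain only CSV data.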
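As a rough illustration of the UNLOAD suggestion, the sketch below wraps a SELECT in an UNLOAD statement that writes comma-delimited text files. The helper name build_unload, the sample SELECT, and the destination prefix are assumptions for the example; check the UNLOAD documentation for details such as compression defaults and the requirement that the destination location holds no existing data.

```python
def build_unload(select_sql: str, s3_prefix: str) -> str:
    """Wrap a SELECT in an Athena UNLOAD that writes delimited text files."""
    # UNLOAD takes the inner query without a trailing semicolon.
    select_sql = select_sql.strip().rstrip(";")
    return (
        f"UNLOAD ({select_sql})\n"
        f"TO '{s3_prefix}'\n"
        "WITH (format = 'TEXTFILE', field_delimiter = ',')"
    )


# Hypothetical query and destination, for illustration only.
statement = build_unload(
    "SELECT first_name, last_name FROM olo_baja_raw;",
    "s3://my-output-bucket/all-dates/year=2021/month=09/day=14/hour=07/",
)
print(statement)
```

The resulting string would be passed as QueryString to start_query_execution in place of the plain SELECT; the OutputLocation then only receives the query's bookkeeping files, while the data files land under the UNLOAD destination.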
As I commented to @Theo, trying to get the insert into statement to work on that query was an absolute mess. A few adjustments had to be made to the query, and the CSV files are now being generated. The adjustments were:
The format() call had been omitted: the query was never formatted before being handed to the Athena client's StartQueryExecution call. The fix: query = kahala_coldstone.format(fyear=fyear,fmonth=fmonth,fday=fday,fhour=fhour)
Another reason the query wasn't running was a {month} binding that should have been {fmonth} (fmonth is the variable that stores the month value from the event timestamp). Most of the time was spent trying to get the insert to work, but it does not seem possible to do that insert from a JSON SerDe to a CSV SerDe. I wonder why this would be the case?
Beyond that, the motto of this fix is "omit the INSERT INTO statement at the start of the query string".
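A small sketch of the two placeholder problems described above, using a shortened stand-in for the real query (QUERY_TEMPLATE and render_query are names invented for this example). Without the format() call the literal braces reach Athena, which matches the "mismatched input '{'" error; a placeholder whose name is not passed to format() fails earlier, in Python, with a KeyError.

```python
# Shortened stand-in for the real query template; only the WHERE clause matters here.
QUERY_TEMPLATE = """select *
from sandbox_twilliams.olo_baja_raw_john_testing
where year = {fyear}
  and month = {fmonth}
  and day = {fday}
  and hour = {fhour}"""


def render_query(template: str, fyear: str, fmonth: str, fday: str, fhour: str) -> str:
    """Fill the named placeholders before the SQL is sent to Athena."""
    return template.format(fyear=fyear, fmonth=fmonth, fday=fday, fhour=fhour)


# Sample partition values, made up for the example.
print(render_query(QUERY_TEMPLATE, "2021", "09", "14", "07"))

# A stray {month} placeholder (instead of {fmonth}) is rejected by format().
try:
    render_query("where month = {month}", "2021", "09", "14", "07")
except KeyError as err:
    print("unbound placeholder:", err)
```

Whether the substituted values need quoting in the SQL depends on the partition column types, which are not shown in the question.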