Storing CSV data in dynamic paths using an Athena query (StartQueryExecution)

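I am trying to take JSON data that has been dumped into an input S3 bucket and use Athena's StartQueryExecution to convert the files to CSV in another S3 output bucket location.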

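  1. I am using one large query whose results are inserted into a temporary table (using INSERT INTO).

  2. That table is partitioned by year, month, day, and hour.

  3. Using AWS Glue, I was able to set the storage.location.template property on the table (see screenshot):

    s3://prod-cog-kahala-test-output/data/landing/olo/baja/year=${year}/month=${month}/day=${day}/hour=${hour}

  4. I am also using partition projection on year, month, day, and hour for this table, configured through AWS Glue (see screenshot).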
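To make the template in item 3 concrete, the sketch below expands it for one set of partition values. Athena performs this substitution itself during partition projection; the Python only imitates it with the standard library's string.Template, which happens to use the same ${name} syntax. The function name and the sample values are made up for illustration.

```python
from string import Template

# The storage.location.template value quoted in the question.
LOCATION_TEMPLATE = (
    "s3://prod-cog-kahala-test-output/data/landing/olo/baja/"
    "year=${year}/month=${month}/day=${day}/hour=${hour}"
)


def expand_location(year: str, month: str, day: str, hour: str) -> str:
    """Imitate how one projected partition maps to an S3 prefix (illustration only)."""
    return Template(LOCATION_TEMPLATE).substitute(
        year=year, month=month, day=day, hour=hour
    )


# Hypothetical sample values, not taken from the question.
print(expand_location("2021", "06", "15", "09"))
```

Each distinct combination of year, month, day, and hour therefore resolves to its own prefix under the same base location.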

This output path is created dynamically, based on the date and time at which the event fired. The Athena query converts the JSON files created during that event time to CSV and stores them there. The output path should look like the following screenshot:

I am using a Python Lambda to extract the eventDate value from the event record, and then an Athena query to write the CSV files to the dynamic output path.
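As a rough sketch of that extraction step, the timestamp can be parsed with datetime instead of being split by hand. This assumes the event time is an ISO 8601 string with a trailing Z, which is how S3 event notifications normally report eventTime; the function names and the bucket name are hypothetical.

```python
from datetime import datetime


def partition_values(event_time: str) -> dict:
    """Split a timestamp such as '2021-06-15T09:30:12.345Z' into zero-padded parts."""
    # Older Python versions reject a trailing 'Z' in fromisoformat(), so normalise it.
    moment = datetime.fromisoformat(event_time.replace("Z", "+00:00"))
    return {
        "fyear": f"{moment.year:04d}",
        "fmonth": f"{moment.month:02d}",
        "fday": f"{moment.day:02d}",
        "fhour": f"{moment.hour:02d}",
    }


def output_path(bucket: str, parts: dict) -> str:
    """Build the dynamic output location in the same layout the Lambda uses."""
    return (
        "s3://{bucket}/all-dates/year={fyear}/month={fmonth}"
        "/day={fday}/hour={fhour}"
    ).format(bucket=bucket, **parts)


parts = partition_values("2021-06-15T09:30:12.345Z")  # made-up sample timestamp
print(output_path("example-bucket", parts))
```

The keys are named after the variables in the Lambda (fyear, fmonth, fday, fhour) so the same dictionary could also feed the query placeholders.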

Note: I have only been able to run this successfully with a static S3 path, not with a dynamic S3 path, which is a requirement.

When I ingest an input JSON file into the input S3 bucket and Athena runs the query with the dynamic S3 path, I get the following error:

(<class 'botocore.errorfactory.InvalidRequestException'>, InvalidRequestException("An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: line 45:64: mismatched input '{'. Expecting: '*', <expression>, <identifier>"), <traceback object at 0x7fac83b93300>).

Please help me work out what I am doing wrong. Thank you very much.

Here is the table property

Here is the lambda code:

    import os
    import sys

    import boto3

    # athena_client, database, today_year, kahala_baja and kahala_coldstone are
    # defined elsewhere in the module and are not shown in the question.


    def lambda_handler(event, context):
        try:
            print('<< IN Handler >>')
            print(event)
            print("File size is", event['Records'][0]['s3']['object']['size'])
            if event['Records'][0]['s3']['object']['size'] > 0:
                file_path = event['Records'][0]['s3']['object']['key']
                print('<<FILEPATH >>' + file_path)
                bucket_name = os.environ['BUCKETBAJA']
                file_name = file_path.split("/")[-1]
                print(file_name)
                # Process the input file timestamp to extract the year, month, day
                parse_filename = file_name.split(today_year)
                # Get date and hour from the event time, e.g. 2021-06-15T09:30:12.345Z
                parse_date = event['Records'][0]['eventTime']
                dateStr = parse_date.split("-")
                print('<< PROCESS DATE >>' + dateStr[2])
                fyear = dateStr[0]
                fmonth = dateStr[1]
                # dateStr[2] looks like "15T09:30:12.345Z": split the day from the time
                fday = dateStr[2].split("T")
                hour = fday[1].split(":")
                fhour = hour[0]
                fday = fday[0]
                print('<< DAY >>' + fday)
                print('<< HOUR >>' + fhour)
                print('<< MONTH >>' + fmonth)
                process_date = fyear + '-' + fmonth + '-' + fday
                start_date = fyear + '-' + fmonth + '-' + fday
                end_date = fyear + '-' + fmonth + '-' + fday
                output_prefix = "all_dates/year=/" + fyear + "/month=" + fmonth + '/day=' + fday + '/hour=' + fhour
                print('<< OUT_PREFIX  >>' + output_prefix)
                clients3 = boto3.client('s3')
                result = clients3.list_objects(Bucket=bucket_name, Prefix=output_prefix)
                exists = False
                # Dynamic output location for the Athena query results
                athena_out_path = 's3://' + bucket_name + '/all-dates/year=' + fyear + '/month=' + fmonth + '/day=' + fday + '/hour=' + fhour
                print('<<< INSIDE BAJA TABLE DUMP NEXT ...  >>>>')
                if 'coldstone' in file_name:
                    query = kahala_coldstone
                    bucket_name = os.environ['BUCKETCOLDSTONE']
                    athena_out_path = 's3://athena-' + bucket_name + '/all-dates/year=' + fyear + '/month=' + fmonth + '/day=' + fday + '/hour=' + fhour
                else:
                    query = kahala_baja
                    print('<<QUERY  >>')
                    # Submit the query; results are written under athena_out_path
                    response = athena_client.start_query_execution(
                        QueryString=query,
                        QueryExecutionContext={
                            'Database': database
                        },
                        ResultConfiguration={
                            'OutputLocation': athena_out_path,
                        }
                    )
            else:
                print("Empty File, exiting")
        except Exception:
            print("Unknown Error")
            print(sys.exc_info())

Here is the Query code:

    """insert into olo_baja_insert_to_csv
    select cast(first_name as varchar) first_name,
    cast(contact_number as varchar) contact_number,
    cast(membership_number as varchar) membership_number,
    cast(olo_customer_id as varchar) olo_customer_id,
    cast(login_providers as varchar) login_providers,
    cast(external_reference as varchar) external_reference,
    cast(olo_email_address as varchar) olo_email_address,
    cast(last_name as varchar) last_name,
    cast(loyalty_scheme as varchar) loyalty_scheme,
    cast(product_id as varchar) product_id,
    cast(modifier_detail['modifierid'] as varchar) modifier_detail_modifier_id,
    cast(modifier_detail['description'] as varchar) modifier_detail_description,
    cast(modifier_detail['vendorspecificmodifierid'] as varchar) modifier_detail_vendor_specific_modifier_id,
    cast(modifier_detail['modifiers'] as varchar) modifier_detail_modifiers,
    cast(modifier_detail['modifierquantity'] as varchar) modifier_detail_modifier_quantity,
    cast(modifier_detail['customfields'] as varchar)  modifier_detail_custom_fields,
    cast(modifier_quantity as varchar) modifier_quantity,
    cast(modifier_custom_fields as varchar) modifier_custom_fields,
    cast(delivery as varchar) delivery,
    cast(total as varchar) total,
    cast(subtotal as varchar) subtotal,
    cast(discount as varchar) discount,
    cast(tip as varchar) tip,
    cast(sales_tax as varchar) sales_tax,
    cast(customer_delivery as varchar) customer_delivery, 
    cast(payment_amount as varchar) payment_amount, 
    cast(payment_description as varchar) payment_description,
    cast(payment_type as varchar) payment_type,
    cast(location_lat as varchar) location_lat,
    cast(location_long as varchar) location_long,
    cast(location_name as varchar) location_name,
    cast(location_logo as varchar) location_logo,
    cast(ordering_provider_name as varchar) ordering_provider_name,
    cast(ordering_provider_slug as varchar) ordering_provider_slug,year,month,day, hour
    from( 
    select cast(first_name as varchar) first_name,
    cast(contact_number as varchar) contact_number, 
    cast(membership_number as varchar) membership_number,
    cast(olo_customer_id as varchar) olo_customer_id,
    cast(login_providers as varchar) login_providers,
    cast(external_reference as varchar) external_reference,
    cast(olo_email_address as varchar) olo_email_address,
    cast(last_name as varchar) last_name,
    cast(loyalty_scheme as varchar) loyalty_scheme,
    cast(product_id as varchar) product_id,
    cast(special_instructions as varchar) special_instructions,
    cast(quantity as varchar) quantity,
    cast(recipient_name as varchar) recipient_name,
    cast(custom_values as varchar) custom_values,
    cast(item_description as varchar) item_description,
    cast(item_selling_price as varchar) item_selling_price,
    cast(modifier['sellingprice'] as varchar) pre_modifier_selling_price,
    cast(modifier['modifierid'] as varchar) modifier_id,
    cast(modifier['description'] as varchar) modifier_description,
    cast(modifier['vendorspecificmodifierid'] as varchar) vendor_specific_modifierid,
    cast(modifier['modifiers'] as varchar) modifier_details,
    cast(modifier['modifierquantity'] as varchar) modifier_quantity,
    cast(modifier['customfields'] as varchar) modifier_custom_fields,
    cast(delivery as varchar),cast(total as varchar),cast(subtotal as varchar), 
    cast(discount as varchar),cast(tip as varchar),cast(sales_tax as varchar),cast(customer_delivery as varchar), cast(payment_amount as varchar), cast(payment_description as varchar),
    cast(payment_type as varchar),cast(location_lat as varchar),cast(location_long as varchar), cast(location_name as varchar), 
    cast(location_logo as varchar),
    cast(ordering_provider_name as varchar), cast(ordering_provider_slug as varchar), year,month,day,hour
    from(
    select
    cast(json_extract(customer, '$.firstname') as varchar) as first_name,
    cast(json_extract(customer, '$.contactnumber') as varchar) as contact_number,
    cast(json_extract(customer, '$.membershipnumber') as varchar) as membership_number,
    cast(json_extract(customer, '$.customerid') as varchar) as olo_customer_id,
    cast(json_extract(customer, '$.loginproviders') as array<map<varchar,varchar>>) as login_providers,
    cast(json_extract(customer, '$.externalreference') as varchar) as external_reference,
    cast(json_extract(customer, '$.email') as varchar) as olo_email_address,
    cast(json_extract(customer, '$.lastname') as varchar) as last_name,
    cast(json_extract(customer, '$.loyaltyscheme') as varchar) as loyalty_scheme,
    try_cast(json_extract("item", '$.productid') as varchar) product_id,
    try_cast(json_extract("item", '$.specialinstructions') as varchar) special_instructions,
    try_cast(json_extract("item", '$.quantity') as varchar) quantity,
    try_cast(json_extract("item", '$.recipientname') as varchar) recipient_name,
    try_cast(json_extract("item", '$.customvalues') as varchar) custom_values,
    try_cast(json_extract("item", '$.description') as varchar) item_description,
    try_cast(json_extract("item", '$.sellingprice') as varchar) item_selling_price,
    cast(json_extract("item", '$.modifiers') as array<map<varchar,json>>) modifiers,
    cast(json_extract(totals, '$.delivery') as varchar) delivery,
    cast(json_extract(totals, '$.total') as varchar) total,
    cast(json_extract(totals, '$.subtotal') as varchar) subtotal,
    cast(json_extract(totals, '$.discount') as varchar) discount,
    cast(json_extract(totals, '$.tip') as varchar) tip,
    cast(json_extract(totals, '$.salestax') as varchar) sales_tax,
    cast(json_extract(totals, '$.customerdelivery') as varchar) customer_delivery,
    payment['amount'] payment_amount,
    payment['description'] payment_description,
    payment['type'] payment_type,
    cast(json_extract("location", '$.latitude') as varchar) location_lat,
    cast(json_extract("location", '$.longitude') as varchar) location_long,
    cast(json_extract("location", '$.name') as varchar) location_name,
    cast(json_extract("location", '$.logo') as varchar) location_logo,
    cast(json_extract("orderingprovider", '$.name') as varchar) ordering_provider_name,
    cast(json_extract("orderingprovider", '$.slug') as varchar) ordering_provider_slug,hour
    from sandbox_twilliams.olo_baja_raw_john_testing
    cross join unnest (payments, "items") as t (payment, item)
    --cross join unnest ("items") as t (item)
    --cross join unnest (payments) as t (payment)
    where year = {fyear}
    and month = {fmonth}
    and day = {fday}
    and hour = {fhour}
    ).format(year={fyear},month={fmonth},day={fday}, hour={fhour})  
    CROSS JOIN UNNEST (modifiers) as t (modifier)
    )
    CROSS JOIN UNNEST (cast(modifier_details as array<map<varchar,json>>)) as t (modifier_detail)
    group by 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45;"""      

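There is a syntax error in your query, but you have not included the query in the question, so it is hard to tell what is wrong. It looks like you print the query SQL in your logging, so I suggest taking that SQL, running it manually in the Athena console, and seeing whether the error message helps you pinpoint the problem.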

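On another note, converting to CSV is better done with UNLOAD. Athena's query results are CSV, but Athena also writes a binary metadata file alongside the CSV file, which can get in the way if you want the output directory to contain only CSV data.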
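A minimal sketch of what that could look like, assuming the SELECT part of the query is already available as a string. The helper name and the bucket are hypothetical, and the WITH options shown (TEXTFILE with a comma delimiter) are only one plausible choice. As far as I recall, UNLOAD expects the destination prefix to be empty and compresses text output by default, so check the Athena documentation before relying on this.

```python
def build_unload(select_sql: str, s3_prefix: str) -> str:
    """Wrap a SELECT in UNLOAD so Athena writes comma-delimited text files to s3_prefix."""
    # Drop a trailing semicolon: the SELECT ends up inside parentheses.
    inner = select_sql.strip().rstrip(";")
    return (
        f"UNLOAD ({inner}) "
        f"TO '{s3_prefix}' "
        "WITH (format = 'TEXTFILE', field_delimiter = ',')"
    )


# Placeholder SELECT and destination, for illustration only.
print(build_unload("SELECT 1;", "s3://example-bucket/out/"))
```

The resulting string would be submitted through StartQueryExecution like any other query; the OutputLocation then only receives the query's bookkeeping files, while the data lands under the TO prefix.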

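As I commented to @Theo, trying to get the INSERT INTO statement to work with that query was an absolute mess. The query needed a few adjustments, and it is now producing CSV files. The adjustments were as follows: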

The format() call had been omitted from the query just before it was passed to the Athena client's StartQueryExecution call: query = kahala_coldstone.format(fyear=fyear,fmonth=fmonth,fday=fday,fhour=fhour)
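Roughly, the corrected flow fills the placeholders first and only then hands the string to Athena. The sketch below uses a shortened stand-in for the real query and builds the keyword arguments without calling AWS; build_request, the bucket name, and the sample values are assumptions for illustration.

```python
# Shortened stand-in for the real template; only the placeholders matter here.
QUERY_TEMPLATE = """select *
from sandbox_twilliams.olo_baja_raw_john_testing
where year = {fyear}
and month = {fmonth}
and day = {fday}
and hour = {fhour}"""


def build_request(template: str, database: str, output_location: str, **values) -> dict:
    """Fill the placeholders, then assemble arguments for start_query_execution."""
    return {
        "QueryString": template.format(**values),
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_location},
    }


request = build_request(
    QUERY_TEMPLATE,
    "sandbox_twilliams",  # assumed database name, taken from the table reference
    "s3://example-bucket/all-dates/year=2021/month=06/day=15/hour=09",
    fyear="2021", fmonth="06", fday="15", fhour="09",
)
# In the Lambda this would be sent as athena_client.start_query_execution(**request).
print(request["QueryString"])
```

Once format() has run, no literal braces are left in the SQL, which is what the "mismatched input '{'" error was complaining about.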

Another reason the query would not run was a {month} binding that should have been {fmonth} (fmonth is the variable that holds the month taken from the event timestamp). Most of the time went into trying to get the insert to work, but it does not seem possible to insert from a table using a JSON SerDe into one using a CSV SerDe. I wonder why that would be the case?
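A mismatch like {month} versus {fmonth} can be caught before the query is sent by comparing the template's placeholders with the values about to be supplied. This is only a sketch using the standard library's string.Formatter; the helper names and the sample template are made up.

```python
from string import Formatter


def placeholder_names(template: str) -> set:
    """Return the {name} fields that str.format() will expect."""
    return {field for _, field, _, _ in Formatter().parse(template) if field}


def missing_values(template: str, **values) -> set:
    """Return placeholders in the template that have no matching keyword value."""
    return placeholder_names(template) - set(values)


bad_template = "where year = {fyear} and month = {month}"
print(missing_values(bad_template, fyear="2021", fmonth="06"))
```

Here the check reports month, the placeholder that has no matching value; calling format() on the same template with the same values would raise a KeyError for it.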

Beyond that, the motto of this fix is: "omit the INSERT INTO statement from the beginning of the query string."