无法使用 Golang SDK 在 AWS athena 上执行查询

Unable to perform query on AWS athena using Golang SDK

我是 AWS 和 Golang 的新手,我正在尝试创建一个 lambda 函数,它将触发 AWS Athena 查询并使用 AWS SES 服务通过电子邮件发送结果。即使搜索了一个小时,我也找不到 lambda 函数(在 Golang 中)的工作示例来对 Athena 执行查询并获取查询的输出。

在搜索时,我在 Java、Python 和 Node Js 中找到了相同的代码,但在 Golang 中找不到。

甚至 Go SDK 的页面也会重定向到 Java 示例。但不幸的是,我连 Java 也看不懂。

我也查看了这个 AWS SDK for Go API Reference 页面。但是我不明白程序的流程是什么,也不知道该选择哪个操作。

我已经尝试为此创建程序,这可能是完全错误的,我不知道下一步该怎么做。下面是代码-

package main

import (
    "fmt"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/athena"
)

// main is the asker's first attempt at starting an Athena query with the
// v1 AWS SDK for Go and printing the response.
//
// NOTE(review): this snippet is the non-working code the question is about;
// see the inline notes for the visible problems.
func main() {

    // Create a new session in the us-east-1 region.
    // NOTE(review): err is assigned here but never checked or used below.
    sess, err := session.NewSession(&aws.Config{
        Region: aws.String("us-east-1")},
    )

    // Create an Athena client from the session.
    client := athena.New(sess)

    // Example sending a request using the StartQueryExecutionRequest method.
    // NOTE(review): params is a plain string here; StartQueryExecutionRequest
    // presumably expects a *athena.StartQueryExecutionInput (query string plus
    // result configuration) — confirm against the SDK reference.
    query := "SELECT * FROM table1 ;"
    params := query
    req, resp := client.StartQueryExecutionRequest(params)

    // A failed Send is silently ignored: nothing is printed when err1 != nil.
    err1 := req.Send()
    if err1 == nil { // resp is now filled
        fmt.Println(resp)
    }
}

如果有人可以帮助我执行 Athena 查询并在 Golang 中获得结果(最好),或者可以分享一些相关资源,我将不胜感激。拿到查询结果后,我就可以使用 AWS SES 发送电子邮件了。

从这里开始。

// run as: go run main.go
package main

import (
    "context"
    "fmt"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/aws/endpoints"
    "github.com/aws/aws-sdk-go-v2/aws/external"
    "github.com/aws/aws-sdk-go-v2/service/athena"
)

// Query target and result destination used by main.
const (
    // table is the fully qualified name appended to the "select * from " query.
    table = "textqldb.textqltable"
    // outputBucket is passed as the OutputLocation of the result configuration.
    outputBucket = "s3://bucket-name-here/"
)

// main loads the default AWS configuration, starts a "select *" query on
// table through Athena with results directed to outputBucket, and prints the
// StartQueryExecution response. Any error is printed and ends the program.
func main() {
    awsCfg, err := external.LoadDefaultAWSConfig()
    if err != nil {
        fmt.Printf("config error: %v\n", err)
        return
    }
    awsCfg.Region = endpoints.UsEast2RegionID

    // Build the whole request input in one literal.
    input := &athena.StartQueryExecutionInput{
        QueryString: aws.String("select * from " + table),
        ResultConfiguration: &athena.ResultConfiguration{
            OutputLocation: aws.String(outputBucket),
        },
    }

    out, err := athena.New(awsCfg).StartQueryExecutionRequest(input).Send(context.TODO())
    if err != nil {
        fmt.Printf("query error: %v\n", err)
        return
    }

    fmt.Println(out)
}

@Everton 的代码正在 Athena 上执行查询,它的响应被保存在 S3 存储桶中并且没有被返回。因此,我添加了执行 Athena 查询并获取响应的代码。希望这可以帮助其他人。

// run as: go run main.go
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/aws/endpoints"
    "github.com/aws/aws-sdk-go-v2/aws/external"
    "github.com/aws/aws-sdk-go-v2/service/athena"
)

// Query target and result destination shared by the functions below.
const (
    // table is the placeholder "<database>.<table>" name appended to the query.
    table = "<Database_Name>.<Table_Name>"
    // outputBucket is passed as the OutputLocation of the result configuration.
    outputBucket = "s3://bucket-name-here/"
)

// executeQuery starts an Athena query and returns its query execution ID.
//
// It loads the default AWS configuration, sets the region to
// endpoints.UsEast2RegionID, and sends a StartQueryExecution request whose
// results are directed to outputBucket.
//
// It returns nil when the configuration cannot be loaded or the request
// fails. The signature has no error return, so the error is printed to
// stdout instead.
func executeQuery(query string) *string {
    cfg, err := external.LoadDefaultAWSConfig()
    if err != nil {
        fmt.Printf("config error: %v\n", err)
        return nil
    }
    cfg.Region = endpoints.UsEast2RegionID

    client := athena.New(cfg)

    params := &athena.StartQueryExecutionInput{
        QueryString: aws.String(query),
        ResultConfiguration: &athena.ResultConfiguration{
            OutputLocation: aws.String(outputBucket),
        },
    }

    resp, err := client.StartQueryExecutionRequest(params).Send(context.TODO())
    fmt.Println("Response is: ", resp, " Error is:", err)
    if err != nil {
        // Return early: the response must not be dereferenced after a
        // failed request.
        fmt.Printf("Query Error: %v\n", err)
        return nil
    }

    id := resp.QueryExecutionId
    if id != nil {
        // Print the ID itself rather than the pointer's address.
        fmt.Println("Query Execution Response ID:", *id)
    }
    return id
}

// getQueryResults fetches the results of the Athena query identified by
// QueryID.
//
// It loads the default AWS configuration, sets the region to
// endpoints.UsEast2RegionID, and sends a GetQueryResults request.
//
// It returns an error when QueryID is nil, when the configuration cannot be
// loaded, or when the request fails. Errors are returned to the caller
// rather than printed or raised as a panic, so they are reported in one place.
func getQueryResults(QueryID *string) (*athena.GetQueryResultsResponse, error) {
    if QueryID == nil {
        return nil, fmt.Errorf("query execution ID is nil")
    }

    cfg, err := external.LoadDefaultAWSConfig()
    if err != nil {
        return nil, fmt.Errorf("loading AWS config: %w", err)
    }
    cfg.Region = endpoints.UsEast2RegionID

    client := athena.New(cfg)
    req := client.GetQueryResultsRequest(&athena.GetQueryResultsInput{
        QueryExecutionId: QueryID,
    })

    resp, err := req.Send(context.TODO())
    if err != nil {
        return nil, fmt.Errorf("getting query results: %w", err)
    }
    return resp, nil
}

// main runs "select * from <table>" on Athena, waits a fixed 15 seconds, and
// prints the rows of the result set.
//
// It stops early, with a message on stdout, when the query could not be
// started, when fetching the results fails, or when the response carries no
// result set.
func main() {
    query := "select * from " + table

    // Execute an Athena query.
    queryID := executeQuery(query)
    if queryID == nil {
        // Without an execution ID there is nothing to wait for or fetch.
        fmt.Println("Query was not started: no query execution ID returned")
        return
    }

    // Wait for the query to complete before fetching its results. The fixed
    // wait is a simplification; the alternative is a loop that retries every
    // few seconds until the results are available.
    time.Sleep(15 * time.Second)

    // Get the response of the query.
    resp, err := getQueryResults(queryID)
    if err != nil {
        fmt.Printf("Error getting Query Response: %v\n", err)
        return
    }
    if resp.ResultSet == nil {
        fmt.Println("Query response contains no result set")
        return
    }

    fmt.Println(" \nRows:", resp.ResultSet.Rows)
}