火花雅典娜连接器

spark Athena connector

我需要在 spark 中使用 Athena,但 spark 在使用 JDBC 驱动程序时使用 preparedStatement,它给了我一个例外 "com.amazonaws.athena.jdbc.NotImplementedException: Method Connection.prepareStatement is not yet implemented"

你能告诉我如何在 spark 中连接 Athena

您不能直接将 Spark 连接到 Athena。 Athena 只是针对 s3 的 Prestodb 的一个实现。与 Presto 不同,Athena 无法将数据定位到 HDFS。

不过,如果你想用Spark查询s3中的数据,那你就走运了HUE, which will let you query data in s3 from Spark on Elastic Map Reduce (EMR)

另请参阅: Developer Guide for Hadoop User Experience (HUE) on EMR.

我不知道您如何从 Spark 连接到 Athena,但您不需要 - 您可以非常轻松地查询 Athena 包含的数据(或者更准确地说,"registers")来自 Spark.

雅典娜有两个部分

  1. Hive Metastore(现在称为 Glue 数据目录)包含数据库和 table 名称以及所有基础文件之间的映射
  2. Presto 查询引擎,可将您的 SQL 转换为针对这些文件的数据操作

当您启动 EMR 集群(v5.8.0 及更高版本)时,您可以指示它连接到您的 Glue 数据目录。这是 'create cluster' 对话框中的一个复选框。当您选中此选项时,您的 Spark SqlContext 将连接到 Glue 数据目录,您将能够在 Athena 中看到 tables。

然后您可以正常查询这些 table。

有关更多信息,请参阅 https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-glue.html

如果您想使用 Athena 的 data,@Kirk Broadhurst 的回复是正确的。 如果您想使用 Athena 引擎,那么 github 上有一个库可以解决 preparedStatement 问题。

请注意,由于我缺乏使用 Maven 等的经验,我没有成功使用该库

您可以使用这个 JDBC 驱动程序:SimbaAthenaJDBC

<dependency>
    <groupId>com.syncron.amazonaws</groupId>
    <artifactId>simba-athena-jdbc-driver</artifactId>
    <version>2.0.2</version>
</dependency>

使用:

SparkSession spark = SparkSession
    .builder()
    .appName("My Spark Example")
    .getOrCreate();

Class.forName("com.simba.athena.jdbc.Driver");

Properties connectionProperties = new Properties();
connectionProperties.put("User", "AWSAccessKey");
connectionProperties.put("Password", "AWSSecretAccessKey");
connectionProperties.put("S3OutputLocation", "s3://my-bucket/tmp/");
connectionProperties.put("AwsCredentialsProviderClass", 
    "com.simba.athena.amazonaws.auth.PropertiesFileCredentialsProvider");
connectionProperties.put("AwsCredentialsProviderArguments", "/my-folder/.athenaCredentials");
connectionProperties.put("driver", "com.simba.athena.jdbc.Driver");

List<String> predicateList =
    Stream
        .of("id = 'foo' and date >= DATE'2018-01-01' and date < DATE'2019-01-01'")
        .collect(Collectors.toList());
String[] predicates = new String[predicateList.size()];
predicates = predicateList.toArray(predicates);

Dataset<Row> data =
    spark.read()
        .jdbc("jdbc:awsathena://AwsRegion=us-east-1;",
            "my_env.my_table", predicates, connectionProperties);

您也可以在 Flink 应用程序中使用此驱动程序:

TypeInformation[] fieldTypes = new TypeInformation[] {
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
};

RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("com.simba.athena.jdbc.Driver")
    .setDBUrl("jdbc:awsathena://AwsRegion=us-east-1;UID=my_access_key;PWD=my_secret_key;S3OutputLocation=s3://my-bucket/tmp/;")
    .setQuery("select id, val_col from my_env.my_table WHERE id = 'foo' and date >= DATE'2018-01-01' and date < DATE'2019-01-01'")
    .setRowTypeInfo(rowTypeInfo)
    .finish();

DataSet<Row> dbData = env.createInput(jdbcInputFormat, rowTypeInfo);

其实你可以使用 B2W 的 Spark Athena Driver。

https://github.com/B2W-BIT/athena-spark-driver