火花雅典娜连接器
spark Athena connector
我需要在 spark 中使用 Athena,但 spark 在使用 JDBC 驱动程序时使用 preparedStatement,它给了我一个例外
"com.amazonaws.athena.jdbc.NotImplementedException: Method Connection.prepareStatement is not yet implemented"
你能告诉我如何在 spark 中连接 Athena
您不能直接将 Spark 连接到 Athena。 Athena 只是针对 s3 的 Prestodb 的一个实现。与 Presto 不同,Athena 无法将数据定位到 HDFS。
不过,如果你想用Spark查询s3中的数据,那你就走运了HUE, which will let you query data in s3 from Spark on Elastic Map Reduce (EMR)。
另请参阅:
Developer Guide for Hadoop User Experience (HUE) on EMR.
我不知道您如何从 Spark 连接到 Athena,但您不需要 - 您可以非常轻松地查询 Athena 包含的数据(或者更准确地说,"registers")来自 Spark.
雅典娜有两个部分
- Hive Metastore(现在称为 Glue 数据目录)包含数据库和 table 名称以及所有基础文件之间的映射
- Presto 查询引擎,可将您的 SQL 转换为针对这些文件的数据操作
当您启动 EMR 集群(v5.8.0 及更高版本)时,您可以指示它连接到您的 Glue 数据目录。这是 'create cluster' 对话框中的一个复选框。当您选中此选项时,您的 Spark SqlContext
将连接到 Glue 数据目录,您将能够在 Athena 中看到 tables。
然后您可以正常查询这些 table。
有关更多信息,请参阅 https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-glue.html。
如果您想使用 Athena 的 data,@Kirk Broadhurst 的回复是正确的。
如果您想使用 Athena 引擎,那么 github 上有一个库可以解决 preparedStatement
问题。
请注意,由于我缺乏使用 Maven 等的经验,我没有成功使用该库
您可以使用这个 JDBC 驱动程序:SimbaAthenaJDBC
<dependency>
<groupId>com.syncron.amazonaws</groupId>
<artifactId>simba-athena-jdbc-driver</artifactId>
<version>2.0.2</version>
</dependency>
使用:
SparkSession spark = SparkSession
.builder()
.appName("My Spark Example")
.getOrCreate();
Class.forName("com.simba.athena.jdbc.Driver");
Properties connectionProperties = new Properties();
connectionProperties.put("User", "AWSAccessKey");
connectionProperties.put("Password", "AWSSecretAccessKey");
connectionProperties.put("S3OutputLocation", "s3://my-bucket/tmp/");
connectionProperties.put("AwsCredentialsProviderClass",
"com.simba.athena.amazonaws.auth.PropertiesFileCredentialsProvider");
connectionProperties.put("AwsCredentialsProviderArguments", "/my-folder/.athenaCredentials");
connectionProperties.put("driver", "com.simba.athena.jdbc.Driver");
List<String> predicateList =
Stream
.of("id = 'foo' and date >= DATE'2018-01-01' and date < DATE'2019-01-01'")
.collect(Collectors.toList());
String[] predicates = new String[predicateList.size()];
predicates = predicateList.toArray(predicates);
Dataset<Row> data =
spark.read()
.jdbc("jdbc:awsathena://AwsRegion=us-east-1;",
"my_env.my_table", predicates, connectionProperties);
您也可以在 Flink 应用程序中使用此驱动程序:
TypeInformation[] fieldTypes = new TypeInformation[] {
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.simba.athena.jdbc.Driver")
.setDBUrl("jdbc:awsathena://AwsRegion=us-east-1;UID=my_access_key;PWD=my_secret_key;S3OutputLocation=s3://my-bucket/tmp/;")
.setQuery("select id, val_col from my_env.my_table WHERE id = 'foo' and date >= DATE'2018-01-01' and date < DATE'2019-01-01'")
.setRowTypeInfo(rowTypeInfo)
.finish();
DataSet<Row> dbData = env.createInput(jdbcInputFormat, rowTypeInfo);
其实你可以使用 B2W 的 Spark Athena Driver。
我需要在 spark 中使用 Athena,但 spark 在使用 JDBC 驱动程序时使用 preparedStatement,它给了我一个例外 "com.amazonaws.athena.jdbc.NotImplementedException: Method Connection.prepareStatement is not yet implemented"
你能告诉我如何在 spark 中连接 Athena
您不能直接将 Spark 连接到 Athena。 Athena 只是针对 s3 的 Prestodb 的一个实现。与 Presto 不同,Athena 无法将数据定位到 HDFS。
不过,如果你想用Spark查询s3中的数据,那你就走运了HUE, which will let you query data in s3 from Spark on Elastic Map Reduce (EMR)。
另请参阅: Developer Guide for Hadoop User Experience (HUE) on EMR.
我不知道您如何从 Spark 连接到 Athena,但您不需要 - 您可以非常轻松地查询 Athena 包含的数据(或者更准确地说,"registers")来自 Spark.
雅典娜有两个部分
- Hive Metastore(现在称为 Glue 数据目录)包含数据库和 table 名称以及所有基础文件之间的映射
- Presto 查询引擎,可将您的 SQL 转换为针对这些文件的数据操作
当您启动 EMR 集群(v5.8.0 及更高版本)时,您可以指示它连接到您的 Glue 数据目录。这是 'create cluster' 对话框中的一个复选框。当您选中此选项时,您的 Spark SqlContext
将连接到 Glue 数据目录,您将能够在 Athena 中看到 tables。
然后您可以正常查询这些 table。
有关更多信息,请参阅 https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-glue.html。
如果您想使用 Athena 的 data,@Kirk Broadhurst 的回复是正确的。
如果您想使用 Athena 引擎,那么 github 上有一个库可以解决 preparedStatement
问题。
请注意,由于我缺乏使用 Maven 等的经验,我没有成功使用该库
您可以使用这个 JDBC 驱动程序:SimbaAthenaJDBC
<dependency>
<groupId>com.syncron.amazonaws</groupId>
<artifactId>simba-athena-jdbc-driver</artifactId>
<version>2.0.2</version>
</dependency>
使用:
SparkSession spark = SparkSession
.builder()
.appName("My Spark Example")
.getOrCreate();
Class.forName("com.simba.athena.jdbc.Driver");
Properties connectionProperties = new Properties();
connectionProperties.put("User", "AWSAccessKey");
connectionProperties.put("Password", "AWSSecretAccessKey");
connectionProperties.put("S3OutputLocation", "s3://my-bucket/tmp/");
connectionProperties.put("AwsCredentialsProviderClass",
"com.simba.athena.amazonaws.auth.PropertiesFileCredentialsProvider");
connectionProperties.put("AwsCredentialsProviderArguments", "/my-folder/.athenaCredentials");
connectionProperties.put("driver", "com.simba.athena.jdbc.Driver");
List<String> predicateList =
Stream
.of("id = 'foo' and date >= DATE'2018-01-01' and date < DATE'2019-01-01'")
.collect(Collectors.toList());
String[] predicates = new String[predicateList.size()];
predicates = predicateList.toArray(predicates);
Dataset<Row> data =
spark.read()
.jdbc("jdbc:awsathena://AwsRegion=us-east-1;",
"my_env.my_table", predicates, connectionProperties);
您也可以在 Flink 应用程序中使用此驱动程序:
TypeInformation[] fieldTypes = new TypeInformation[] {
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.simba.athena.jdbc.Driver")
.setDBUrl("jdbc:awsathena://AwsRegion=us-east-1;UID=my_access_key;PWD=my_secret_key;S3OutputLocation=s3://my-bucket/tmp/;")
.setQuery("select id, val_col from my_env.my_table WHERE id = 'foo' and date >= DATE'2018-01-01' and date < DATE'2019-01-01'")
.setRowTypeInfo(rowTypeInfo)
.finish();
DataSet<Row> dbData = env.createInput(jdbcInputFormat, rowTypeInfo);
其实你可以使用 B2W 的 Spark Athena Driver。