AWS glueContext read doesn't allow a sql query

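I want to use an AWS Glue job to read filtered data from a MySQL instance. Since the Glue JDBC connection doesn't let me push down predicates, I'm trying to create a JDBC connection explicitly in my code.

I want to run a select query against the MySQL database over that JDBC connection, as follows: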

import com.amazonaws.services.glue.GlueContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession


object TryMe {

  def main(args: Array[String]): Unit = {
    val sc: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(sc)
    val spark: SparkSession = glueContext.getSparkSession

    // Read data into a Spark DataFrame through the plain JDBC source (not a DynamicFrame, no Data Catalog)
    val t = glueContext.read
      .format("jdbc")
      .option("url", "jdbc:mysql://serverIP:port/database")
      .option("user", "username")
      .option("password", "password")
      .option("dbtable", "select * from table1 where 1=1")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()

  }
}

It fails with the following error:

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'select * from table1 where 1=1 WHERE 1=0' at line 1

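Is this not supposed to work? How can I retrieve filtered data over a JDBC connection without reading the whole table into a DataFrame?

I think the problem occurs because you didn't wrap the query in parentheses and give it an alias. It should look something like the following example: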

val t = glueContext.read
  .format("jdbc")
  .option("url", "jdbc:mysql://serverIP:port/database")
  .option("user", "username")
  .option("password", "password")
  .option("dbtable", "(select * from table1 where 1=1) as t1")
  .option("driver", "com.mysql.jdbc.Driver")
  .load()
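The error message itself shows why the parentheses and alias matter: to resolve the schema, Spark's JDBC source substitutes the `dbtable` value into a probe of the form `SELECT * FROM <dbtable> WHERE 1=0`, so a bare SELECT ends up nested without parentheses and MySQL rejects it. The sketch below only reproduces that string substitution in plain Python; `wrap_as_subquery` and `schema_probe` are hypothetical helper names, and the default alias `t1` is an arbitrary choice. (Spark 2.4+ also has a separate `query` option that parenthesizes and aliases the statement for you; it cannot be combined with `dbtable`.)

```python
def wrap_as_subquery(query: str, alias: str = "t1") -> str:
    """Turn a SELECT statement into a value usable for the JDBC 'dbtable' option."""
    # A trailing semicolon is not allowed inside a derived table, so drop it.
    body = query.strip().rstrip(";")
    return f"({body}) AS {alias}"


def schema_probe(dbtable: str) -> str:
    """Approximate the schema-resolution query Spark builds from a 'dbtable' value."""
    return f"SELECT * FROM {dbtable} WHERE 1=0"


raw = "select * from table1 where 1=1"
print(schema_probe(raw))
# SELECT * FROM select * from table1 where 1=1 WHERE 1=0   (invalid SQL, matches the error)
print(schema_probe(wrap_as_subquery(raw)))
# SELECT * FROM (select * from table1 where 1=1) AS t1 WHERE 1=0   (valid derived table)
```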

For more details on the options of Spark's JDBC data source, see:

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

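As for Glue and the frameworks it provides, there is also a "push_down_predicate" option, but I have only used it with S3-based data sources. I don't think it works for sources other than S3, or for non-partitioned data.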

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html
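The reason push_down_predicate is tied to partitioned data is that Glue evaluates it against the partition values recorded in the Data Catalog, so only matching partitions are listed and read; it is partition pruning, not a WHERE clause sent to a database. The following is only a conceptual simulation of that pruning on Hive-style `key=value` S3 prefixes. The bucket and paths are made up, and a Python lambda stands in for the Spark SQL expression string that Glue actually accepts.

```python
from typing import Callable, Dict, List


def parse_partition(path: str) -> Dict[str, str]:
    """Extract Hive-style key=value partition columns from an S3 prefix."""
    return dict(seg.split("=", 1) for seg in path.strip("/").split("/") if "=" in seg)


def prune(paths: List[str], predicate: Callable[[Dict[str, str]], bool]) -> List[str]:
    """Keep only the partition prefixes whose values satisfy the predicate."""
    return [p for p in paths if predicate(parse_partition(p))]


# Hypothetical partition prefixes of a catalog table backed by S3.
paths = [
    "s3://example-bucket/sales/year=2020/month=12/",
    "s3://example-bucket/sales/year=2021/month=01/",
    "s3://example-bucket/sales/year=2021/month=02/",
]

# Roughly what push_down_predicate="year == '2021' and month == '01'" would select.
kept = prune(paths, lambda part: part["year"] == "2021" and part["month"] == "01")
print(kept)  # ['s3://example-bucket/sales/year=2021/month=01/']
```

A JDBC table has no such partition listing to prune, which is consistent with the predicate not reaching the database.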

Why not use create_dynamic_frame_from_options?

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_options
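For a MySQL source, the `connection_options` passed to `from_options` are the JDBC keys `url`, `dbtable`, `user` and `password`. The helper below is a hypothetical convenience for building that dict; the port 3306 (MySQL's default) and the placeholder values are assumptions, and the actual Glue call is left as a comment because it only runs inside a Glue job. Note that this reads the whole `dbtable`; it does not by itself filter at the source.

```python
from typing import Dict


def mysql_connection_options(host: str, port: int, database: str, table: str,
                             user: str, password: str) -> Dict[str, str]:
    """Build connection_options for create_dynamic_frame.from_options(connection_type="mysql")."""
    return {
        "url": f"jdbc:mysql://{host}:{port}/{database}",
        "dbtable": table,
        "user": user,
        "password": password,
    }


opts = mysql_connection_options("serverIP", 3306, "database", "table1", "username", "password")
print(opts["url"])  # jdbc:mysql://serverIP:3306/database

# Inside a Glue job (needs the awsglue runtime, so it is not executed here):
# dyf = glueContext.create_dynamic_frame.from_options(
#     connection_type="mysql", connection_options=opts)
```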

For anyone still searching for answers/examples: I can confirm that the push_down_predicate option works with JDBC data sources. Here is how I read from SQL Server (in Python):

df = (
    glueContext.read.format("jdbc")
    .option("url", "jdbc:sqlserver://server-ip:port;databaseName=db;")
    .option("user", "username")
    .option("password", "password")
    .option("dbtable", "(select t1.*, t2.name from dbo.table1 t1 join dbo.table2 t2 on t1.id = t2.id) as users")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .load()
)

This also works, but not the way I expected: the predicate is not pushed down to the data source.

df = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "db_dbo_table1", push_down_predicate = "(id >= 2850700 AND statusCode = 'ACT')")

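The documentation for pushDownPredicate states: "The option to enable or disable predicate push-down into the JDBC data source. The default value is true, in which case Spark will push down filters to the JDBC data source as much as possible."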
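That Spark option concerns filters applied to a DataFrame created by Spark's own JDBC reader (for example `df.filter("id >= 2850700")`): Spark compiles the supported filters into a WHERE clause on the statement it sends, which is a different mechanism from Glue's `push_down_predicate` parameter on `from_catalog`. The function below is only an approximation of the shape of that statement (Spark also quotes identifiers per dialect and omits spaces after commas); `pushed_select` is a hypothetical name and the filters are taken from the example above.

```python
from typing import List


def pushed_select(table_or_query: str, columns: List[str], filters: List[str]) -> str:
    """Approximate the SELECT Spark's JDBC reader issues when filters are pushed down."""
    # When no columns are needed (e.g. for count()), Spark selects a constant.
    cols = ", ".join(columns) if columns else "1"
    # Each pushed filter is parenthesized and the filters are ANDed together.
    where = " AND ".join(f"({f})" for f in filters)
    return f"SELECT {cols} FROM {table_or_query}" + (f" WHERE {where}" if where else "")


print(pushed_select("dbo.table1", ["id", "statusCode"], ["id >= 2850700", "statusCode = 'ACT'"]))
# SELECT id, statusCode FROM dbo.table1 WHERE (id >= 2850700) AND (statusCode = 'ACT')
```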

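Here are 5 different code snippets I tried for a performance comparison. When checked with the profiler, only 2 of them (0001 and 0005) actually filtered the data at the server level. At the moment, short of creating a custom connector or buying one from the Marketplace, the only way I found to make this work is glueContext.read.

You can convert DynamicFrames to and from DataFrames (see example):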

rds_datasink_temp = DynamicFrame.fromDF(rds_dataframe, glueContext, "nested")

You should also check this in SQL Server Profiler, running a trace with all events from: OLEDB, Stored Procedures, TSQL and Transactions.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

# Job parameters are passed with two leading hyphens (e.g. --param_server_url);
# add their names to this list to resolve them
args = getResolvedOptions(sys.argv,['JOB_NAME'])
print("JOB_NAME: ", args['JOB_NAME'])

job_server_url="SERVER URL"
job_db_name="DB NAME"
job_db_user="DB USER"
job_db_password="DB PASSWORD"
job_table_name="TABLE NAME"

job_glue_db_name="GLUE DATA CATALOG DATABASE NAME"
job_glue_conn_name="GLUE DATA CATALOG CONNECTION NAME"
job_glue_table_name="GLUE DATA CATALOG TABLE NAME"

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
region = "us-east-1"

#### aws glue data catalog table info (from ) ####
# Name  job_glue_table_name
# Database  job_glue_db_name
# Classification    sqlserver
# Location  job_db_name.dbo.job_table_name
# Connection    job_glue_conn_name

#### GlueContext Class ####
# https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html

#### DynamicFrame Class ####
# https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html

#### Connection Api ####
# https://docs.aws.amazon.com/glue/latest/webapi/API_Connection.html

#### Using connectors and connections with AWS Glue Studio ####
# Link : https://docs.aws.amazon.com/glue/latest/ug/connectors-chapter.html
# Use AWS Secrets Manager for storing credentials
# Filtering the source data with row predicates and column projections 

#### Connection options for type custom.jdbc or marketplace.jdbc ####
# Link : https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-connect.html#aws-glue-programming-etl-connect-jdbc
# className – String, required, driver class name.
# connectionName – String, required, name of the connection that is associated with the connector.
# secretId or user/password – String, required, used to retrieve credentials for the URL.
# dbTable or query – String, required, the table or SQL query to get the data from. You can specify either dbTable or query, but not both.
# filterPredicate – String, optional, extra condition clause to filter data from source, e.g. recordid<=5 (tried in snippet 0004 below).

# Multi-line statements below use a trailing backslash for line continuation
# 0001: Spark JDBC reader with the 'query' option -> FILTERED at the server
print("0001 - df_read_query")
df_read_query = glueContext.read \
    .format("jdbc") \
    .option("url","jdbc:sqlserver://"+job_server_url+":1433;databaseName="+job_db_name+";") \
    .option("query","select recordid from "+job_table_name+" where recordid <= 5") \
    .option("user",job_db_user) \
    .option("password",job_db_password) \
    .load()
print("df_read_query count: ", df_read_query.count())
df_read_query.show(10)
df_read_query.printSchema()

# 0002: from_catalog with a 'query' in additional_options -> NOT filtered at the server
print("0002 - df_from_catalog_query")
df_from_catalog_query = glueContext.create_dynamic_frame.from_catalog(
    database = job_glue_db_name, 
    table_name = job_glue_table_name, 
    additional_options={
        "query":"select recordid from "+job_table_name+" where recordid <= 5;",
    },
    transformation_ctx = "df_from_catalog_query", 
)
print("df_from_catalog_query count: ", df_from_catalog_query.count())
df_from_catalog_query.show(10)

# 0003: from_catalog with push_down_predicate="recordid<=5" -> NOT filtered at the server
print("0003 - df_from_catalog_push_down_predicate")
df_from_catalog_push_down_predicate = glueContext.create_dynamic_frame.from_catalog(
    database = job_glue_db_name, 
    table_name = job_db_name+'_dbo_'+job_table_name, 
    push_down_predicate = "recordid<=5",
    transformation_ctx = "df_from_catalog_push_down_predicate",
)
print("df_from_catalog_push_down_predicate count: ", df_from_catalog_push_down_predicate.count())
df_from_catalog_push_down_predicate.show(10)

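# 0004: from_options (connection_type "sqlserver") with filterPredicate="recordid<=5" -> NOT filtered at the server
# (filterPredicate is listed above for custom.jdbc / marketplace.jdbc connections, not for sqlserver)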
print("0004 - df_from_options_sqlserver")
df_from_options_sqlserver = glueContext.create_dynamic_frame.from_options(
    connection_type = "sqlserver", 
    connection_options = {
        "url":"jdbc:sqlserver://"+job_server_url+":1433;databaseName="+job_db_name+";",
        "username":job_db_user,
        "password":job_db_password,
        "location":job_db_name+".dbo."+job_table_name,
        "filterPredicate":"recordid<=5",
    }, 
    transformation_ctx = "df_from_options_sqlserver",
)
print("df_from_options_sqlserver count: ", df_from_options_sqlserver.count())
df_from_options_sqlserver.show(10)

# 0005: Spark JDBC reader with a parenthesized, aliased subquery in 'dbtable' -> FILTERED at the server
print("0005 - df_read_dbtable")
df_read_dbtable = glueContext.read \
    .format("jdbc") \
    .option("url","jdbc:sqlserver://"+job_server_url+":1433;databaseName="+job_db_name+";") \
    .option("user",job_db_user) \
    .option("password",job_db_password) \
    .option("dbtable","(select recordid from "+job_table_name+" where recordid<=5) as t1") \
    .option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()
print("df_read_dbtable count: ", df_read_dbtable.count())
df_read_dbtable.show(10)

job.commit()