Not able to connect to Snowflake from an EMR cluster using PySpark with the Airflow EMR operator
I am trying to connect to Snowflake from an EMR cluster launched by the Airflow EMR operator, but I get the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling
o147.load. : java.lang.ClassNotFoundException: Failed to find data
source: net.snowflake.spark.snowflake. Please find packages at
http://spark.apache.org/third-party-projects.html
These are the steps I pass to my EmrAddStepsOperator to run the script load_updates.py. I list my Snowflake packages in "Args":
STEPS = [
{
"Name" : "convo_facts",
"ActionOnFailure" : "TERMINATE_CLUSTER",
"HadoopJarStep" : {
"Jar" : "command-runner.jar",
"Args" : ["spark-submit", "s3://dev-data-lake/spark_files/cf/load_updates.py", \
"--packages net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4", \
"INPUT=s3://dev-data-lake/table_exports/public/", \
"OUTPUT=s3://dev-data-lake/emr_output/cf/"]
}
}
]
JOB_FLOW_OVERRIDES = {
'Name' : 'cftest',
'LogUri' : 's3://dev-data-lake/emr_logs/cf/log.txt',
'ReleaseLabel' : 'emr-5.32.0',
'Instances' : {
'InstanceGroups' : [
{
'Name' : 'Master nodes',
'Market' : 'ON_DEMAND',
'InstanceRole' : 'MASTER',
'InstanceType' : 'r6g.4xlarge',
'InstanceCount' : 1,
},
{
'Name' : 'Slave nodes',
'Market' : 'ON_DEMAND',
'InstanceRole' : 'CORE',
'InstanceType' : 'r6g.4xlarge',
'InstanceCount' : 3,
}
],
'KeepJobFlowAliveWhenNoSteps' : True,
'TerminationProtected' : False
},
'Applications' : [{
'Name' : 'Spark'
}],
'JobFlowRole' : 'EMR_EC2_DefaultRole',
'ServiceRole' : 'EMR_DefaultRole'
}
And this is how I set the Snowflake credentials in my load_updates.py script to pull the data into a PySpark DataFrame:
# Set options below
sfOptions = {
"sfURL" : "xxxx.us-east-1.snowflakecomputing.com",
"sfUser" : "user",
"sfPassword" : "xxxx",
"sfDatabase" : "",
"sfSchema" : "PUBLIC",
"sfWarehouse" : ""
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
query_sql = """select * from cf""";
messages_new = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("query", query_sql) \
.load()
I'm not sure whether I'm missing something or doing something wrong.
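The --packages option must come before s3://.../load_updates.py in the spark-submit command. Otherwise it is treated as an application argument: spark-submit passes everything after the application file to the script itself, so the Snowflake packages are never resolved and the data source class cannot be found. The flag and its value should also be separate elements of the "Args" list.
Try this: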
STEPS = [
{
"Name": "convo_facts",
"ActionOnFailure": "TERMINATE_CLUSTER",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--packages",
"net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4",
"s3://dev-data-lake/spark_files/cf/load_updates.py",
"INPUT=s3://dev-data-lake/table_exports/public/",
"OUTPUT=s3://dev-data-lake/emr_output/cf/"
]
}
}
]
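To make the ordering harder to get wrong, the argument list can be assembled by a small helper instead of being written by hand. The following is only a minimal sketch: the function names build_spark_submit_args and make_emr_step are hypothetical (they are not part of Airflow or boto3), and the paths and package coordinates are simply the ones from the question.

```python
from typing import Dict, List, Sequence


def build_spark_submit_args(
    app_path: str,
    packages: Sequence[str] = (),
    app_args: Sequence[str] = (),
) -> List[str]:
    """Build a spark-submit argument list (hypothetical helper).

    spark-submit options are emitted first, then the application file,
    then the arguments meant for the application itself.
    """
    args = ["spark-submit"]
    if packages:
        # The flag and its comma-separated value are separate list elements.
        args += ["--packages", ",".join(packages)]
    args.append(app_path)
    args.extend(app_args)
    return args


def make_emr_step(name: str, args: List[str]) -> Dict:
    """Wrap an argument list in the step structure used in the question."""
    return {
        "Name": name,
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {"Jar": "command-runner.jar", "Args": args},
    }


STEPS = [
    make_emr_step(
        "convo_facts",
        build_spark_submit_args(
            app_path="s3://dev-data-lake/spark_files/cf/load_updates.py",
            packages=[
                "net.snowflake:snowflake-jdbc:3.8.0",
                "net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4",
            ],
            app_args=[
                "INPUT=s3://dev-data-lake/table_exports/public/",
                "OUTPUT=s3://dev-data-lake/emr_output/cf/",
            ],
        ),
    )
]
```

The resulting STEPS list has the same content as the one above and can be passed to the operator in the same way.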