使用 AWS Glue 覆盖 MySQL 个表
Overwrite MySQL tables with AWS Glue
我有一个 lambda 进程,它偶尔会轮询 API 以获取最近的数据。此数据具有唯一键,我想使用 Glue 更新 MySQL 中的 table。是否有使用此密钥覆盖数据的选项? (类似于 Spark 的 mode=overwrite)。如果没有 - 我可以在插入所有新数据之前截断 Glue 中的 table 吗?
谢谢
我 运行 遇到了与 Redshift 相同的问题,我们能想到的最佳解决方案是创建一个 Java class 来加载 MySQL 驱动程序并发出截断 table:
package com.my.glue.utils.mysql;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
@SuppressWarnings("unused")
public class MySQLTruncateClient {
public void truncate(String tableName, String url) throws SQLException, ClassNotFoundException {
Class.forName("com.mysql.jdbc.Driver");
try (Connection mysqlConnection = DriverManager.getConnection(url);
Statement statement = mysqlConnection.createStatement()) {
statement.execute(String.format("TRUNCATE TABLE %s", tableName));
}
}
}
将该 JAR 连同您的 MySQL Jar 依赖项一起上传到 S3,并使您的工作依赖于这些。在您的 PySpark 脚本中,您可以加载截断方法:
java_import(glue_context._jvm, "com.my.glue.utils.mysql.MySQLTruncateClient")
truncate_client = glue_context._jvm.MySQLTruncateClient()
truncate_client.truncate('my_table', 'jdbc:mysql://...')
我想出的解决方法如下:
- 在 mysql 中创建暂存 table,并将新数据加载到此 table。
- 运行 命令:
REPLACE INTO myTable SELECT * FROM myStagingTable;
- 截断暂存 table
这可以通过以下方式完成:
import sys from awsglue.transforms
import * from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
import pymysql
pymysql.install_as_MySQLdb()
import MySQLdb
db = MySQLdb.connect("URL", "USERNAME", "PASSWORD", "DATABASE")
cursor = db.cursor()
cursor.execute("REPLACE INTO myTable SELECT * FROM myStagingTable")
cursor.fetchall()
db.close()
job.commit()
我发现了一种在 Glue 中使用 JDBC 连接的更简单方法。 Glue 团队建议截断 table 的方法是在您将数据写入 Redshift 集群时通过以下示例代码:
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = resolvechoice4, catalog_connection = "<connection-name>", connection_options = {"dbtable": "<target-table>", "database": "testdb", "preactions":"TRUNCATE TABLE <table-name>"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
哪里
connection-name your Glue connection name to your Redshift Cluster
target-table the table you're loading the data in
testdb name of the database
table-name name of the table to truncate (ideally the table you're loading into)
我有一个 lambda 进程,它偶尔会轮询 API 以获取最近的数据。此数据具有唯一键,我想使用 Glue 更新 MySQL 中的 table。是否有使用此密钥覆盖数据的选项? (类似于 Spark 的 mode=overwrite)。如果没有 - 我可以在插入所有新数据之前截断 Glue 中的 table 吗?
谢谢
我 运行 遇到了与 Redshift 相同的问题,我们能想到的最佳解决方案是创建一个 Java class 来加载 MySQL 驱动程序并发出截断 table:
package com.my.glue.utils.mysql;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
@SuppressWarnings("unused")
public class MySQLTruncateClient {
public void truncate(String tableName, String url) throws SQLException, ClassNotFoundException {
Class.forName("com.mysql.jdbc.Driver");
try (Connection mysqlConnection = DriverManager.getConnection(url);
Statement statement = mysqlConnection.createStatement()) {
statement.execute(String.format("TRUNCATE TABLE %s", tableName));
}
}
}
将该 JAR 连同您的 MySQL Jar 依赖项一起上传到 S3,并使您的工作依赖于这些。在您的 PySpark 脚本中,您可以加载截断方法:
java_import(glue_context._jvm, "com.my.glue.utils.mysql.MySQLTruncateClient")
truncate_client = glue_context._jvm.MySQLTruncateClient()
truncate_client.truncate('my_table', 'jdbc:mysql://...')
我想出的解决方法如下:
- 在 mysql 中创建暂存 table,并将新数据加载到此 table。
- 运行 命令:
REPLACE INTO myTable SELECT * FROM myStagingTable;
- 截断暂存 table
这可以通过以下方式完成:
import sys from awsglue.transforms
import * from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
import pymysql
pymysql.install_as_MySQLdb()
import MySQLdb
db = MySQLdb.connect("URL", "USERNAME", "PASSWORD", "DATABASE")
cursor = db.cursor()
cursor.execute("REPLACE INTO myTable SELECT * FROM myStagingTable")
cursor.fetchall()
db.close()
job.commit()
我发现了一种在 Glue 中使用 JDBC 连接的更简单方法。 Glue 团队建议截断 table 的方法是在您将数据写入 Redshift 集群时通过以下示例代码:
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = resolvechoice4, catalog_connection = "<connection-name>", connection_options = {"dbtable": "<target-table>", "database": "testdb", "preactions":"TRUNCATE TABLE <table-name>"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
哪里
connection-name your Glue connection name to your Redshift Cluster
target-table the table you're loading the data in
testdb name of the database
table-name name of the table to truncate (ideally the table you're loading into)