使用 Spark 和本地 AWS Glue 实现从 Kinesis -> RDS 移动数据
Moving data from Kinesis -> RDS using Spark with AWS Glue implementation locally
我在本地有一个带有 AWS Glue 实施的 Spark 项目 运行。
我听 Kinesis 流,所以当数据以 JSON 格式到达时,我可以正确存储到 S3。
我想存储在 AWS RDS 中而不是存储在 S3 中。
我试过使用:
dataFrame.write
.format("jdbc")
.option("url","jdbc:mysql://aurora.cluster.region.rds.amazonaws.com:3306/database")
.option("user","user")
.option("password","password")
.option("dbtable","test-table")
.option("driver","com.mysql.jdbc.Driver")
.save()
Spark 项目使用 AWS 胶水作业从 Kinesis 流中获取数据。
我想将数据添加到 Aurora 数据库。
它因错误而失败
Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL
server version for the right syntax to use near '-glue-table (`label2` TEXT , `customerid` TEXT , `sales` TEXT , `name` TEXT )' a
t line 1
这是我使用的测试数据帧,dataFrame.show()
:
+------+----------+-----+--------------------+
|label2|customerid|sales| name|
+------+----------+-----+--------------------+
| test6| test| test|streamingtesttest...|
+------+----------+-----+--------------------+
Using Spark DynamicFrame instead of DataFrame and using the glueContext sink to publish to Aurora:
所以最终的代码可能是:
lazy val mysqlJsonOption = jsonOptions(MYSQL_AURORA_URI)
//Write to Aurora
val dynamicFrame = DynamicFrame(joined, glueContext)
glueContext.getSink("mysql", mysqlJsonOption).writeDynamicFrame(dynamicFrame)
我在本地有一个带有 AWS Glue 实施的 Spark 项目 运行。
我听 Kinesis 流,所以当数据以 JSON 格式到达时,我可以正确存储到 S3。 我想存储在 AWS RDS 中而不是存储在 S3 中。
我试过使用:
dataFrame.write
.format("jdbc")
.option("url","jdbc:mysql://aurora.cluster.region.rds.amazonaws.com:3306/database")
.option("user","user")
.option("password","password")
.option("dbtable","test-table")
.option("driver","com.mysql.jdbc.Driver")
.save()
Spark 项目使用 AWS 胶水作业从 Kinesis 流中获取数据。
我想将数据添加到 Aurora 数据库。
它因错误而失败
Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL
server version for the right syntax to use near '-glue-table (`label2` TEXT , `customerid` TEXT , `sales` TEXT , `name` TEXT )' a
t line 1
这是我使用的测试数据帧,dataFrame.show()
:
+------+----------+-----+--------------------+
|label2|customerid|sales| name|
+------+----------+-----+--------------------+
| test6| test| test|streamingtesttest...|
+------+----------+-----+--------------------+
Using Spark DynamicFrame instead of DataFrame and using the glueContext sink to publish to Aurora:
所以最终的代码可能是:
lazy val mysqlJsonOption = jsonOptions(MYSQL_AURORA_URI)
//Write to Aurora
val dynamicFrame = DynamicFrame(joined, glueContext)
glueContext.getSink("mysql", mysqlJsonOption).writeDynamicFrame(dynamicFrame)