使用 Spark 和本地 AWS Glue 实现从 Kinesis -> RDS 移动数据

Moving data from Kinesis -> RDS using Spark with AWS Glue implementation locally

我在本地有一个带有 AWS Glue 实施的 Spark 项目 运行。

我听 Kinesis 流,所以当数据以 JSON 格式到达时,我可以正确存储到 S3。 我想存储在 AWS RDS 中而不是存储在 S3 中。

我试过使用:

dataFrame.write
          .format("jdbc")
          .option("url","jdbc:mysql://aurora.cluster.region.rds.amazonaws.com:3306/database")
          .option("user","user")
          .option("password","password")
          .option("dbtable","test-table")
          .option("driver","com.mysql.jdbc.Driver")
          .save()

Spark 项目使用 AWS 胶水作业从 Kinesis 流中获取数据。

我想将数据添加到 Aurora 数据库。

它因错误而失败

Caused by: java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL
 server version for the right syntax to use near '-glue-table (`label2` TEXT , `customerid` TEXT , `sales` TEXT , `name` TEXT )' a
t line 1

这是我使用的测试数据帧,dataFrame.show():

+------+----------+-----+--------------------+
|label2|customerid|sales|                name|
+------+----------+-----+--------------------+
| test6|      test| test|streamingtesttest...|
+------+----------+-----+--------------------+

Using Spark DynamicFrame instead of DataFrame and using the glueContext sink to publish to Aurora:

所以最终的代码可能是:

lazy val mysqlJsonOption = jsonOptions(MYSQL_AURORA_URI)

//Write to Aurora
val dynamicFrame = DynamicFrame(joined, glueContext)
glueContext.getSink("mysql", mysqlJsonOption).writeDynamicFrame(dynamicFrame)