尝试使用 Kafka 和 pyspark 从 postgreSQL 中的 spark 编写流式数据帧
Trying to write a streaming dataframe from spark in postgreSQL with Kafka and pyspark
我一直在这个网站的各个方面搜索这个问题,但我没有找到任何解决方案。
我写了一个 java class 在 Kafka 中创建一个生产者并发送一些文件并且它工作正常。
然后,我想编写一个 python 脚本来读取这些文件并将它们放入 postgreSQL 的数据库中。
每个文件(每个文件都是一个有很多列的数据集)成为kafka消费者中的一个主题,文件的每一行成为相关主题中的一条消息。
这是我在 python 中根据流数据创建的 spark 数据帧:
list = df.select("fileName", "Satellite_PRN_number", "date", "time", "Crs", "Delta_n", "m0", "Cuc",
"e_Eccentricity",
"Cus",
"sqrt_A", "Toe_Time_of_Ephemeris", "Cic", "OMEGA_maiusc", "cis", "i0", "Crc", "omega",
"omega_dot",
"idot")
这是我的 python 函数,它应该在我的 postgreSQL table 中插入每一行。我使用 psycopg2 在 python 和 postgre 之间创建连接,并使用“self.cursor.execute”来编写查询。
def process_row(self, row):
self.cursor.execute(
'INSERT INTO satellite(fileName,Satellite_PRN_number, date, time,Crs,Delta_n, m0,
Cuc,e_Eccentricity,Cus,'
'sqrt_A, Toe_Time_of_Ephemeris, Cic, OMEGA_maiusc, cis, i0, Crc, omega, omega_dot, idot) VALUES
(%s,%s,%s,'
'%s,%s,%s, %s, %s, %s, %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
(row.fileName, row.Satellite_PRN_number, row.date, row.time, row.Crs, row.Delta_n, row.m0, row.Cuc,
row.e_Eccentricity,
row.Cus, row.sqrt_A, row.Toe_Time_of_Ephemeris, row.Cic, row.OMEGA_maiusc, row.cis, row.i0,
row.Crc,
row.omega,
row.omega_dot, row.idot))
self.connection.commit()
最后,我使用上面的方法通过以下命令在 postgreSQL 中填充我的 table:
query = list.writeStream.outputMode("append").foreachBatch(process_row)\
.option("checkpointLocation", "C:\Users\Admin\AppData\Local\Temp").start()
我收到以下错误:AttributeError: 'DataFrame' object has no attribute 'cursor'
。
我认为问题出在 row.fileName 等...或方法“process_row”中。我不完全明白如何管理方法“process_row”以便传递流数据帧的每一行来填充 posteSQL table.
谁能帮帮我?谢谢
您的 foreachBatch 签名似乎不正确。应该是这样的:
def foreach_batch_function(df, epoch_id):
# Transform and write batchDF
pass
streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
如您所见,forEachBatch 函数的第一个参数是一个 DataFrame,而不是您期望的 psycopg2 实例 class。
ForEachBatch 将有一个 DataFrame,它本身将包含当前微批次中的所有行,而不仅仅是一行。
所以您可以尝试在该函数中声明您的 postgreSQL 连接的实例以进一步使用它,或者您可以尝试这种方法:
我会像这样创建一个基于 table postgreSQL 数据库的配置单元 jdbc 源:
CREATE TABLE jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:postgresql:dbserver",
dbtable "schema.tablename",
user 'username',
password 'password'
)
这将使您能够像这样使用 forEachBatch 函数:
def foreach_batch_function(df, epoch_id):
# Transform and write batchDF
df.write.insertInto("jdbcTable")
希望对您有所帮助
我一直在这个网站的各个方面搜索这个问题,但我没有找到任何解决方案。 我写了一个 java class 在 Kafka 中创建一个生产者并发送一些文件并且它工作正常。 然后,我想编写一个 python 脚本来读取这些文件并将它们放入 postgreSQL 的数据库中。
每个文件(每个文件都是一个有很多列的数据集)成为kafka消费者中的一个主题,文件的每一行成为相关主题中的一条消息。
这是我在 python 中根据流数据创建的 spark 数据帧:
list = df.select("fileName", "Satellite_PRN_number", "date", "time", "Crs", "Delta_n", "m0", "Cuc",
"e_Eccentricity",
"Cus",
"sqrt_A", "Toe_Time_of_Ephemeris", "Cic", "OMEGA_maiusc", "cis", "i0", "Crc", "omega",
"omega_dot",
"idot")
这是我的 python 函数,它应该在我的 postgreSQL table 中插入每一行。我使用 psycopg2 在 python 和 postgre 之间创建连接,并使用“self.cursor.execute”来编写查询。
def process_row(self, row):
self.cursor.execute(
'INSERT INTO satellite(fileName,Satellite_PRN_number, date, time,Crs,Delta_n, m0,
Cuc,e_Eccentricity,Cus,'
'sqrt_A, Toe_Time_of_Ephemeris, Cic, OMEGA_maiusc, cis, i0, Crc, omega, omega_dot, idot) VALUES
(%s,%s,%s,'
'%s,%s,%s, %s, %s, %s, %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',
(row.fileName, row.Satellite_PRN_number, row.date, row.time, row.Crs, row.Delta_n, row.m0, row.Cuc,
row.e_Eccentricity,
row.Cus, row.sqrt_A, row.Toe_Time_of_Ephemeris, row.Cic, row.OMEGA_maiusc, row.cis, row.i0,
row.Crc,
row.omega,
row.omega_dot, row.idot))
self.connection.commit()
最后,我使用上面的方法通过以下命令在 postgreSQL 中填充我的 table:
query = list.writeStream.outputMode("append").foreachBatch(process_row)\
.option("checkpointLocation", "C:\Users\Admin\AppData\Local\Temp").start()
我收到以下错误:AttributeError: 'DataFrame' object has no attribute 'cursor'
。
我认为问题出在 row.fileName 等...或方法“process_row”中。我不完全明白如何管理方法“process_row”以便传递流数据帧的每一行来填充 posteSQL table.
谁能帮帮我?谢谢
您的 foreachBatch 签名似乎不正确。应该是这样的:
def foreach_batch_function(df, epoch_id):
# Transform and write batchDF
pass
streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
如您所见,forEachBatch 函数的第一个参数是一个 DataFrame,而不是您期望的 psycopg2 实例 class。 ForEachBatch 将有一个 DataFrame,它本身将包含当前微批次中的所有行,而不仅仅是一行。
所以您可以尝试在该函数中声明您的 postgreSQL 连接的实例以进一步使用它,或者您可以尝试这种方法:
我会像这样创建一个基于 table postgreSQL 数据库的配置单元 jdbc 源:
CREATE TABLE jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:postgresql:dbserver",
dbtable "schema.tablename",
user 'username',
password 'password'
)
这将使您能够像这样使用 forEachBatch 函数:
def foreach_batch_function(df, epoch_id):
# Transform and write batchDF
df.write.insertInto("jdbcTable")
希望对您有所帮助