Pyspark - distinct records based on 2 columns in dataframe
I have 2 dataframes, say df1 and df2. df1 holds data coming from the database, and df2 is the new data I receive from the client. I need to process the new data and perform UPSERTs: update existing records and insert new ones.
Sample data output:
df1= sqlContext.createDataFrame([("xxx1","81A01","TERR NAME 01","NJ"),("xxx2","81A01","TERR NAME 01","NJ"),("xxx3","81A01","TERR NAME 01","NJ"),("xxx4","81A01","TERR NAME 01","CA"),("xx5","81A01","TERR NAME 01","ME")], ["zip_code","territory_code","territory_name","state"])
df2= sqlContext.createDataFrame([("xxx1","81A01","TERR NAME 55","NY"),("xxx2","81A01","TERR NAME 55","NY"),("x103","81A01","TERR NAME 01","NJ")], ["zip_code","territory_code","territory_name","state"])
df1.show()
+--------+--------------+--------------+-----+
|zip_code|territory_code|territory_name|state|
+--------+--------------+--------------+-----+
| xxx1| 81A01| TERR NAME 01| NJ|
| xxx2| 81A01| TERR NAME 01| NJ|
| xxx3| 81A01| TERR NAME 01| NJ|
| xxx4| 81A01| TERR NAME 01| CA|
| xxx5| 81A01| TERR NAME 01| ME|
+--------+--------------+--------------+-----+
# Print out information about this data
df2.show()
+--------+--------------+--------------+-----+
|zip_code|territory_code|territory_name|state|
+--------+--------------+--------------+-----+
| xxx1| 81A01| TERR NAME 55| NY|
| xxx2| 81A01| TERR NAME 55| NY|
| x103| 81A01| TERR NAME 01| NJ|
+--------+--------------+--------------+-----+
Expected results:
I need to compare the df2 dataframe against df1.
Based on that comparison, create 2 new datasets: the records to be updated and the records to be appended/inserted into the database.
If the zip_code & territory_code match an existing record it is an update, otherwise it is an insert into the database.
For example:
New dataframe output for INSERT:
+--------+--------------+--------------+-----+
|zip_code|territory_code|territory_name|state|
+--------+--------------+--------------+-----+
| x103| 81A01| TERR NAME 01| NJ|
+--------+--------------+--------------+-----+
New dataframe for UPDATE:
+--------+--------------+--------------+-----+
|zip_code|territory_code|territory_name|state|
+--------+--------------+--------------+-----+
| xxx1| 81A01| TERR NAME 55| NY|
| xxx2| 81A01| TERR NAME 55| NY|
+--------+--------------+--------------+-----+
Can someone please help me with this? I am using AWS Glue.
UPDATE: Solution (using join and subtract)
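Note that the join below refers to df2.zip_code_new and df2.territory_code_new, so it assumes df2's columns have already been renamed with a _new suffix. A minimal sketch of that rename, using the column names from the sample data above:
df2 = (df2.withColumnRenamed("zip_code", "zip_code_new")
          .withColumnRenamed("territory_code", "territory_code_new")
          .withColumnRenamed("territory_name", "territory_name_new")
          .withColumnRenamed("state", "state_new"))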
df3 = df1.join(df2, (df1.zip_code == df2.zip_code_new) & (df1.territory_code == df2.territory_code_new))

# keys present in both frames -> records to UPDATE (keep only the incoming _new columns)
df5 = df3.drop("zip_code", "territory_code", "territory_name", "state")
df5.show()
+------------+------------------+------------------+---------+
|zip_code_new|territory_code_new|territory_name_new|state_new|
+------------+------------------+------------------+---------+
|        xxx1|             81A01|      TERR NAME 55|       NY|
|        xxx2|             81A01|      TERR NAME 55|       NY|
+------------+------------------+------------------+---------+

# everything else in df2 -> new records to INSERT
df4 = df2.subtract(df5)
df4.show()
+------------+------------------+------------------+---------+
|zip_code_new|territory_code_new|territory_name_new|state_new|
+------------+------------------+------------------+---------+
|        x103|             81A01|      TERR NAME 01|       NJ|
+------------+------------------+------------------+---------+
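For the pure-insert set (df4 above), one option is to let Spark append the rows directly over JDBC instead of hand-written SQL. This is only a sketch, assuming the MySQL JDBC driver is available to the job; the connection values and the table name "territory" are placeholders:
(df4.withColumnRenamed("zip_code_new", "zip_code")
    .withColumnRenamed("territory_code_new", "territory_code")
    .withColumnRenamed("territory_name_new", "territory_name")
    .withColumnRenamed("state_new", "state")
    .write
    .format("jdbc")
    .option("url", "jdbc:mysql://xxxx.rds.amazonaws.com:3306/databasename")
    .option("dbtable", "territory")  # placeholder table name
    .option("user", "username")
    .option("password", "password")
    .mode("append")
    .save())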
For the RDS database updates, I use pymysql/MySQLdb:
import MySQLdb  # pymysql works the same way via pymysql.install_as_MySQLdb()

db = MySQLdb.connect("xxxx.rds.amazonaws.com", "username", "password", "databasename")
cursor = db.cursor()
#cursor.execute("REPLACE INTO table SELECT * FROM table_stg")
insertQry = "INSERT INTO table VALUES('xxx1','81A01','TERR NAME 55','NY') ON DUPLICATE KEY UPDATE territory_name='TERR NAME 55', state='NY'"
n = cursor.execute(insertQry)
db.commit()
db.close()
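Rather than hard-coding one row, the upsert can be parameterized and executed for every row collected from a result dataframe. This is only a sketch, assuming the same connection settings and a hypothetical table named "territory" whose unique key is (zip_code, territory_code):
import MySQLdb

# Hypothetical batch upsert for the UPDATE set (df5); the INSERT set (df4) can be handled the same way.
upsert_sql = ("INSERT INTO territory (zip_code, territory_code, territory_name, state) "
              "VALUES (%s, %s, %s, %s) "
              "ON DUPLICATE KEY UPDATE territory_name = VALUES(territory_name), state = VALUES(state)")
rows = [tuple(r) for r in df5.collect()]  # collect() pulls everything to the driver, so keep the sets small

db = MySQLdb.connect("xxxx.rds.amazonaws.com", "username", "password", "databasename")
try:
    cursor = db.cursor()
    cursor.executemany(upsert_sql, rows)
    db.commit()
finally:
    db.close()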
Thanks
Here is a sketch of a solution:
Project both frames onto your unique key (zip_code and territory_code).
Use the Spark dataframe API to compute the intersection and the difference between the two dataframes. See this link:
Run updates for the intersection of the keys.
Run inserts for the difference (keys that are in the new dataframe but not in the existing data).
In Scala this would look roughly as follows, and it should be very similar in Python (a PySpark sketch follows the Scala code):
import org.apache.spark.sql.SparkSession

case class ZipTerr(zip_code: String, territory_code: String,
                   territory_name: String, state: String)
case class Key(zip_code: String, territory_code: String)

val spark: SparkSession = SparkSession.builder().getOrCreate()
import spark.implicits._

// New data coming in from the client (the question's df2).
val newData = spark.createDataFrame(List(
  ZipTerr("xxx1", "81A01", "TERR NAME 55", "NY"),
  ZipTerr("xxx2", "81A01", "TERR NAME 55", "NY"),
  ZipTerr("x103", "81A01", "TERR NAME 01", "NJ")
))

// Existing data already in the database (the question's df1).
val oldData = spark.createDataFrame(List(
  ZipTerr("xxx1", "81A01", "TERR NAME 01", "NJ"),
  ZipTerr("xxx2", "81A01", "TERR NAME 01", "NJ"),
  ZipTerr("xxx3", "81A01", "TERR NAME 01", "NJ"),
  ZipTerr("xxx4", "81A01", "TERR NAME 01", "CA"),
  ZipTerr("xx5", "81A01", "TERR NAME 01", "ME")
))

// Project both frames onto the unique key.
val newKeys = newData.map(z => Key(z.getAs("zip_code"), z.getAs("territory_code")))
val oldKeys = oldData.map(z => Key(z.getAs("zip_code"), z.getAs("territory_code")))

val keysToInsert = newKeys.except(oldKeys)    // keys only in the new data -> INSERT
val keysToUpdate = newKeys.intersect(oldKeys) // keys present in both      -> UPDATE
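A rough PySpark equivalent of the same idea, assuming df1 and df2 as created in the question's first snippet (with their original column names):
# Project both frames onto the unique key.
old_keys = df1.select("zip_code", "territory_code")   # existing data from the database
new_keys = df2.select("zip_code", "territory_code")   # new data from the client

keys_to_insert = new_keys.subtract(old_keys)   # keys only in the new data -> INSERT
keys_to_update = new_keys.intersect(old_keys)  # keys present in both      -> UPDATE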
Does this help?
Note: your variable names suggest you are working with Glue DynamicFrames. However, you are assigning them plain Spark dataframes created with the sqlContext.createDataFrame function.
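If Glue DynamicFrames are actually needed (for example to use a Glue writer), the plain Spark dataframes can be converted back and forth. A minimal sketch, assuming a GlueContext instance named glueContext is already available in the job:
from awsglue.dynamicframe import DynamicFrame

dyf = DynamicFrame.fromDF(df2, glueContext, "df2_dyf")  # Spark DataFrame -> Glue DynamicFrame
df_back = dyf.toDF()                                    # Glue DynamicFrame -> Spark DataFrame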
To make things clear, I am reproducing the solution here with code snippets:
df1= sqlContext.createDataFrame([("xxx1","81A01","TERR NAME 01","NJ"),("xxx2","81A01","TERR NAME 01","NJ"),("xxx3","81A01","TERR NAME 01","NJ"),("xxx4","81A01","TERR NAME 01","CA"),("xx5","81A01","TERR NAME 01","ME")], ["zip_code","territory_code","territory_name","state"])
df2= sqlContext.createDataFrame([("xxx1","81A01","TERR NAME 55","NY"),("xxx2","81A01","TERR NAME 55","NY"),("x103","81A01","TERR NAME 01","NJ")], ["zip_code_new","territory_code_new","territory_name_new","state_new"])
df1.show()
+--------+--------------+--------------+-----+
|zip_code|territory_code|territory_name|state|
+--------+--------------+--------------+-----+
| xxx1| 81A01| TERR NAME 01| NJ|
| xxx2| 81A01| TERR NAME 01| NJ|
| xxx3| 81A01| TERR NAME 01| NJ|
| xxx4| 81A01| TERR NAME 01| CA|
| xxx5| 81A01| TERR NAME 01| ME|
+--------+--------------+--------------+-----+
# Print out information about this data
df2.show()
+------------+------------------+------------------+---------+
|zip_code_new|territory_code_new|territory_name_new|state_new|
+------------+------------------+------------------+---------+
|        xxx1|             81A01|      TERR NAME 55|       NY|
|        xxx2|             81A01|      TERR NAME 55|       NY|
|        x103|             81A01|      TERR NAME 01|       NJ|
+------------+------------------+------------------+---------+
First, pick out the incoming records whose zip_code & territory_code already exist in df1; these are the records to update in mysql:
df3 = df1.join(df2, (df1.zip_code == df2.zip_code_new) & (df1.territory_code == df2.territory_code_new))

# keys present in both frames -> records to UPDATE (keep only the incoming _new columns)
df5 = df3.drop("zip_code", "territory_code", "territory_name", "state")
df5.show()
+------------+------------------+------------------+---------+
|zip_code_new|territory_code_new|territory_name_new|state_new|
+------------+------------------+------------------+---------+
|        xxx1|             81A01|      TERR NAME 55|       NY|
|        xxx2|             81A01|      TERR NAME 55|       NY|
+------------+------------------+------------------+---------+
The remaining records in df2 are the new ones, which can then be inserted into the mysql database (for example with an "append" write). If pure Python is needed to push the rows, we can use arr = df4.collect() followed by for r in arr: and issue one statement per row (see the sketch after the output below); alternatively, each record can be processed through a pandas iterator.
df4 = df2.subtract(df5)
df4.show()
+------------+------------------+------------------+---------+
|zip_code_new|territory_code_new|territory_name_new|state_new|
+------------+------------------+------------------+---------+
|        x103|             81A01|      TERR NAME 01|       NJ|
+------------+------------------+------------------+---------+
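A minimal sketch of that row-by-row approach, reusing the MySQLdb connection object db from the question (the table name "territory" is a placeholder):
cursor = db.cursor()
for r in df4.collect():  # each r is a pyspark.sql.Row
    cursor.execute(
        "INSERT INTO territory (zip_code, territory_code, territory_name, state) "
        "VALUES (%s, %s, %s, %s) "
        "ON DUPLICATE KEY UPDATE territory_name = VALUES(territory_name), state = VALUES(state)",
        (r["zip_code_new"], r["territory_code_new"], r["territory_name_new"], r["state_new"]),
    )
db.commit()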
Hope this helps someone in need. If there is a better way to iterate over the dataframes in the above scenario, please let me know. Thanks