Pyspark 根据条件添加行
Pyspark add row based on a condition
我有以下数据帧结构
A
B
C
1
open
01.01.22 10:05:04
1
In-process
01.01.22 10:07:02
我需要在打开值之前插入一行row.So,我需要检查它的状态是否打开然后在它之前添加一个新行与其他除了要减去 1 小时的 C 列外,列的值相同。如何使用 Pyspark 实现这一目标?
与其“插入一行”——这是一个需要解决的non-trivial问题——,不如将其视为“联合数据集”
假设这是您的数据集
df = spark.createDataFrame([
(1, 'open', '01.01.22 10:05:04'),
(1, 'In process', '01.01.22 10:07:02'),
], ['a', 'b', 'c'])
+---+----------+-----------------+
| a| b| c|
+---+----------+-----------------+
| 1| open|01.01.22 10:05:04|
| 1|In process|01.01.22 10:07:02|
+---+----------+-----------------+
根据你的规则,我们可以构建另一个这样的数据集
from pyspark.sql import functions as F
df_new = (df
.where(F.col('b') == 'open')
.withColumn('b', F.lit('Before open'))
.withColumn('c', F.to_timestamp('c', 'dd.MM.yy HH:mm:ss')) # convert text to date with custom date format
.withColumn('c', F.col('c') - F.expr('interval 1 hour')) # subtract 1 hour
.withColumn('c', F.from_unixtime(F.unix_timestamp('c'), 'dd.MM.yy HH:mm:ss')) # revert to custom date format
)
+---+-----------+-----------------+
| a| b| c|
+---+-----------+-----------------+
| 1|Before open|01.01.22 09:05:04|
+---+-----------+-----------------+
现在你只需要将它们合并在一起,如果你想“看到”它就排序
(df
.union(df_new)
.orderBy('a', 'c')
.show()
)
+---+-----------+-----------------+
| a| b| c|
+---+-----------+-----------------+
| 1|Before open|01.01.22 09:05:04|
| 1| open|01.01.22 10:05:04|
| 1| In process|01.01.22 10:07:02|
+---+-----------+-----------------+
我有以下数据帧结构
A | B | C |
---|---|---|
1 | open | 01.01.22 10:05:04 |
1 | In-process | 01.01.22 10:07:02 |
我需要在打开值之前插入一行row.So,我需要检查它的状态是否打开然后在它之前添加一个新行与其他除了要减去 1 小时的 C 列外,列的值相同。如何使用 Pyspark 实现这一目标?
与其“插入一行”——这是一个需要解决的non-trivial问题——,不如将其视为“联合数据集”
假设这是您的数据集
df = spark.createDataFrame([
(1, 'open', '01.01.22 10:05:04'),
(1, 'In process', '01.01.22 10:07:02'),
], ['a', 'b', 'c'])
+---+----------+-----------------+
| a| b| c|
+---+----------+-----------------+
| 1| open|01.01.22 10:05:04|
| 1|In process|01.01.22 10:07:02|
+---+----------+-----------------+
根据你的规则,我们可以构建另一个这样的数据集
from pyspark.sql import functions as F
df_new = (df
.where(F.col('b') == 'open')
.withColumn('b', F.lit('Before open'))
.withColumn('c', F.to_timestamp('c', 'dd.MM.yy HH:mm:ss')) # convert text to date with custom date format
.withColumn('c', F.col('c') - F.expr('interval 1 hour')) # subtract 1 hour
.withColumn('c', F.from_unixtime(F.unix_timestamp('c'), 'dd.MM.yy HH:mm:ss')) # revert to custom date format
)
+---+-----------+-----------------+
| a| b| c|
+---+-----------+-----------------+
| 1|Before open|01.01.22 09:05:04|
+---+-----------+-----------------+
现在你只需要将它们合并在一起,如果你想“看到”它就排序
(df
.union(df_new)
.orderBy('a', 'c')
.show()
)
+---+-----------+-----------------+
| a| b| c|
+---+-----------+-----------------+
| 1|Before open|01.01.22 09:05:04|
| 1| open|01.01.22 10:05:04|
| 1| In process|01.01.22 10:07:02|
+---+-----------+-----------------+