PySpark crashes when creating new column by applying function upon existing columns in large df
I created the following dataframe from a compressed 10 GB .gz file in CSV format:
+-------------------+----------+--------+----+
| tweet_id| date| time|lang|
+-------------------+----------+--------+----+
|1212360731695427584|2020-01-01|13:11:37| en|
|1212470713338286081|2020-01-01|20:28:39| ru|
|1212537749485449216|2020-01-02|00:55:01| ru|
+-------------------+----------+--------+----+
I am trying to create a new column by converting the date and time string columns into a unix timestamp:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType
from datetime import datetime, date
import time

spark = SparkSession.builder.appName("Tweets").getOrCreate()
df = spark.read.csv('tweets.gz', header=True, sep=r'\t')

def tounixtime(date_s, time_s):
    if None in (date_s, time_s):
        return -1
    ymd = tuple([int(x) for x in date_s.split("-")])
    t = [int(x) for x in time_s.split(":")]
    d = date(*ymd).timetuple()
    return int(time.mktime(d) + t[0] * 3600 + t[1] * 60 + t[2])

tounix = udf(tounixtime, IntegerType())
df.withColumn('timestamp', tounix(df.date, df.time)).show()
I get an exception saying that an error occurred at some stage of the job and that Python could not reconnect. I'm not sure what is wrong here.
A simple cast does the job without any UDF, since your data is already well formatted:
from pyspark.sql import functions as F
df_2 = df.withColumn(
    "tmst", F.concat_ws(" ", F.col("date"), F.col("time")).cast("timestamp")
)  # or F.concat(F.col("date"), F.lit(" "), F.col("time"))
df_2.show()
+-------------------+----------+--------+----+-------------------+
| tweet_id| date| time|lang| tmst|
+-------------------+----------+--------+----+-------------------+
|1212360731695427584|2020-01-01|13:11:37| en|2020-01-01 13:11:37|
|1212470713338286081|2020-01-01|20:28:39| ru|2020-01-01 20:28:39|
|1212537749485449216|2020-01-02|00:55:01| ru|2020-01-02 00:55:01|
+-------------------+----------+--------+----+-------------------+
df_2.printSchema()
root
 |-- tweet_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- tmst: timestamp (nullable = true)
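If you need an actual unix timestamp (seconds since the epoch) rather than a timestamp column, you can convert the new column with `F.unix_timestamp(F.col("tmst"))` or `F.col("tmst").cast("long")`. As a sanity check against the Spark result, here is a pure-Python sketch of what the original UDF tries to compute; note it uses `calendar.timegm` instead of `time.mktime`, because `time.mktime` interprets the struct_time in the machine's local timezone, so the UDF's result would vary between machines:

```python
import calendar

def tounixtime_utc(date_s, time_s):
    """Convert '2020-01-01' + '13:11:37' to a unix timestamp, treating
    the input as UTC. Returns -1 for missing values, mirroring the UDF."""
    if None in (date_s, time_s):
        return -1
    y, m, d = (int(x) for x in date_s.split("-"))
    hh, mm, ss = (int(x) for x in time_s.split(":"))
    # calendar.timegm is the inverse of time.gmtime: it interprets the
    # tuple as UTC, independent of the local timezone setting.
    return calendar.timegm((y, m, d, hh, mm, ss, 0, 0, 0))

print(tounixtime_utc("2020-01-01", "13:11:37"))  # 1577884297
```

This is only a reference implementation for checking values; inside Spark, the built-in `unix_timestamp` function stays in the JVM and avoids the Python serialization overhead of a UDF entirely.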