Pyspark with error self._sock.recv_into(b) socket.timeout: timed out
The goal is to classify rows using a UDF. I am running PySpark on Windows.
Simple functions and operations such as filter seem to work fine.
Any pointers on how to resolve the timeout/socket failure would be helpful (see the error below).
There are no null values in the data.
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def BreakDown(arr_value):
    start_year = arr_value[0]
    start_month = arr_value[1]
    end_year = arr_value[2]
    end_month = arr_value[3]
    curr_year = arr_value[4]
    curr_month = arr_value[5]
    if (curr_year == start_year) & (curr_month >= start_month): return 1
    elif (curr_year == end_year) & (curr_month <= end_month): return 1
    elif (curr_year > start_year) & (curr_year < end_year): return 1
    else: return 0

udfBreakDown = udf(BreakDown, IntegerType())

temp = temp.withColumn(
    'include',
    udfBreakDown(F.struct('start_year', 'start_month', 'end_year',
                          'end_month', 'curr_year', 'curr_month'))
)
PythonException: An exception was thrown from the Python worker.
Please see the stack trace below.
Traceback (most recent call last):
  File "E:\spark\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 585, in main
  File "E:\spark\spark-3.0.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 593, in read_int
    length = stream.read(4)
  File "C:\ProgramData\Anaconda3\lib\socket.py", line 669, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out
Always avoid UDFs when you can use Spark built-in functions instead. You can rewrite your logic with the when function, like this:
from pyspark.sql import functions as F

def get_include_col():
    c = F.when((F.col("curr_year") == F.col("start_year")) & (F.col("curr_month") >= F.col("start_month")), F.lit(1)) \
         .when((F.col("curr_year") == F.col("end_year")) & (F.col("curr_month") <= F.col("end_month")), F.lit(1)) \
         .when((F.col("curr_year") > F.col("start_year")) & (F.col("curr_year") < F.col("end_year")), F.lit(1)) \
         .otherwise(F.lit(0))
    return c

temp = temp.withColumn('include', get_include_col())
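As a quick sanity check, here is a minimal sketch, assuming a local SparkSession named spark and a few hypothetical sample rows; it applies get_include_col() to a tiny DataFrame so you can verify each branch of the logic:

# Minimal sketch: `spark` and the sample rows below are hypothetical, not from the original post.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

sample = spark.createDataFrame(
    [
        (2020, 3, 2021, 6, 2020, 5),   # first branch: same year as start, month >= start_month -> 1
        (2020, 3, 2021, 6, 2021, 4),   # second branch: same year as end, month <= end_month -> 1
        (2018, 1, 2021, 6, 2019, 12),  # third branch: year strictly between start and end -> 1
        (2020, 3, 2021, 6, 2022, 1),   # no branch matches -> 0
    ],
    ["start_year", "start_month", "end_year", "end_month", "curr_year", "curr_month"],
)

sample.withColumn("include", get_include_col()).show()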
You can also use functools.reduce to generate the when expressions dynamically, without having to write them all out. For example:
import functools
from pyspark.sql import functions as F

cases = [
    ("curr_year = start_year and curr_month >= start_month", 1),
    ("curr_year = end_year and curr_month <= end_month", 1),
    ("curr_year > start_year and curr_year < end_year", 1)
]

# Seeding reduce with the functions module means the first iteration calls
# F.when(...), and every later iteration chains Column.when(...) on the result.
include_col = functools.reduce(
    lambda acc, x: acc.when(F.expr(x[0]), F.lit(x[1])),
    cases,
    F
).otherwise(F.lit(0))

temp = temp.withColumn('include', include_col)
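If you want to confirm that the generated expression matches the explicit when chain, a minimal sketch (reusing the hypothetical sample DataFrame and get_include_col from the earlier example) compares the two columns row by row:

# Sketch only: `sample` and `get_include_col` come from the earlier hypothetical example.
mismatches = (
    sample
    .withColumn("include_chain", get_include_col())
    .withColumn("include_reduce", include_col)
    .filter(F.col("include_chain") != F.col("include_reduce"))
)
print(mismatches.count())  # expected to print 0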