Unable to apply log function to a PySpark DataFrame
So I have a huge dataset (around 1 TB+) on which I have to perform many operations, and I thought of using PySpark to speed up the processing. Here are my imports:
import numpy as np
import pandas as pd
try:
    import pyspark
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession, SQLContext
except ImportError as e:
    raise ImportError('PySpark is not Configured')
print(f"PySpark Version : {pyspark.__version__}")
# Creating a Spark-Context
sc = SparkContext.getOrCreate(SparkConf().setMaster('local[*]'))
# Spark Builder
spark = SparkSession.builder \
    .appName('MBLSRProcessor') \
    .config('spark.executor.memory', '10GB') \
    .getOrCreate()
# SQL Context - for SQL Query Executions
sqlContext = SQLContext(sc)
>> PySpark Version : 2.4.7
Now, I want to apply the log10 function on two columns - for demonstration, consider the following data:
data = spark.createDataFrame(pd.DataFrame({
"A" : [1, 2, 3, 4, 5],
"B" : [4, 3, 6, 1, 8]
}))
data.head(5)
>> [Row(A=1, B=4), Row(A=2, B=3), Row(A=3, B=6), Row(A=4, B=1), Row(A=5, B=8)]
This is what I need: log10(A + B), i.e. log10(6 + 4) = 1. For that, I created a function like this:
def add(a, b):
    # this is for demonstration
    return np.sum([a, b])
data = data.withColumn("ADD", add(data.A, data.B))
data.head(5)
>> [Row(A=1, B=4, ADD=5), Row(A=2, B=3, ADD=5), Row(A=3, B=6, ADD=9), Row(A=4, B=1, ADD=5), Row(A=5, B=8, ADD=13)]
However, I am not able to do the same for np.log10:
def np_log(a, b):
    # actual function
    return np.log10(np.sum([a, b]))
data = data.withColumn("LOG", np_log(data.A, data.B))
data.head(5)
TypeError Traceback (most recent call last)
<ipython-input-13-a5726b6c7dc2> in <module>
----> 1 data = data.withColumn("LOG", np_log(data.A, data.B))
2 data.head(5)
<ipython-input-12-0e020707faae> in np_log(a, b)
1 def np_log(a, b):
----> 2 return np.log10(np.sum([a, b]))
TypeError: loop of ufunc does not support argument 0 of type Column which has no callable log10 method
The best way is to use native Spark functions:
import pyspark.sql.functions as F
import pandas as pd
data = spark.createDataFrame(pd.DataFrame({
"A" : [1, 2, 3, 4, 5],
"B" : [4, 3, 6, 1, 8]
}))
data = data.withColumn("LOG", F.log10(F.col('A') + F.col('B')))
But you can also use a UDF if you want:
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
import numpy as np
import pandas as pd
data = spark.createDataFrame(pd.DataFrame({
    "A" : [1, 2, 3, 4, 5],
    "B" : [4, 3, 6, 1, 8]
}))
def udf_np_log(a, b):
    # actual function
    return float(np.log10(np.sum([a, b])))
np_log = F.udf(udf_np_log, FloatType())
data = data.withColumn("LOG", np_log(data.A, data.B))
+---+---+---------+
| A| B| LOG|
+---+---+---------+
| 1| 4| 0.69897|
| 2| 3| 0.69897|
| 3| 6|0.9542425|
| 4| 1| 0.69897|
| 5| 8|1.1139433|
+---+---+---------+
Interestingly, it works with np.sum without a UDF, because I guess np.sum just calls the + operator, which is valid for Spark DataFrame columns.
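To illustrate that point, here is a minimal sketch (assuming the spark session and the data DataFrame from above): a Column supports +, so np.sum's reduction over the list produces a lazy Column expression, whereas np.log10 has no Column counterpart to dispatch to, hence the TypeError in the question.

from pyspark.sql import Column
expr = data.A + data.B            # Column.__add__ builds a lazy expression, no computation happens yet
print(isinstance(expr, Column))   # True
# np.sum([data.A, data.B]) ends up performing the same + reduction, which is why the
# ADD column above works, while np.log10 would have to call a log10 method on the
# Column object itself - which does not exist, hence the error.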