数据框中的pyspark进程表达式

pyspark process expression in dataframe

我有一个员工的薪水,想知道薪水下降了多少桶?

salary = 3000

规则table

rule_id,condtion,bucket
1,salary>1000,'A'
2,salary>2000,'B'
3,salary>3000,'C'
4,salary>4000,'D'
5,salary>5000,'E'

从以上两个 tables 我想产生以下结果

rule_id,condtion,bucket,result
1,salary>1000,'A',True
2,salary>2000,'B'True
3,salary>3000,'C',True
4,salary>4000,'D',False
5,salary>5000,'E',False

我尝试使用 python 和 spark


salary = 3000

rules_df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('rule.csv')


validate = rules_df.withColumn('result',eval(rules_df.condtion))

对于上面的代码,我遇到了以下错误

Traceback (most recent call last):

  File "C:\Anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 3296, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)

  File "<ipython-input-129-a813eeada424>", line 1, in <module>
    rules_df.withColumn('valid',eval(str(rules_df.condtion)))

  File "<string>", line 1
    Column<b'condtion'>
                      ^
SyntaxError: unexpected EOF while parsing

这里需要用udf来做eval: 这里'数据框:

+-------+-----------+------+
|rule_id|   condtion|bucket|
+-------+-----------+------+
|      1|salary>1000|   'A'|
|      2|salary>2000|   'B'|
|      3|salary>3000|   'C'|
|      4|salary>4000|   'D'|
|      5|salary>5000|   'E'|
+-------+-----------+------+

现在我们定义一个udf来显式解析列condtion:

salary = 3000
my_udf = F.udf(lambda c: eval(c, {'salary':salary}), BooleanType())
df = df.withColumn('result', my_udf('condtion'))
df.show()

+-------+-----------+------+------+
|rule_id|   condtion|bucket|result|
+-------+-----------+------+------+
|      1|salary>1000|   'A'|  true|
|      2|salary>2000|   'B'|  true|
|      3|salary>3000|   'C'| false|
|      4|salary>4000|   'D'| false|
|      5|salary>5000|   'E'| false|
+-------+-----------+------+------+

我不确定你的 rule table 有多大,但如果不是太大并且 如果您只是想将规则 table 应用于单个数据点, 这可能不是实现它的最有效方法。 使用常规 pythonpandas 函数会更容易。

在大多数情况下,更有可能将规则应用于很多很多数据。