pyspark filter dataframe based on broadcast variable
I have a pyspark 2.0 dataframe that I'm trying to filter based on a (relatively) short list, maybe 50-100 items long.
filterList = ['A','B','C']
I want to broadcast that list to each of my nodes and use it to drop records where neither of two columns contains a value from the list.
This works:
filter_df = df.where((df['Foo'].isin(filterList)) | (df['Bar'].isin(filterList)))
But when I broadcast the list, I get an error:
filterListB = sc.broadcast(filterList)
filter_df = df.where((df['Foo'].isin(filterListB)) | (df['Bar'].isin(filterListB)))
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-99-1b972cf29148> in <module>()
----> 1 filter_df= df.where((df['Foo'].isin(filterListB)) | (df['Bar'].isin(filterListB)))
/usr/local/spark/python/pyspark/sql/column.pyc in isin(self, *cols)
284 if len(cols) == 1 and isinstance(cols[0], (list, set)):
285 cols = cols[0]
--> 286 cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
287 sc = SparkContext._active_spark_context
288 jc = getattr(self._jc, "isin")(_to_seq(sc, cols))
/usr/local/spark/python/pyspark/sql/column.pyc in _create_column_from_literal(literal)
33 def _create_column_from_literal(literal):
34 sc = SparkContext._active_spark_context
---> 35 return sc._jvm.functions.lit(literal)
36
37
/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in __call__(self, *args)
1122
1123 def __call__(self, *args):
-> 1124 args_command, temp_args = self._build_args(*args)
1125
1126 command = proto.CALL_COMMAND_NAME +\
/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in _build_args(self, *args)
1092
1093 args_command = "".join(
-> 1094 [get_command_part(arg, self.pool) for arg in new_args])
1095
1096 return args_command, temp_args
/usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool)
287 command_part += ";" + interface
288 else:
--> 289 command_part = REFERENCE_TYPE + parameter._get_object_id()
290
291 command_part += "\n"
AttributeError: 'Broadcast' object has no attribute '_get_object_id'
Any thoughts on how I should filter a pyspark 2.0 dataframe based on a broadcast list?
You can't pass a Broadcast variable directly to DataFrame functions; instead, access its contents through the Broadcast variable's `value` attribute.
So, modify your code as follows:
filterListB = sc.broadcast(filterList)
filter_df = df.where((df['Foo'].isin(filterListB.value)) | (df['Bar'].isin(filterListB.value)))
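The predicate itself is just "keep a row when either column's value appears in the list." As a sanity check of those semantics outside Spark, here is a minimal plain-Python sketch (the row data is made up for illustration):

```python
# Plain-Python equivalent of the isin-OR filter: keep a row when
# either 'Foo' or 'Bar' has a value that appears in filterList.
filterList = ['A', 'B', 'C']
rows = [
    {'Foo': 'A', 'Bar': 'X'},  # kept: Foo is in the list
    {'Foo': 'X', 'Bar': 'C'},  # kept: Bar is in the list
    {'Foo': 'X', 'Bar': 'Y'},  # dropped: neither column matches
]

keep = set(filterList)  # set lookup is O(1) per row
filtered = [r for r in rows if r['Foo'] in keep or r['Bar'] in keep]
print(filtered)  # the first two rows survive
```

Note that for a list this short (50-100 items), broadcasting is not strictly necessary: passing the plain list to `isin` embeds the values as literals in the query plan, which is why the un-broadcast version works.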
Reference: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-broadcast.html