How to do value based crosstab in pyspark
Here is my data:
+--------+-----+-----+
| subs_no|month|count|
+--------+-----+-----+
|101 | 9| 288|
|100 | 10| 234|
|101 | 10| 44|
|100 | 9| 324|
+--------+-----+-----+
Here is my expected output:
+--------+-------+--------+
| subs_no|count_9|count_10|
+--------+-------+--------+
|100 | 324| 234|
|101 | 288| 44|
+--------+-------+--------+
Here is what I tried:
airport_pivot = airport_pivot.withColumn(
    'month',
    F.sum(airport_pivot.count).over(
        Window.partitionBy("subs_no").orderBy().rowsBetween(-sys.maxsize, 0)
    )
)
My error message:
/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/pyspark/sql/functions.py in _(col)
42 def _(col):
43 sc = SparkContext._active_spark_context
---> 44 jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
45 return Column(jc)
46 _.__name__ = name
/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1246
1247 def __call__(self, *args):
-> 1248 args_command, temp_args = self._build_args(*args)
1249
1250 command = proto.CALL_COMMAND_NAME +\
/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in _build_args(self, *args)
1216
1217 args_command = "".join(
-> 1218 [get_command_part(arg, self.pool) for arg in new_args])
1219
1220 return args_command, temp_args
/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in <listcomp>(.0)
1216
1217 args_command = "".join(
-> 1218 [get_command_part(arg, self.pool) for arg in new_args])
1219
1220 return args_command, temp_args
/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool)
296 command_part += ";" + interface
297 else:
--> 298 command_part = REFERENCE_TYPE + parameter._get_object_id()
299
300 command_part += "\n"
AttributeError: 'function' object has no attribute '_get_object_id'
The AttributeError comes from airport_pivot.count: on a DataFrame, count is the count() method, not the "count" column, so F.sum() is handed a plain Python function (referencing the column as F.col("count") avoids that). For the output you want, though, a windowed sum is not the right approach; I think you should pivot on month and rename the resulting columns:
df.groupBy("subs_no").pivot("month").sum("count").withColumnRenamed(
"9", "count_9").withColumnRenamed("10", "count_10").show()
+-------+-------+--------+
|subs_no|count_9|count_10|
+-------+-------+--------+
| 100| 324| 234|
| 101| 288| 44|
+-------+-------+--------+
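For completeness, here is a self-contained sketch (assuming a local SparkSession; the dynamic rename loop is my addition, not part of the original answer) that reproduces this result and renames every pivoted month column to count_<month>, so it also works when the set of months is not fixed:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Sample data from the question.
df = spark.createDataFrame(
    [(101, 9, 288), (100, 10, 234), (101, 10, 44), (100, 9, 324)],
    ["subs_no", "month", "count"],
)

# Pivot the month values into columns, summing count per subscriber.
pivoted = df.groupBy("subs_no").pivot("month").sum("count")

# Rename every pivoted column (everything except the grouping key) to count_<month>.
result = pivoted.select(
    "subs_no",
    *[F.col(c).alias("count_" + c) for c in pivoted.columns if c != "subs_no"],
)

result.show()

If the month values are known up front, passing them explicitly, e.g. pivot("month", [9, 10]), saves Spark an extra pass over the data to discover the distinct values.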