How to do a value-based crosstab in PySpark

This is my data:

+--------+-----+-----+
| subs_no|month|count|
+--------+-----+-----+
|101     |    9|  288|
|100     |   10|  234|
|101     |   10|   44|
|100     |    9|  324|
+--------+-----+-----+

This is my expected output:

+--------+-------+--------+
| subs_no|count_9|count_10|
+--------+-------+--------+
|100     |    324|     234|
|101     |    288|      44|
+--------+-------+--------+

This is what I tried:

airport_pivot = airport_pivot.withColumn('month', F.sum(airport_pivot.count) .over(Window.partitionBy("subs_no").orderBy().rowsBetween(-sys.maxsize, 0)))

This is the error message I get:

/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/pyspark/sql/functions.py in _(col)
     42     def _(col):
     43         sc = SparkContext._active_spark_context
---> 44         jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
     45         return Column(jc)
     46     _.__name__ = name

/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1246 
   1247     def __call__(self, *args):
-> 1248         args_command, temp_args = self._build_args(*args)
   1249 
   1250         command = proto.CALL_COMMAND_NAME +\

/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in _build_args(self, *args)
   1216 
   1217         args_command = "".join(
-> 1218             [get_command_part(arg, self.pool) for arg in new_args])
   1219 
   1220         return args_command, temp_args

/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in <listcomp>(.0)
   1216 
   1217         args_command = "".join(
-> 1218             [get_command_part(arg, self.pool) for arg in new_args])
   1219 
   1220         return args_command, temp_args

/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool)
    296             command_part += ";" + interface
    297     else:
--> 298         command_part = REFERENCE_TYPE + parameter._get_object_id()
    299 
    300     command_part += "\n"

AttributeError: 'function' object has no attribute '_get_object_id'

I think you should pivot on `month` and then rename the resulting columns:

    df.groupBy("subs_no").pivot("month").sum("count") \
        .withColumnRenamed("9", "count_9") \
        .withColumnRenamed("10", "count_10") \
        .show()


+-------+-------+--------+
|subs_no|count_9|count_10|
+-------+-------+--------+
|    100|    324|     234|
|    101|    288|      44|
+-------+-------+--------+