Mongo Spark Java 连接器分组方式

Mongo Spark Java Connector Group by

我正在将来自客户端移动应用程序的事件存储在我的服务器上,事件存储是 mongodb。 我有 mongo-spark 连接器,它获取这些事件的列表并应使用 rest api 显示它们。它应该在以后流式传输,但现在我正在尝试将它显示为单个调用。

到目前为止,我已经编写了如下所示的控制器:

@RestController
@RequestMapping("/analytics")
class EventController @Autowired constructor(val eventMongoServiceImpl: EventMongoServiceImpl,
                                             val javaSparkContext: JavaSparkContext) {

    @GetMapping("/event")
    fun getEvent(): ResponseEntity<EventResponse> {
        val customRdd: JavaMongoRDD<Document> = MongoSpark.load(javaSparkContext)
        val toDF = customRdd.toDF()
    }
}

请帮我过滤下面给出的这些结果以供休息api:

 [
      {
        "key": "Event A",
        "description": "Event A Description",
        "count": 3
      },
      {
        "key": "Event B",
        "description": "Event B Description",
        "count": 0
      }
    ]

我有一个数据集如下:

/* 1 */
{
    "_id" : ObjectId("5e61e38eb8425d3b1c7679ea"),
    "name" : "Event A",
    "description" : "Event A Description",
    "date" : ISODate("2020-03-05T18:30:00.000Z"),
    "_class" : "x"
}

/* 2 */
{
    "_id" : ObjectId("5e61e416b8425d3b1c7679ec"),
    "name" : "Event A",
    "description" : "Event A Description",
    "date" : ISODate("2020-03-05T18:30:00.000Z"),
    "_class" : "x"
}

/* 3 */
{
    "_id" : ObjectId("5e61e47fb8425d3b1c7679ee"),
    "name" : "Event A",
    "description" : "Event A Description",
    "date" : ISODate("2020-03-05T18:30:00.000Z"),
    "_class" : "x"
}

你应该能够在数据框上做这样的事情

val aggDf = toDf
 .groupBy("name")
 .agg(count("name"), max("description"))

现在在新数据框 aggDf 上,您可以 aggDf.toJson 并获得结果。如果列与输出不匹配,您可以使用 withColumnRenamed

调整它们