Mongo Spark Java 连接器分组方式
Mongo Spark Java Connector Group by
我正在将来自客户端移动应用程序的事件存储在我的服务器上,事件存储是 mongodb
。
我有 mongo-spark 连接器,它获取这些事件的列表并应使用 rest api 显示它们。它应该在以后流式传输,但现在我正在尝试将它显示为单个调用。
到目前为止,我已经编写了如下所示的控制器:
@RestController
@RequestMapping("/analytics")
class EventController @Autowired constructor(val eventMongoServiceImpl: EventMongoServiceImpl,
val javaSparkContext: JavaSparkContext) {
@GetMapping("/event")
fun getEvent(): ResponseEntity<EventResponse> {
val customRdd: JavaMongoRDD<Document> = MongoSpark.load(javaSparkContext)
val toDF = customRdd.toDF()
}
}
请帮我过滤下面给出的这些结果以供休息api:
[
{
"key": "Event A",
"description": "Event A Description",
"count": 3
},
{
"key": "Event B",
"description": "Event B Description",
"count": 0
}
]
我有一个数据集如下:
/* 1 */
{
"_id" : ObjectId("5e61e38eb8425d3b1c7679ea"),
"name" : "Event A",
"description" : "Event A Description",
"date" : ISODate("2020-03-05T18:30:00.000Z"),
"_class" : "x"
}
/* 2 */
{
"_id" : ObjectId("5e61e416b8425d3b1c7679ec"),
"name" : "Event A",
"description" : "Event A Description",
"date" : ISODate("2020-03-05T18:30:00.000Z"),
"_class" : "x"
}
/* 3 */
{
"_id" : ObjectId("5e61e47fb8425d3b1c7679ee"),
"name" : "Event A",
"description" : "Event A Description",
"date" : ISODate("2020-03-05T18:30:00.000Z"),
"_class" : "x"
}
你应该能够在数据框上做这样的事情
val aggDf = toDf
.groupBy("name")
.agg(count("name"), max("description"))
现在在新数据框 aggDf
上,您可以 aggDf.toJson
并获得结果。如果列与输出不匹配,您可以使用 withColumnRenamed
调整它们
我正在将来自客户端移动应用程序的事件存储在我的服务器上,事件存储是 mongodb
。
我有 mongo-spark 连接器,它获取这些事件的列表并应使用 rest api 显示它们。它应该在以后流式传输,但现在我正在尝试将它显示为单个调用。
到目前为止,我已经编写了如下所示的控制器:
@RestController
@RequestMapping("/analytics")
class EventController @Autowired constructor(val eventMongoServiceImpl: EventMongoServiceImpl,
val javaSparkContext: JavaSparkContext) {
@GetMapping("/event")
fun getEvent(): ResponseEntity<EventResponse> {
val customRdd: JavaMongoRDD<Document> = MongoSpark.load(javaSparkContext)
val toDF = customRdd.toDF()
}
}
请帮我过滤下面给出的这些结果以供休息api:
[
{
"key": "Event A",
"description": "Event A Description",
"count": 3
},
{
"key": "Event B",
"description": "Event B Description",
"count": 0
}
]
我有一个数据集如下:
/* 1 */
{
"_id" : ObjectId("5e61e38eb8425d3b1c7679ea"),
"name" : "Event A",
"description" : "Event A Description",
"date" : ISODate("2020-03-05T18:30:00.000Z"),
"_class" : "x"
}
/* 2 */
{
"_id" : ObjectId("5e61e416b8425d3b1c7679ec"),
"name" : "Event A",
"description" : "Event A Description",
"date" : ISODate("2020-03-05T18:30:00.000Z"),
"_class" : "x"
}
/* 3 */
{
"_id" : ObjectId("5e61e47fb8425d3b1c7679ee"),
"name" : "Event A",
"description" : "Event A Description",
"date" : ISODate("2020-03-05T18:30:00.000Z"),
"_class" : "x"
}
你应该能够在数据框上做这样的事情
val aggDf = toDf
.groupBy("name")
.agg(count("name"), max("description"))
现在在新数据框 aggDf
上,您可以 aggDf.toJson
并获得结果。如果列与输出不匹配,您可以使用 withColumnRenamed