将 Spark-Streaming 输出的字段名称映射到 Elastic Search
Mapping field names of the output from Spark-Streaming to Elastic Search
我正在使用以下代码将 Spark-Streaming
的输出存储到 ElasticSearch
。我想将 spark-streaming 的输出映射到正确的名称 i.e (Key, OsName, PlatFormName, Mobile, BrowserName, Count)
。但正如您目前看到的那样,它在 ES 中被映射为 _1 或 _2 等。
此外,我想在索引 ES 中的数据之前放置一些过滤器,即 (if PlatFormName = "ubuntu" then index the data)
。那么,我该怎么做呢?
val realTimeAgg = lines.map{ x => ((x.key, x.os, x.platform, x.mobile, x.browser), 1)}.reduceByKey(_+_)
val pageCounts = realTimeAgg.map
pageCounts.foreachRDD{ x =>
if (x.toLocalIterator.nonEmpty) {
EsSpark.saveToEs(x, "spark/ElasticSearch")
}
}
ssc.start()
ssc.awaitTermination()
ElasticSearch 中的输出:
{
"_index": "spark",
"_type": "ElasticSearch",
"_id": "AVTH0JPgzgtrAOUg77qq",
"_score": 1,
"_source": {
"_1": {
"_3": "Amiga",
"_2": "AmigaOS 1.3",
"_6": "SeaMonkey",
"_1": "Usedcar",
"_4": 0,
"_5": 0
},
"_2": 1013
}
}
elasticsearch文档的key是_1、_2等,因为你存储的是(Tuple6, Long)数据类型的PairRDD。
要保留密钥,您应该使用大小写 class 作为密钥。
val realTimeAgg = lines.map{ x => (x, 1)}.reduceByKey(_+_)
我假设对象 x 的 class 是一个案例 class 并且您想使用该 class 的所有字段来进行缩减(即检查相等性2 个案例 class 个实例)。如果 class 的所有字段都不是要用于相等的 class 的自然键,那么您有两个选择 -
- 根据您的情况覆盖 equals 和 hashCode class
- 创建另一个案例 class 只有关键字段(您在元组中使用的字段 - (x.key, x.os, x.platform, x.mobile, x.browser)) 并映射到该案例 class 而不是第一行中的元组 lines.map { x => ...}.
您可以在写入 ElasticSearch 之前添加您想要的过滤器。
pageCounts.foreachRDD { x =>
if (x.toLocalIterator.nonEmpty) {
val y = x.filter(z => z._1.platform == "ubuntu")
EsSpark.saveToEs(y, "spark/ElasticSearch")
}
}
PS:如果您正在测试 RDD 对,将 (case class, Long) case class 作为键,就像我建议的那样 lines.map(x = > (x, 1)).reduceByKey(_ + _).有一个与 Spark Shell 特别相关的错误,即 case classes 不能作为键 classes 正确地用于 reduce 操作 - jira issue
我正在使用以下代码将 Spark-Streaming
的输出存储到 ElasticSearch
。我想将 spark-streaming 的输出映射到正确的名称 i.e (Key, OsName, PlatFormName, Mobile, BrowserName, Count)
。但正如您目前看到的那样,它在 ES 中被映射为 _1 或 _2 等。
此外,我想在索引 ES 中的数据之前放置一些过滤器,即 (if PlatFormName = "ubuntu" then index the data)
。那么,我该怎么做呢?
val realTimeAgg = lines.map{ x => ((x.key, x.os, x.platform, x.mobile, x.browser), 1)}.reduceByKey(_+_)
val pageCounts = realTimeAgg.map
pageCounts.foreachRDD{ x =>
if (x.toLocalIterator.nonEmpty) {
EsSpark.saveToEs(x, "spark/ElasticSearch")
}
}
ssc.start()
ssc.awaitTermination()
ElasticSearch 中的输出:
{
"_index": "spark",
"_type": "ElasticSearch",
"_id": "AVTH0JPgzgtrAOUg77qq",
"_score": 1,
"_source": {
"_1": {
"_3": "Amiga",
"_2": "AmigaOS 1.3",
"_6": "SeaMonkey",
"_1": "Usedcar",
"_4": 0,
"_5": 0
},
"_2": 1013
}
}
elasticsearch文档的key是_1、_2等,因为你存储的是(Tuple6, Long)数据类型的PairRDD。
要保留密钥,您应该使用大小写 class 作为密钥。
val realTimeAgg = lines.map{ x => (x, 1)}.reduceByKey(_+_)
我假设对象 x 的 class 是一个案例 class 并且您想使用该 class 的所有字段来进行缩减(即检查相等性2 个案例 class 个实例)。如果 class 的所有字段都不是要用于相等的 class 的自然键,那么您有两个选择 -
- 根据您的情况覆盖 equals 和 hashCode class
- 创建另一个案例 class 只有关键字段(您在元组中使用的字段 - (x.key, x.os, x.platform, x.mobile, x.browser)) 并映射到该案例 class 而不是第一行中的元组 lines.map { x => ...}.
您可以在写入 ElasticSearch 之前添加您想要的过滤器。
pageCounts.foreachRDD { x =>
if (x.toLocalIterator.nonEmpty) {
val y = x.filter(z => z._1.platform == "ubuntu")
EsSpark.saveToEs(y, "spark/ElasticSearch")
}
}
PS:如果您正在测试 RDD 对,将 (case class, Long) case class 作为键,就像我建议的那样 lines.map(x = > (x, 1)).reduceByKey(_ + _).有一个与 Spark Shell 特别相关的错误,即 case classes 不能作为键 classes 正确地用于 reduce 操作 - jira issue