在 spark 序列化中使用 maxmind geoip
using maxmind geoip in spark serialized
我正在尝试将 MaxMind GeoIP API 用于已找到 https://github.com/snowplow/scala-maxmind-iplookups 的 scala-spark。我使用标准加载文件:
val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)
我加载了一个包含时间和 IP 地址的基本 csv 文件:
val sweek1 = week1.map{line=> IP(parse(line))}.collect{
case Some(ip) => {
val ipadress = ipdetect(ip.ip)
(ip.time, ipadress)
}
}
函数 ipdetect 的基本定义如下:
def ipdetect(a:String)={
ipLookups.performLookups(a)._1 match{
case Some(value) => value.toString
case _ => "Unknown"
}
}
当我运行这个程序时,提示"Task not serializable"。所以我看了一些帖子,似乎有一些解决方法。
1,a wrapper
2、使用SparkContext.addFile(跨集群分发文件)
但我无法弄清楚其中任何一个是如何工作的,我尝试了包装器,但我不知道如何以及在何处调用它。
我尝试了 addFile,但它 returns 是一个单元而不是字符串,我假设您需要以某种方式通过管道传输二进制文件。所以我不确定现在该做什么。非常感谢任何帮助
所以我已经能够通过使用 mapPartitions 对其进行某种程度的序列化并遍历每个本地分区,但我想知道是否有更有效的方法来执行此操作,因为我的数据集在数百万范围内
假设您的 csv 文件每行包含一个 IP 地址,例如,您想要将每个 IP 地址映射到一个城市。
import com.snowplowanalytics.maxmind.iplookups.IpLookups
val geoippath = "path/to/geoip.dat"
val sc = new SparkContext(new SparkConf().setAppName("IP Converter"))
sc.addFile(geoippath)
def parseIP(ip:String, ipLookups: IpLookups): String = {
val lookupResult = ipLookups.performLookups(ip)
val city = lookupResult._1.map(_.city).getOrElse(None).getOrElse("")
}
val logs = sc.textFile("path/to/your.csv")
.mapWith(_ => IpLookups(geoFile = Some(SparkFiles.get("geoip.dat"))))(parseIP)
其他ip转换请参考Scala MaxMind IP Lookups。
此外, mapWith
似乎已被弃用。请改用 mapPartitionsWithIndex
。
我正在尝试将 MaxMind GeoIP API 用于已找到 https://github.com/snowplow/scala-maxmind-iplookups 的 scala-spark。我使用标准加载文件:
val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)
我加载了一个包含时间和 IP 地址的基本 csv 文件:
val sweek1 = week1.map{line=> IP(parse(line))}.collect{
case Some(ip) => {
val ipadress = ipdetect(ip.ip)
(ip.time, ipadress)
}
}
函数 ipdetect 的基本定义如下:
def ipdetect(a:String)={
ipLookups.performLookups(a)._1 match{
case Some(value) => value.toString
case _ => "Unknown"
}
}
当我运行这个程序时,提示"Task not serializable"。所以我看了一些帖子,似乎有一些解决方法。
1,a wrapper 2、使用SparkContext.addFile(跨集群分发文件)
但我无法弄清楚其中任何一个是如何工作的,我尝试了包装器,但我不知道如何以及在何处调用它。 我尝试了 addFile,但它 returns 是一个单元而不是字符串,我假设您需要以某种方式通过管道传输二进制文件。所以我不确定现在该做什么。非常感谢任何帮助
所以我已经能够通过使用 mapPartitions 对其进行某种程度的序列化并遍历每个本地分区,但我想知道是否有更有效的方法来执行此操作,因为我的数据集在数百万范围内
假设您的 csv 文件每行包含一个 IP 地址,例如,您想要将每个 IP 地址映射到一个城市。
import com.snowplowanalytics.maxmind.iplookups.IpLookups
val geoippath = "path/to/geoip.dat"
val sc = new SparkContext(new SparkConf().setAppName("IP Converter"))
sc.addFile(geoippath)
def parseIP(ip:String, ipLookups: IpLookups): String = {
val lookupResult = ipLookups.performLookups(ip)
val city = lookupResult._1.map(_.city).getOrElse(None).getOrElse("")
}
val logs = sc.textFile("path/to/your.csv")
.mapWith(_ => IpLookups(geoFile = Some(SparkFiles.get("geoip.dat"))))(parseIP)
其他ip转换请参考Scala MaxMind IP Lookups。
此外, mapWith
似乎已被弃用。请改用 mapPartitionsWithIndex
。