Spark:Task 上的 Redis 不可序列化
Redis on Spark:Task not serializable
我们使用Redis on Spark来缓存我们的键值pairs.This是代码:
import com.redis.RedisClient
val r = new RedisClient("192.168.1.101", 6379)
val perhit = perhitFile.map(x => {
val arr = x.split(" ")
val readId = arr(0).toInt
val refId = arr(1).toInt
val start = arr(2).toInt
val end = arr(3).toInt
val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)
val readStr = r.hmget("readStr", readId).get(readId)
val realend = if(end > refStr.length - 1) refStr.length - 1 else end
val refOneStr = refStr.substring(start, realend)
(readStr, refOneStr, refId, start, realend, readId)
})
但是编译器给我这样的反馈:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at com.ynu.App$.main(App.scala:511)
at com.ynu.App.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.redis.RedisClient
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 12 more
有人能告诉我如何序列化从 Redis.Thanks 中获取的数据吗?
您正在尝试序列化 客户端。您有一个 RedisClient
、r
,您正尝试在 map
内部使用,它将 运行 跨不同的集群节点。在执行集群任务之前分别从 redis 中获取您想要的数据,或者为每个集群任务单独创建客户端 inside 您的 map
块(可能通过使用 mapPartitions
而不是 map
,因为为每一行创建一个新的 redis 客户端可能不是一个好主意)。
在 Spark 中,RDD
上的函数(如此处的 map
)被序列化并发送给执行程序进行处理。这意味着这些操作中包含的所有元素都应该是可序列化的。
此处的 Redis 连接不可序列化,因为它打开到绑定到创建它的机器的目标数据库的 TCP 连接。
解决方案是在本地执行上下文中的执行程序上创建这些连接。有几种方法可以做到这一点。想到的两个是:
rdd.mapPartitions
:让您一次处理整个分区,从而分摊创建连接的成本)
- 单例连接管理器:每个执行程序创建一次连接
mapPartitions
更简单,因为它只需要对程序结构做一个小改动:
val perhit = perhitFile.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
val res = partition.map{ x =>
...
val refStr = r.hmget(...) // use r to process the local data
}
r.close // take care of resources
res
}
单例连接管理器可以用一个对象建模,该对象持有对连接的惰性引用(注意:可变引用也可以)。
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
此对象随后可用于为每个工作 JVM 实例化 1 个连接,并在操作闭包中用作 Serializable
对象。
val perhit = perhitFile.map{x =>
val param = f(x)
val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}
}
使用单例对象的优点是开销较少,因为 JVM 仅创建一次连接(而不是每个 RDD 分区 1 个)
也有一些缺点:
- 清理连接很棘手(关闭 hook/timers)
- 必须确保共享资源的线程安全
(*) 代码用于说明目的。未编译或测试。
我们使用Redis on Spark来缓存我们的键值pairs.This是代码:
import com.redis.RedisClient
val r = new RedisClient("192.168.1.101", 6379)
val perhit = perhitFile.map(x => {
val arr = x.split(" ")
val readId = arr(0).toInt
val refId = arr(1).toInt
val start = arr(2).toInt
val end = arr(3).toInt
val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)
val readStr = r.hmget("readStr", readId).get(readId)
val realend = if(end > refStr.length - 1) refStr.length - 1 else end
val refOneStr = refStr.substring(start, realend)
(readStr, refOneStr, refId, start, realend, readId)
})
但是编译器给我这样的反馈:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at com.ynu.App$.main(App.scala:511)
at com.ynu.App.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.redis.RedisClient
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 12 more
有人能告诉我如何序列化从 Redis.Thanks 中获取的数据吗?
您正在尝试序列化 客户端。您有一个 RedisClient
、r
,您正尝试在 map
内部使用,它将 运行 跨不同的集群节点。在执行集群任务之前分别从 redis 中获取您想要的数据,或者为每个集群任务单独创建客户端 inside 您的 map
块(可能通过使用 mapPartitions
而不是 map
,因为为每一行创建一个新的 redis 客户端可能不是一个好主意)。
在 Spark 中,RDD
上的函数(如此处的 map
)被序列化并发送给执行程序进行处理。这意味着这些操作中包含的所有元素都应该是可序列化的。
此处的 Redis 连接不可序列化,因为它打开到绑定到创建它的机器的目标数据库的 TCP 连接。
解决方案是在本地执行上下文中的执行程序上创建这些连接。有几种方法可以做到这一点。想到的两个是:
rdd.mapPartitions
:让您一次处理整个分区,从而分摊创建连接的成本)- 单例连接管理器:每个执行程序创建一次连接
mapPartitions
更简单,因为它只需要对程序结构做一个小改动:
val perhit = perhitFile.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
val res = partition.map{ x =>
...
val refStr = r.hmget(...) // use r to process the local data
}
r.close // take care of resources
res
}
单例连接管理器可以用一个对象建模,该对象持有对连接的惰性引用(注意:可变引用也可以)。
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
此对象随后可用于为每个工作 JVM 实例化 1 个连接,并在操作闭包中用作 Serializable
对象。
val perhit = perhitFile.map{x =>
val param = f(x)
val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}
}
使用单例对象的优点是开销较少,因为 JVM 仅创建一次连接(而不是每个 RDD 分区 1 个)
也有一些缺点:
- 清理连接很棘手(关闭 hook/timers)
- 必须确保共享资源的线程安全
(*) 代码用于说明目的。未编译或测试。