从 PySpark 向 Redis 写入数据
Write data to Redis from PySpark
在 Scala 中,我们会像这样将 RDD 写入 Redis:
datardd.foreachPartition(iter => {
val r = new RedisClient("hosturl", 6379)
iter.foreach(i => {
val (str, it) = i
val map = it.toMap
r.hmset(str, map)
})
})
我尝试在 PySpark 中这样做:datardd.foreachPartition(storeToRedis)
,其中函数 storeToRedis
定义为:
def storeToRedis(x):
r = redis.StrictRedis(host = 'hosturl', port = 6379)
for i in x:
r.set(i[0], dict(i[1]))
它给了我这个:
ImportError: ('No module named redis', function subimport at
0x47879b0, ('redis',))
当然是我导入了redis
PySpark 的 SparkContext 有一个 addPyFile
方法专门用于这个东西。
将 redis 模块制作成 zip 文件 (like this) 并调用此方法即可:
sc = SparkContext(appName = "analyze")
sc.addPyFile("/path/to/redis.zip")
在 Scala 中,我们会像这样将 RDD 写入 Redis:
datardd.foreachPartition(iter => {
val r = new RedisClient("hosturl", 6379)
iter.foreach(i => {
val (str, it) = i
val map = it.toMap
r.hmset(str, map)
})
})
我尝试在 PySpark 中这样做:datardd.foreachPartition(storeToRedis)
,其中函数 storeToRedis
定义为:
def storeToRedis(x):
r = redis.StrictRedis(host = 'hosturl', port = 6379)
for i in x:
r.set(i[0], dict(i[1]))
它给了我这个:
ImportError: ('No module named redis', function subimport at 0x47879b0, ('redis',))
当然是我导入了redis
PySpark 的 SparkContext 有一个 addPyFile
方法专门用于这个东西。
将 redis 模块制作成 zip 文件 (like this) 并调用此方法即可:
sc = SparkContext(appName = "analyze")
sc.addPyFile("/path/to/redis.zip")