从 PySpark 向 Redis 写入数据

Write data to Redis from PySpark

在 Scala 中,我们会像这样将 RDD 写入 Redis:

datardd.foreachPartition(iter => {
      val r = new RedisClient("hosturl", 6379)
      iter.foreach(i => {
        val (str, it) = i
        val map = it.toMap
        r.hmset(str, map)
      })
    })

我尝试在 PySpark 中这样做:datardd.foreachPartition(storeToRedis),其中函数 storeToRedis 定义为:

def storeToRedis(x):
    r = redis.StrictRedis(host = 'hosturl', port = 6379)
    for i in x:
        r.set(i[0], dict(i[1]))

它给了我这个:

ImportError: ('No module named redis', function subimport at 0x47879b0, ('redis',))

当然是我导入了redis

PySpark 的 SparkContext 有一个 addPyFile 方法专门用于这个东西。 将 redis 模块制作成 zip 文件 (like this) 并调用此方法即可:

sc = SparkContext(appName = "analyze")
sc.addPyFile("/path/to/redis.zip")