Spark Streaming - updateStateByKey and caching data

I have a problem with caching a big table while also using the updateStateByKey function. Here is an example.

Suppose I get (lastname, age) data from Kafka. I want to keep the current age of every person, so I use updateStateByKey. I also want to know each person's first name, so I join the output with an external (lastname, name) table, for example from Hive. Let's assume it is a really big table, so I do not want to load it in every batch. And that is the problem.
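To make this concrete, here is a tiny made-up illustration of the intended behaviour (all lastnames, ages and names below are hypothetical):

# Kafka lines in batch 1: "smith 34", "jones 28"  ->  state: smith=34, jones=28
# Kafka lines in batch 2: "smith 35"              ->  state: smith=35, jones=28
# The state is then joined with the Hive table (lastname, name) to attach names,
# e.g. ("smith", "35", "John"), ("jones", "28", "Mary")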

Everything works fine when I load the table in every batch, but when I try to cache the table, the StreamingContext does not start. I also tried to register the table with registerTempTable and then join the data with SQL, but I got the same error.
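For reference, the registerTempTable variant that fails with the same error looked roughly like this (a sketch only; the temp table names cached_names and stream_batch are just placeholders):

# inside createContext, before the streaming part (sketch of the failing attempt)
big_df = sqlContext.sql('select lastname,name from `default`.`names`')
big_df.cache()
big_df.registerTempTable('cached_names')

def joinTable(time, rdd):
    if not rdd.isEmpty():
        hc = HiveContext.getOrCreate(SparkContext.getOrCreate())
        hc.createDataFrame(rdd, ['lname', 'age']).registerTempTable('stream_batch')
        df2 = hc.sql('select b.lname, b.age, c.name from stream_batch b '
                     'left outer join cached_names c on b.lname = c.lastname')
        return df2.map(lambda row: row)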

It seems the problem is caused by the checkpointing that updateStateByKey requires. When I remove updateStateByKey but leave the checkpointing in place, I still get the error; when I remove both, it works.

The error I get: pastebin

Here is the code:

import sys

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# update function for updateStateByKey: channel is the list of new values
# for the key in this batch, actualChannel is the previous state
def updateFunc(channel, actualChannel):
    if actualChannel is None or channel is not None:
        try:
            # keep the most recent value as the new state
            actualChannel = channel[-1]
        except Exception:
            pass
    if channel is None:
        channel = actualChannel
    return actualChannel

# parse a "lastname age" line from Kafka into a (lastname, age) tuple
def splitFunc(row):
    row = row.strip()
    lname, age = row.split()
    return (lname, age)


def createContext(brokers,topics):
    # configuration
    conf = (SparkConf()
            .setAppName(appName)
            .set("spark.streaming.stopGracefullyOnShutdown", "true")
            .set("spark.dynamicAllocation.enabled", "false")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.sql.shuffle.partitions", '100'))
    # create SparkContext
    sc = SparkContext(conf=conf)

    # create HiveContext
    sqlContext = HiveContext(sc)

    # create Streaming Context
    ssc = StreamingContext(sc, 5)

    # read big_df and cache it (does not work, the StreamingContext does not start)
    big_df = sqlContext.sql('select lastname,name from `default`.`names`')
    big_df.cache().show(10)

    # join the stream with big_df
    def joinTable(time, rdd):
        if not rdd.isEmpty():
            hc = HiveContext.getOrCreate(SparkContext.getOrCreate())
            df = hc.createDataFrame(rdd, ['lname', 'age'])

            # read big_df inside the batch (works)
            #big_df = hc.sql('select lastname,name from `default`.`names`')

            # join with big_df
            df2 = df.join(big_df, df.lname == big_df.lastname, "left_outer")

            return df2.map(lambda row: row)

    # streaming
    kvs = KafkaUtils.createDirectStream(ssc, [topics], {'metadata.broker.list': brokers})        
    kvs.map(lambda (k,v): splitFunc(v)).updateStateByKey(updateFunc).transform(joinTable).pprint()

    return ssc

if __name__ == "__main__":
    appName="SparkCheckpointUpdateSate"
    if len(sys.argv) != 3:
        print("Usage: SparkCheckpointUpdateSate.py <broker_list> <topic>")
        exit(-1)

    brokers, topics = sys.argv[1:]

    # getOrCreate Context
    checkpoint = 'SparkCheckpoint/checkpoint'
    ssc = StreamingContext.getOrCreate(checkpoint,lambda: createContext(brokers,topics))

    # start streaming
    ssc.start()
    ssc.awaitTermination()

Can you tell me how to cache data properly when checkpointing is enabled? Maybe there is some workaround I am not aware of.

Spark version: 1.6

I got this working with a lazily instantiated global instance of big_df. Something similar is done in recoverable_network_wordcount.py.

def getBigDF():
    if 'bigdf' not in globals():
        globals()['bigdf'] = HiveContext.getOrCreate(SparkContext.getOrCreate()).sql('select lastname,name from `default`.`names`')
    return globals()['bigdf']

def createContext(brokers,topics):
    ...
    def joinTable(time,rdd):
        ...
        # lazily get big_df (works)
        big_df = getBigDF()

        # join with big_df
        df2 = df.join(big_df, df.lname == big_df.lastname, "left_outer")

        return df2.map(lambda row:row)
    ...

It seems that in streaming applications all data has to be cached inside the streaming processing itself, not before it starts.
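Since the original goal was to avoid re-reading the table in every batch, the lazily created instance can also be cached the first time it is built. A minimal sketch extending getBigDF (whether the cached data survives a restart from the checkpoint still has to be verified):

def getBigDF():
    # build the lookup DataFrame only on first use and keep it cached,
    # so later batches reuse the cached data instead of re-reading Hive
    if 'bigdf' not in globals():
        hc = HiveContext.getOrCreate(SparkContext.getOrCreate())
        globals()['bigdf'] = hc.sql('select lastname,name from `default`.`names`').cache()
    return globals()['bigdf']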