Spark Streaming - updateStateByKey and caching data
I have a problem with caching some big data while using the updateStateByKey function. Here is an example.

Say I get data (lastname, age) from Kafka. I want to keep the current age of every person, so I use updateStateByKey. I also want to know every person's first name, so I join the output with an external table (lastname, name), e.g. from Hive. Let's assume it is a really big table, so I do not want to load it in every batch. And there is a problem.

Everything works fine when I load the table in every batch, but when I try to cache the table, the StreamingContext does not start. I also tried registerTempTable and then joining the data with SQL, but I got the same error; that attempt looked roughly like the sketch below.
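The registerTempTable variant was roughly the following (only a sketch of the relevant part inside createContext; the temp table names are just illustrative):

    # register the big Hive table once, outside the per-batch function
    big_df = sqlContext.sql('select lastname,name from `default`.`names`')
    big_df.cache()
    big_df.registerTempTable('names_cached')

    def joinTable(time, rdd):
        if not rdd.isEmpty():
            # register the current batch as a temp table and join it with the cached one via SQL
            sqlContext.createDataFrame(rdd, ['lname', 'age']).registerTempTable('stream_batch')
            df2 = sqlContext.sql("select s.lname, s.age, n.name from stream_batch s "
                                 "left outer join names_cached n on s.lname = n.lastname")
            return df2.map(lambda row: row)

It fails on startup with the same error as the cached-DataFrame version shown in the full code below.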
It seems the problem is the checkpoint that updateStateByKey requires. When I remove updateStateByKey but leave the checkpoint I still get the error, and when I remove both it works.

The error I get: pastebin

And here is the code:
import sys

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# function to keep the actual state per key (the last value seen in the batch)
def updateFunc(channel, actualChannel):
    if actualChannel is None or channel is not None:
        try:
            actualChannel = channel[-1]
        except Exception:
            pass
    if channel is None:
        channel = actualChannel
    return actualChannel

def splitFunc(row):
    row = row.strip()
    lname, age = row.split()
    return (lname, age)

def createContext(brokers, topics):
    # some conf
    conf = SparkConf().setAppName(appName).set("spark.streaming.stopGracefullyOnShutdown", "true") \
        .set("spark.dynamicAllocation.enabled", "false") \
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .set("spark.sql.shuffle.partitions", '100')
    # create SparkContext
    sc = SparkContext(conf=conf)
    # create HiveContext
    sqlContext = HiveContext(sc)
    # create StreamingContext with a 5 second batch interval
    ssc = StreamingContext(sc, 5)

    # read big_df and cache it (does not work, the StreamingContext does not start)
    big_df = sqlContext.sql('select lastname,name from `default`.`names`')
    big_df.cache().show(10)

    # join the stream with the table
    def joinTable(time, rdd):
        if not rdd.isEmpty():
            df = HiveContext.getOrCreate(SparkContext.getOrCreate()).createDataFrame(rdd, ['lname', 'age'])

            # read big_df in every batch (works)
            #big_df = HiveContext.getOrCreate(SparkContext.getOrCreate()).sql('select lastname,name from `default`.`names`')

            # join DMS
            df2 = df.join(big_df, df.lname == big_df.lastname, "left_outer")

            return df2.map(lambda row: row)

    # streaming
    kvs = KafkaUtils.createDirectStream(ssc, [topics], {'metadata.broker.list': brokers})
    kvs.map(lambda (k, v): splitFunc(v)).updateStateByKey(updateFunc).transform(joinTable).pprint()
    return ssc

if __name__ == "__main__":
    appName = "SparkCheckpointUpdateSate"
    if len(sys.argv) != 3:
        print("Usage: SparkCheckpointUpdateSate.py <broker_list> <topic>")
        exit(-1)
    brokers, topics = sys.argv[1:]

    # get the StreamingContext from checkpoint data or create a new one
    checkpoint = 'SparkCheckpoint/checkpoint'
    ssc = StreamingContext.getOrCreate(checkpoint, lambda: createContext(brokers, topics))

    # start streaming
    ssc.start()
    ssc.awaitTermination()
Can you tell me how to cache the data properly when checkpointing is enabled? Maybe there is some workaround I do not know about.

Spark version: 1.6
I got this working with a lazily instantiated global instance of big_df. Something similar is done in recoverable_network_wordcount.py.
def getBigDf():
    # lazily instantiated global instance of big_df
    if 'bigdf' not in globals():
        globals()['bigdf'] = HiveContext.getOrCreate(SparkContext.getOrCreate()).sql('select lastname,name from `default`.`names`')
    return globals()['bigdf']

def createContext(brokers, topics):
    ...
    def joinTable(time, rdd):
        ...
        # read big_df (works)
        big_df = getBigDf()

        # join DMS
        df2 = df.join(big_df, df.lname == big_df.lastname, "left_outer")

        return df2.map(lambda row: row)
    ...
It seems that in a streaming job all data has to be cached inside the streaming processing itself, not before it.
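For reference, a variant of the same helper that also calls cache(), so the table is actually materialized in executor memory on the first batch and reused by later ones (untested sketch; the name getCachedBigDf is just illustrative):

def getCachedBigDf():
    # lazily create the DataFrame once per driver process and mark it for caching;
    # the first batch that evaluates it triggers the Hive read, later batches hit the cache
    if 'bigdf' not in globals():
        big_df = HiveContext.getOrCreate(SparkContext.getOrCreate()) \
            .sql('select lastname,name from `default`.`names`')
        globals()['bigdf'] = big_df.cache()
    return globals()['bigdf']

The only change compared to getBigDf is the cache() call; the lazy creation inside the batch function is what keeps it compatible with the checkpoint, as described above.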