当我从 DStream 加入 PipelinedRDD 和 RDD 时应用程序挂起
Application hangs when I do join for PipelinedRDD and RDD from DStream
我将 spark 1.6.0 与 Spark Streaming 一起使用,但在广泛操作方面遇到了一个问题。
代码示例:
有一个名为 "a" 的 RDD,其类型为:class 'pyspark.rdd.PipelinedRDD'.
"a" 被接收为:
# Load a text file and convert each line to a Row.
lines = sc.textFile(filename)
parts = lines.map(lambda l: l.split(","))
clients = parts.map(lambda p: Row(client_id=int(p[0]), clientname=p[1] ...))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(clients)
schemaPeople.registerTempTable("clients")
client_list = sqlContext.sql("SELECT * FROM clients")
及之后:
a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry)))
第二部分 "b" 类型为 class 'pyspark.streaming.dstream.TransformedDStream'。
我从 Flume:
收到 "b"
DStreamB = flumeStream.map(lambda tup: function_for_map(tup[1].encode('ascii','ignore')))
和
b = DStreamB.map(lambda event: (int(event[2]), value_from_event(event)))
问题是: 当我尝试加入时:
mult = b.transform(lambda rdd: rdd.join(a))
我的应用程序在这个阶段挂起(现在我在 b.pprint() 之后和 .join() 阶段之前显示屏幕)
但是当我添加时:
声明RDD"test":
test = sc.parallelize(range(1, 100000)).map(lambda k: (k, 'value'))
并做:
mult0 = a.join(test)
mult = b.transform(lambda rdd: rdd.join(mult0))`
然后就可以了(!!):
我也能做到:
mult0 = b.transform(lambda rdd: rdd.join(test))
因此:
我有 RDDs "a" 和 "test"。 DStream "b"。
我可以乘:
- a * 测试 * b
- b * 测试
但是我做不到'b * a'。
感谢任何帮助!谢谢!
根据 user6910411 的建议,我将 "a" 缓存为
a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry))).cache()
问题已解决。
我将 spark 1.6.0 与 Spark Streaming 一起使用,但在广泛操作方面遇到了一个问题。
代码示例: 有一个名为 "a" 的 RDD,其类型为:class 'pyspark.rdd.PipelinedRDD'.
"a" 被接收为:
# Load a text file and convert each line to a Row.
lines = sc.textFile(filename)
parts = lines.map(lambda l: l.split(","))
clients = parts.map(lambda p: Row(client_id=int(p[0]), clientname=p[1] ...))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(clients)
schemaPeople.registerTempTable("clients")
client_list = sqlContext.sql("SELECT * FROM clients")
及之后:
a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry)))
第二部分 "b" 类型为 class 'pyspark.streaming.dstream.TransformedDStream'。 我从 Flume:
收到 "b"DStreamB = flumeStream.map(lambda tup: function_for_map(tup[1].encode('ascii','ignore')))
和
b = DStreamB.map(lambda event: (int(event[2]), value_from_event(event)))
问题是: 当我尝试加入时:
mult = b.transform(lambda rdd: rdd.join(a))
我的应用程序在这个阶段挂起(现在我在 b.pprint() 之后和 .join() 阶段之前显示屏幕)
但是当我添加时:
声明RDD"test":
test = sc.parallelize(range(1, 100000)).map(lambda k: (k, 'value'))
并做:
mult0 = a.join(test) mult = b.transform(lambda rdd: rdd.join(mult0))`
然后就可以了(!!):
我也能做到:
mult0 = b.transform(lambda rdd: rdd.join(test))
因此:
我有 RDDs "a" 和 "test"。 DStream "b"。 我可以乘:
- a * 测试 * b
- b * 测试
但是我做不到'b * a'。
感谢任何帮助!谢谢!
根据 user6910411 的建议,我将 "a" 缓存为
a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry))).cache()
问题已解决。