Spark 流式传输重复网络调用
Spark streaming duplicate network calls
我将 pyspark 与 Kafka 接收器结合使用来处理推文流。我的应用程序的一个步骤包括调用 Google 自然语言 API 以获得每条推文的情绪分数。但是,我发现 API 每条已处理的推文都会收到多个调用(我在 Google 云控制台中看到了调用次数)。
此外,如果我打印 tweetID(在映射函数内),我会得到相同的 ID 3 或 4 次。在我的应用程序结束时,推文被发送到 Kafka 中的另一个主题,在那里我得到了正确的推文计数(没有重复的 ID),所以原则上一切正常,但我不知道如何避免调用 Google API 每条推文不止一次。
这是否与 Spark 或 Kafka 中的一些配置参数有关?
这是我的控制台输出示例:
TIME 21:53:36: Google Response for tweet 801181843500466177 DONE!
TIME 21:53:36: Google Response for tweet 801181854766399489 DONE!
TIME 21:53:36: Google Response for tweet 801181844808966144 DONE!
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE!
TIME 21:53:37: Google Response for tweet 801181843500466177 DONE!
TIME 21:53:37: Google Response for tweet 801181854766399489 DONE!
TIME 21:53:37: Google Response for tweet 801181844808966144 DONE!
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE!
但在 Kafka 接收器中,我只收到 4 条处理过的推文(这是正确的接收方式,因为它们只有 4 条独特的推文)。
执行此操作的代码是:
def sendToKafka(rdd,topic,address):
publish_producer = KafkaProducer(bootstrap_servers=address,\
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
records = rdd.collect()
msg_dict = defaultdict(list)
for rec in records:
msg_dict["results"].append(rec)
publish_producer.send(resultTopic,msg_dict)
publish_producer.close()
kafka_stream = KafkaUtils.createStream(ssc, zookeeperAddress, "spark-consumer-"+myTopic, {myTopic: 1})
dstream_tweets=kafka_stream.map(lambda kafka_rec: get_json(kafka_rec[1]))\
.map(lambda post: add_normalized_text(post))\
.map(lambda post: tagKeywords(post,tokenizer,desired_keywords))\
.filter(lambda post: post["keywords"] == True)\
.map(lambda post: googleNLP.complementTweetFeatures(post,job_id))
dstream_tweets.foreachRDD(lambda rdd: sendToKafka(rdd,resultTopic,PRODUCER_ADDRESS))
我已经找到解决办法了!我只需要缓存 DStream:
dstream_tweets.cache()
发生多次网络调用是因为 Spark 在我的脚本中执行后面的操作之前重新计算了该 DStream 中的 RDD。当我 cache() DStream 时,只需要计算一次;并且由于它保存在内存中,以后的函数可以访问该信息而无需重新计算(在这种情况下,重新计算涉及再次调用 API,因此值得付出更多内存使用的代价) .
我将 pyspark 与 Kafka 接收器结合使用来处理推文流。我的应用程序的一个步骤包括调用 Google 自然语言 API 以获得每条推文的情绪分数。但是,我发现 API 每条已处理的推文都会收到多个调用(我在 Google 云控制台中看到了调用次数)。
此外,如果我打印 tweetID(在映射函数内),我会得到相同的 ID 3 或 4 次。在我的应用程序结束时,推文被发送到 Kafka 中的另一个主题,在那里我得到了正确的推文计数(没有重复的 ID),所以原则上一切正常,但我不知道如何避免调用 Google API 每条推文不止一次。
这是否与 Spark 或 Kafka 中的一些配置参数有关?
这是我的控制台输出示例:
TIME 21:53:36: Google Response for tweet 801181843500466177 DONE!
TIME 21:53:36: Google Response for tweet 801181854766399489 DONE!
TIME 21:53:36: Google Response for tweet 801181844808966144 DONE!
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE!
TIME 21:53:37: Google Response for tweet 801181843500466177 DONE!
TIME 21:53:37: Google Response for tweet 801181854766399489 DONE!
TIME 21:53:37: Google Response for tweet 801181844808966144 DONE!
TIME 21:53:37: Google Response for tweet 801181854372012032 DONE!
但在 Kafka 接收器中,我只收到 4 条处理过的推文(这是正确的接收方式,因为它们只有 4 条独特的推文)。
执行此操作的代码是:
def sendToKafka(rdd,topic,address):
publish_producer = KafkaProducer(bootstrap_servers=address,\
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
records = rdd.collect()
msg_dict = defaultdict(list)
for rec in records:
msg_dict["results"].append(rec)
publish_producer.send(resultTopic,msg_dict)
publish_producer.close()
kafka_stream = KafkaUtils.createStream(ssc, zookeeperAddress, "spark-consumer-"+myTopic, {myTopic: 1})
dstream_tweets=kafka_stream.map(lambda kafka_rec: get_json(kafka_rec[1]))\
.map(lambda post: add_normalized_text(post))\
.map(lambda post: tagKeywords(post,tokenizer,desired_keywords))\
.filter(lambda post: post["keywords"] == True)\
.map(lambda post: googleNLP.complementTweetFeatures(post,job_id))
dstream_tweets.foreachRDD(lambda rdd: sendToKafka(rdd,resultTopic,PRODUCER_ADDRESS))
我已经找到解决办法了!我只需要缓存 DStream:
dstream_tweets.cache()
发生多次网络调用是因为 Spark 在我的脚本中执行后面的操作之前重新计算了该 DStream 中的 RDD。当我 cache() DStream 时,只需要计算一次;并且由于它保存在内存中,以后的函数可以访问该信息而无需重新计算(在这种情况下,重新计算涉及再次调用 API,因此值得付出更多内存使用的代价) .