Spark Twitter streaming application error on Windows 10 when submitting Python file to local server

I am trying to run a streaming application that counts tweets from specific users. Producer code:

# -*- coding: utf-8 -*-
import tweepy
import json
import base64
from kafka import KafkaProducer
import kafka.errors

# Twitter API credentials
CONSUMER_KEY = "***"
CONSUMER_SECRET = "***"
ACCESS_TOKEN = "***"
ACCESS_TOKEN_SECRET = "***"

# Kafka topic name
TOPIC_NAME = "tweet-kafka"

# Kafka server
KAFKA_HOST = "localhost"
KAFKA_PORT = "9092"

# a list of ids; the actual ids have been hidden in this question
ids = ["11111", "222222"]

auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

class KafkaCommunicator:
    def __init__(self, producer, topic):
        self.producer = producer
        self.topic = topic

    def send(self, message):
        self.producer.send(self.topic, message.encode("utf-8"))

    def close(self):
        self.producer.close()
        
class MyStreamListener(tweepy.StreamListener):
    """Listener to tweet stream from twitter."""
    def __init__(self, communicator, api=None):
        super(MyStreamListener, self).__init__()
        self.communicator = communicator
        self.num_tweets = 0

    def on_data(self, raw_data):
        data = json.loads(raw_data)
        #print(data)
        if "user" in data:
            user_id = data["user"]["id_str"]
            if user_id in ids:
                print("Time: " + data["created_at"] + "; id: " + user_id + "; screen_name: " + data["user"]["screen_name"] )
                # put message into Kafka
                self.communicator.send(data["user"]["screen_name"])
        return True
        
    def on_error(self, status):
        print(status)
        return True

def create_communicator():
    """Create Kafka producer."""
    producer = KafkaProducer(bootstrap_servers=KAFKA_HOST + ":" + KAFKA_PORT)
    return KafkaCommunicator(producer, TOPIC_NAME)


def create_stream(communicator):
    """Set stream for twitter api with custom listener."""
    listener = MyStreamListener(communicator=communicator)
    stream = tweepy.Stream(auth, listener)
    return stream

def run_processing(stream):
    # Start filtering messages
    stream.filter(follow=ids)

def main():
    communicator = None
    tweet_stream = None
    try:
        communicator = create_communicator()
        tweet_stream = create_stream(communicator)
        run_processing(tweet_stream)
    except KeyboardInterrupt:
        pass
    except kafka.errors.NoBrokersAvailable:
        print("Kafka broker not found.")
    finally:
        if communicator:
            communicator.close()
        if tweet_stream:
            tweet_stream.disconnect()


if __name__ == "__main__":
    main()
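
For reference, the topic can be read back with a small standalone consumer to confirm that messages actually reach Kafka before Spark is involved. This is only a minimal sketch; it assumes the same kafka-python package, the local broker on localhost:9092 and the tweet-kafka topic from above.

# -*- coding: utf-8 -*-
# Minimal standalone check that messages reach the Kafka topic.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "tweet-kafka",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",   # read the topic from the beginning
    consumer_timeout_ms=10000,      # stop iterating after 10 s without messages
)

for record in consumer:
    print(record.value.decode("utf-8"))

consumer.close()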

Streaming application code:

# -*- coding: utf-8 -*-
import sys
import os

spark_path = "D:/spark/spark-2.4.7-bin-hadoop2.7" # spark installed folder
os.environ['SPARK_HOME'] = spark_path
os.environ['HADOOP_HOME'] = spark_path
sys.path.insert(0, spark_path + "/bin")
sys.path.insert(0, spark_path + "/python/pyspark/")
sys.path.insert(0, spark_path + "/python/lib/pyspark.zip")
sys.path.insert(0, spark_path + "/python/lib/py4j-0.10.7-src.zip")

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 pyspark-shell'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = "notebook"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYTHONHASHSEED'] = "0"
os.environ['SPARK_YARN_USER_ENV'] = "PYTHONHASHSEED=0"

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

SPARK_APP_NAME = "SparkStreamingKafkaTwitter"
SPARK_CHECKPOINT_TMP_DIR = "D:/tmp"
SPARK_BATCH_INTERVAL = 10
SPARK_LOG_LEVEL = "OFF"

KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"  # Kafka broker (bootstrap server) address
KAFKA_TOPIC = "tweet-kafka"

import json

def create_streaming_context():
    """Create Spark streaming context."""
    conf = SparkConf().set("spark.executor.memory", "2g")\
                      .set("spark.driver.memory", "2g")\
                      .set("spark.driver.bindAddress", "0.0.0.0")
    # Create Spark Context
    sc = SparkContext(master="local[2]", appName=SPARK_APP_NAME, conf=conf)
    # Set log level
    sc.setLogLevel(SPARK_LOG_LEVEL)
    # Create Streaming Context
    ssc = StreamingContext(sc, SPARK_BATCH_INTERVAL)
    # Set the context to periodically checkpoint the DStream operations for
    # driver fault-tolerance. The graph will be checkpointed every batch interval.
    # It is also used to update the results of stateful transformations.
    ssc.checkpoint(SPARK_CHECKPOINT_TMP_DIR)
    return ssc

def create_stream(ssc):
    """
    Create subscriber (consumer) to the Kafka topic (works on RDD that is mini-batch).
    """
    return (
        KafkaUtils.createDirectStream(
            ssc, topics=[KAFKA_TOPIC],
            kafkaParams={"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS})
            .map(lambda x: x[1])
    )

def main():

    # Init Spark streaming context
    ssc = create_streaming_context()

    # Get tweets stream
    kafka_stream = create_stream(ssc)

    # Using reduceByKeyAndWindow, count the number of each user's tweets over
    # the window every 30 seconds, sort the result in descending order and
    # print it.
    
    # for 1 minute
    tweets_for_1_min = kafka_stream.reduceByKeyAndWindow(lambda x,y: x + y, lambda x,y: x - y, windowDuration=60, slideDuration=30)
    sorted_tweets_for_1_min = tweets_for_1_min.transform(lambda x_rdd: x_rdd.sortBy(lambda x: x[1], ascending=False))
    sorted_tweets_for_1_min.pprint()
    
    # for 10 minutes
    tweets_for_10_min = kafka_stream.reduceByKeyAndWindow(lambda x,y: x + y, lambda x,y: x - y, windowDuration=600, slideDuration=30)
    sorted_tweets_for_10_min = tweets_for_10_min.transform(lambda x_rdd: x_rdd.sortBy(lambda x: x[1], ascending=False))
    sorted_tweets_for_10_min.pprint()

    # Start Spark Streaming
    ssc.start()

    # Waiting for termination
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
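
Independently of Kafka, the windowed counting can be sanity-checked with a queueStream of fake micro-batches. The sketch below is only a test harness built on assumptions: the (screen_name, 1) pairing, the 5-second batch interval, the window sizes and the user names are made up for the test, and the inverse reduce function is left out so that no checkpoint directory is needed.

# -*- coding: utf-8 -*-
# Sanity check of the windowed counting with an in-memory queueStream
# instead of the Kafka source.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "WindowTest")
sc.setLogLevel("OFF")
ssc = StreamingContext(sc, 5)   # 5-second batches

# Fake micro-batches of (screen_name, 1) pairs, one batch per interval
batches = [sc.parallelize([("alice", 1), ("bob", 1), ("alice", 1)]) for _ in range(6)]
stream = ssc.queueStream(batches)

# Count tweets per user over a 30-second window, sliding every 10 seconds
counts = stream.reduceByKeyAndWindow(lambda a, b: a + b, None,
                                     windowDuration=30, slideDuration=10)
counts.transform(lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False)).pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(60)   # run for about a minute
ssc.stop()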

I have the following software installed:

  1. jdk1.8.0_311 and jre1.8.0_311
  2. python 2.7
  3. hadoop-2.7.1 (working correctly)
  4. spark-2.4.7-bin-hadoop2.7
  5. kafka_2.13-3.0.0

I have set the environment variables correctly, but at runtime, after executing the submit command below, I get the following exception:
spark-submit --master local[2] --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 d:\task1\tweet_kafka_streaming_app.py

Exception while processing the stream:

-------------------------------------------
Time: 2021-12-06 15:28:30
-------------------------------------------

-------------------------------------------
Time: 2021-12-06 15:28:30
-------------------------------------------

Traceback (most recent call last):
  File "d:/task1/tweet_kafka_streaming_app.py", line 95, in <module>
    main()
  File "d:/task1/tweet_kafka_streaming_app.py", line 91, in main
    ssc.awaitTermination()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\context.py", line 192, in awaitTermination
    varName = k[len("spark.executorEnv."):]
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__

  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o32.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\util.py", line 68, in call
    r = self.func(t, *rdds)
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\dstream.py", line 297, in <lambda>
    func = lambda t, rdd: oldfunc(rdd)
  File "d:/task1/tweet_kafka_streaming_app.py", line 79, in <lambda>
    sorted_tweets_for_1_min = tweets_for_1_min.transform(lambda x_rdd: x_rdd.sortBy(lambda x: x[1], ascending=False))
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 699, in sortBy
    return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 667, in sortByKey
    rddSize = self.count()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1055, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1046, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 917, in fold
    vals = self.mapPartitions(func).collect()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 20, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 2499, in pipeline_func
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 352, in func
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1861, in combineLocally
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    for k, v in iterator:
ValueError: too many values to unpack

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
        at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:592)
        at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:575)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1913)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1912)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:948)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:948)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
        at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD$$anonfun$collect.apply(RDD.scala:990)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
        at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 2499, in pipeline_func
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 352, in func
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1861, in combineLocally
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    for k, v in iterator:
ValueError: too many values to unpack

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
        at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:592)
        at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:575)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        ... 1 more


        at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
        at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
        at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun$$anonfun$apply.apply(DStream.scala:342)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun$$anonfun$apply.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:336)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun.apply(DStreamGraph.scala:122)
        at org.apache.spark.streaming.DStreamGraph$$anonfun.apply(DStreamGraph.scala:121)
        at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun.apply(JobGenerator.scala:249)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun.apply(JobGenerator.scala:247)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)

Exception in thread Thread-4 (most likely raised during interpreter shutdown):

C:\WINDOWS\system32>

Thanks to @OneCricketeer's hint, I have solved the problem. I first upgraded Python to 3.8 but ran into another error. After downgrading to Python 3.7, which is supported by Spark 2.4.8 and by Spark 2.4.7 with Hadoop 2.7, everything works again.
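
To avoid tripping over this again, a small guard at the top of the streaming script can fail fast when it is launched with an interpreter outside the range Spark 2.4.x supports (Python 2.7 or 3.4 to 3.7, as far as I understand). A minimal sketch of that check:

import sys

# Spark 2.4.x supports Python 2.7 and 3.4-3.7; refuse anything else up front.
ok = sys.version_info[:2] == (2, 7) or (3, 4) <= sys.version_info[:2] <= (3, 7)
if not ok:
    raise RuntimeError("Unsupported Python %s for Spark 2.4.x" % sys.version.split()[0])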