Kafka createStream runs but does not print the processed output from a Kafka topic in PySpark
I am using Kafka 2.0, Spark 2.2.0.2.6.4.0-91, and Python 2.7.5.
I am running the code below. It streams without any errors, but the counts are not printed in the output.
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import os

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 60)  # 60-second batch interval
    print("spark context set")

    zkQuorum, topic = 'master.hdp:2181', 'streamit'
    # Receiver-based stream: ZooKeeper quorum, consumer group id, {topic: threads}
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "console-consumer-68081", {topic: 1})
    print("connection set")

    # Each record is a (key, value) pair; keep only the message value
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
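For reference, the per-batch result that counts.pprint() is expected to show can be reproduced without Spark. The following is a minimal plain-Python sketch of the same flatMap / map / reduceByKey steps. The function name count_words and the sample messages are illustrative only and are not part of the original job.

```python
from collections import defaultdict


def count_words(lines):
    """Mimic flatMap(split) -> map((word, 1)) -> reduceByKey(add) for one batch."""
    counts = defaultdict(int)
    for line in lines:
        # Same split as the streaming job: a single space as the separator.
        for word in line.split(" "):
            counts[word] += 1
    return dict(counts)


if __name__ == "__main__":
    sample_batch = ["hello kafka", "hello spark"]  # hypothetical messages
    print(count_words(sample_batch))
```

A batch with no messages gives an empty result here, which is consistent with pprint() showing only its Time: header for that batch.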
My spark-submit command is:
/usr/hdp/current/spark2-client/bin/spark-submit --principal hdfs-ivory@KDCAUTH.COM --keytab /etc/security/keytabs/hdfs.headless.keytab --master yarn --deploy-mode client --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 kstream.py
The last part of my output log is shown below. It receives the stream but does not show the expected processed output.
-------------------------------------------
Time: 2020-01-22 19:29:00
-------------------------------------------
20/01/22 19:29:00 INFO JobScheduler: Finished job streaming job 1579701540000 ms.0 from job set of time 1579701540000 ms
20/01/22 19:29:00 INFO JobScheduler: Starting job streaming job 1579701540000 ms.1 from job set of time 1579701540000 ms
20/01/22 19:29:00 INFO SparkContext: Starting job: runJob at PythonRDD.scala:455
20/01/22 19:29:00 INFO DAGScheduler: Registering RDD 7 (call at /usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py:2230)
20/01/22 19:29:00 INFO DAGScheduler: Got job 2 (runJob at PythonRDD.scala:455) with 1 output partitions
20/01/22 19:29:00 INFO DAGScheduler: Final stage: ResultStage 4 (runJob at PythonRDD.scala:455)
20/01/22 19:29:00 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 3)
20/01/22 19:29:00 INFO DAGScheduler: Missing parents: List()
20/01/22 19:29:00 INFO DAGScheduler: Submitting ResultStage 4 (PythonRDD[11] at RDD at PythonRDD.scala:48), which has no missing parents
20/01/22 19:29:00 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 8.1 KB, free 366.2 MB)
20/01/22 19:29:00 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 4.4 KB, free 366.2 MB)
20/01/22 19:29:00 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.16.0.21:40801 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:29:00 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
20/01/22 19:29:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (PythonRDD[11] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(0))
20/01/22 19:29:00 INFO YarnScheduler: Adding task set 4.0 with 1 tasks
20/01/22 19:29:00 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 71, master.hdp, executor 1, partition 0, PROCESS_LOCAL, 4632 bytes)
20/01/22 19:29:00 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on master.hdp:41184 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:29:00 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to 172.16.0.21:51120
20/01/22 19:29:00 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 83 bytes
20/01/22 19:29:00 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 71) in 473 ms on master.hdp (executor 1) (1/1)
20/01/22 19:29:00 INFO YarnScheduler: Removed TaskSet 4.0, whose tasks have all completed, from pool
20/01/22 19:29:00 INFO DAGScheduler: ResultStage 4 (runJob at PythonRDD.scala:455) finished in 0.476 s
20/01/22 19:29:00 INFO DAGScheduler: Job 2 finished: runJob at PythonRDD.scala:455, took 0.497775 s
20/01/22 19:29:00 INFO SparkContext: Starting job: runJob at PythonRDD.scala:455
20/01/22 19:29:00 INFO DAGScheduler: Got job 3 (runJob at PythonRDD.scala:455) with 1 output partitions
20/01/22 19:29:00 INFO DAGScheduler: Final stage: ResultStage 6 (runJob at PythonRDD.scala:455)
20/01/22 19:29:00 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
20/01/22 19:29:00 INFO DAGScheduler: Missing parents: List()
20/01/22 19:29:00 INFO DAGScheduler: Submitting ResultStage 6 (PythonRDD[12] at RDD at PythonRDD.scala:48), which has no missing parents
20/01/22 19:29:00 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 8.1 KB, free 366.1 MB)
20/01/22 19:29:00 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 4.4 KB, free 366.1 MB)
20/01/22 19:29:00 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 172.16.0.21:40801 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:29:00 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1006
20/01/22 19:29:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 6 (PythonRDD[12] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(1))
20/01/22 19:29:00 INFO YarnScheduler: Adding task set 6.0 with 1 tasks
20/01/22 19:29:00 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 72, master.hdp, executor 1, partition 1, PROCESS_LOCAL, 4632 bytes)
20/01/22 19:29:00 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on master.hdp:41184 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:29:00 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 72) in 123 ms on master.hdp (executor 1) (1/1)
20/01/22 19:29:00 INFO YarnScheduler: Removed TaskSet 6.0, whose tasks have all completed, from pool
20/01/22 19:29:00 INFO DAGScheduler: ResultStage 6 (runJob at PythonRDD.scala:455) finished in 0.125 s
20/01/22 19:29:00 INFO DAGScheduler: Job 3 finished: runJob at PythonRDD.scala:455, took 0.136936 s
-------------------------------------------
Time: 2020-01-22 19:29:00
-------------------------------------------
20/01/22 19:29:00 INFO JobScheduler: Finished job streaming job 1579701540000 ms.1 from job set of time 1579701540000 ms
20/01/22 19:29:00 INFO JobScheduler: Total delay: 0.811 s for time 1579701540000 ms (execution: 0.684 s)
20/01/22 19:29:00 INFO ReceivedBlockTracker: Deleting batches:
20/01/22 19:29:00 INFO InputInfoTracker: remove old batch metadata:
20/01/22 19:30:00 INFO JobScheduler: Added jobs for time 1579701600000 ms
20/01/22 19:30:00 INFO JobScheduler: Starting job streaming job 1579701600000 ms.0 from job set of time 1579701600000 ms
-------------------------------------------
Time: 2020-01-22 19:30:00
-------------------------------------------
20/01/22 19:30:00 INFO JobScheduler: Finished job streaming job 1579701600000 ms.0 from job set of time 1579701600000 ms
20/01/22 19:30:00 INFO JobScheduler: Starting job streaming job 1579701600000 ms.1 from job set of time 1579701600000 ms
20/01/22 19:30:00 INFO SparkContext: Starting job: runJob at PythonRDD.scala:455
20/01/22 19:30:00 INFO DAGScheduler: Registering RDD 16 (call at /usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py:2230)
20/01/22 19:30:00 INFO DAGScheduler: Got job 4 (runJob at PythonRDD.scala:455) with 1 output partitions
20/01/22 19:30:00 INFO DAGScheduler: Final stage: ResultStage 8 (runJob at PythonRDD.scala:455)
20/01/22 19:30:00 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 7)
20/01/22 19:30:00 INFO DAGScheduler: Missing parents: List()
20/01/22 19:30:00 INFO DAGScheduler: Submitting ResultStage 8 (PythonRDD[20] at RDD at PythonRDD.scala:48), which has no missing parents
20/01/22 19:30:00 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 8.1 KB, free 366.1 MB)
20/01/22 19:30:00 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 4.4 KB, free 366.1 MB)
20/01/22 19:30:00 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 172.16.0.21:40801 (size: 4.4 KB, free: 366.2 MB)
20/01/22 19:30:00 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006
20/01/22 19:30:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 8 (PythonRDD[20] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(0))
20/01/22 19:30:00 INFO YarnScheduler: Adding task set 8.0 with 1 tasks
20/01/22 19:30:00 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 73, master.hdp, executor 1, partition 0, PROCESS_LOCAL, 4632 bytes)
20/01/22 19:30:00 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on master.hdp:41184 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:30:00 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 2 to 172.16.0.21:51120
20/01/22 19:30:00 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 83 bytes
20/01/22 19:30:00 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID 73) in 120 ms on master.hdp (executor 1) (1/1)
20/01/22 19:30:00 INFO YarnScheduler: Removed TaskSet 8.0, whose tasks have all completed, from pool
20/01/22 19:30:00 INFO DAGScheduler: ResultStage 8 (runJob at PythonRDD.scala:455) finished in 0.121 s
20/01/22 19:30:00 INFO DAGScheduler: Job 4 finished: runJob at PythonRDD.scala:455, took 0.134627 s
20/01/22 19:30:00 INFO SparkContext: Starting job: runJob at PythonRDD.scala:455
20/01/22 19:30:00 INFO DAGScheduler: Got job 5 (runJob at PythonRDD.scala:455) with 1 output partitions
20/01/22 19:30:00 INFO DAGScheduler: Final stage: ResultStage 10 (runJob at PythonRDD.scala:455)
20/01/22 19:30:00 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 9)
20/01/22 19:30:00 INFO DAGScheduler: Missing parents: List()
20/01/22 19:30:00 INFO DAGScheduler: Submitting ResultStage 10 (PythonRDD[21] at RDD at PythonRDD.scala:48), which has no missing parents
20/01/22 19:30:00 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 8.1 KB, free 366.1 MB)
20/01/22 19:30:00 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 4.4 KB, free 366.1 MB)
20/01/22 19:30:00 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 172.16.0.21:40801 (size: 4.4 KB, free: 366.2 MB)
20/01/22 19:30:00 INFO SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:1006
20/01/22 19:30:00 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 10 (PythonRDD[21] at RDD at PythonRDD.scala:48) (first 15 tasks are for partitions Vector(1))
20/01/22 19:30:00 INFO YarnScheduler: Adding task set 10.0 with 1 tasks
20/01/22 19:30:00 INFO TaskSetManager: Starting task 0.0 in stage 10.0 (TID 74, master.hdp, executor 1, partition 1, PROCESS_LOCAL, 4632 bytes)
20/01/22 19:30:00 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on master.hdp:41184 (size: 4.4 KB, free: 366.3 MB)
20/01/22 19:30:00 INFO TaskSetManager: Finished task 0.0 in stage 10.0 (TID 74) in 132 ms on master.hdp (executor 1) (1/1)
20/01/22 19:30:00 INFO YarnScheduler: Removed TaskSet 10.0, whose tasks have all completed, from pool
20/01/22 19:30:00 INFO DAGScheduler: ResultStage 10 (runJob at PythonRDD.scala:455) finished in 0.133 s
20/01/22 19:30:00 INFO DAGScheduler: Job 5 finished: runJob at PythonRDD.scala:455, took 0.143611 s
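It looks like you are not using a logger but print(), which writes to standard output. To log to your log file, you need to set up a logger. For example, the following obtains a logger from the SparkContext: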
log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("Example info")
LOGGER.error("Example error")
...
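Before changing the logging setup, it may also help to confirm whether each batch actually contains records, since pprint() prints only its Time: header when a batch is empty. The sketch below is one possible way to check this. The names describe_batch and report_batch are hypothetical helpers, and they rely only on the RDD methods isEmpty() and take(), so they are written here without importing Spark.

```python
def describe_batch(rdd, sample_size=5):
    """Return a short description of one batch RDD."""
    if rdd.isEmpty():
        return "batch is empty"
    return "first records: %r" % (rdd.take(sample_size),)


def report_batch(time, rdd):
    """Callback for DStream.foreachRDD: print the batch time and a summary.

    It is used only for its side effect, so it deliberately returns nothing.
    """
    print("%s %s" % (time, describe_batch(rdd)))


# Wiring (assumes the `counts` DStream from the question):
# counts.foreachRDD(report_batch)
```

If every batch reports as empty, the job is running but no messages are reaching it, which points at the Kafka connection or consumer settings rather than at logging.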
Provide key.deserializer and value.deserializer in kafkaParams, and use createDirectStream:
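from pyspark.streaming.kafka import KafkaUtils, utf8_decoder

# `config` is my own settings object; it must supply the broker list (host:port).
kafkaParams = {
    "metadata.broker.list": config['kstream']['broker'],
    # The Kafka 0.8 integration reads the dotted key with "smallest"/"largest".
    "auto.offset.reset": "smallest",
    "auto.create.topics.enable": "true",
    "key.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer",
    "value.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer",
}
kvs = KafkaUtils.createDirectStream(
    ssc, [topic], kafkaParams, fromOffsets=None, messageHandler=None,
    keyDecoder=utf8_decoder, valueDecoder=utf8_decoder)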
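Putting this together with the word count from the question, a minimal sketch of the direct-stream variant could look like the following. It assumes the spark-streaming-kafka-0-8 package from the spark-submit command in the question. The helper names build_kafka_params and run_word_count are hypothetical, and the broker address in the example call is a placeholder that must be replaced with the real Kafka broker host and port (not the ZooKeeper address).

```python
def build_kafka_params(brokers, from_beginning=True):
    """Build consumer settings for the Kafka 0.8 direct stream."""
    params = {"metadata.broker.list": brokers}
    if from_beginning:
        # Start from the oldest retained offset when no offsets are given.
        params["auto.offset.reset"] = "smallest"
    return params


def run_word_count(brokers, topic, batch_seconds=60):
    # Spark imports live here so the helper above works without Spark installed.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, batch_seconds)

    # Direct stream talks to the brokers; no ZooKeeper quorum or receiver.
    kvs = KafkaUtils.createDirectStream(ssc, [topic], build_kafka_params(brokers))
    counts = (kvs.map(lambda kv: kv[1])
              .flatMap(lambda line: line.split(" "))
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()


# Example call (placeholder broker address; the topic name is from the question):
# run_word_count("broker-host:9092", "streamit")
```

The default decoders of createDirectStream already decode keys and values as UTF-8, so the sketch does not pass keyDecoder or valueDecoder explicitly.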