sortByKey() doesn't seem to work on strings in PySpark
I have two verses of the rhyme "Mary had a little lamb" saved in the file "TESTSortbykey.md", and I ran the following command on it in PySpark:
testsortbykey=sc.textFile("file:///opt/hadoop/spark-1.6.0/TESTSortbykey.md").flatMap(lambda x: x.split(" ")).map(lambda x: (x,1))
After running testsortbykey.collect()
I get the output:
[(u'Mary', 1), (u'had', 1), (u'a', 1), (u'little', 1), (u'lamb', 1),
(u'whose', 1), (u'fleece', 1), (u'was', 1), (u'white', 1), (u'as', 1),
(u'snow', 1), (u'and', 1), (u'every', 1), (u'where', 1), (u'that', 1),
(u'Mary', 1), (u'went', 1), (u'the', 1), (u'Lamb', 1), (u'was', 1),
(u'sure', 1), (u'to', 1), (u'go', 1), (u'', 1)]
Once I have the pair RDD testsortbykey, I want to apply reduceByKey() and sortByKey(), but neither seems to work. The commands I used are:
testsortbykey.sortByKey()
testsortbykey.collect()
testsortbykey.reduceByKey(lambda x,y: x+y )
testsortbykey.collect()
The output I get in both cases is:
[(u'Mary', 1), (u'had', 1), (u'a', 1), (u'little', 1), (u'lamb', 1),
(u'whose', 1), (u'fleece', 1), (u'was', 1), (u'white', 1), (u'as', 1),
(u'snow', 1), (u'and', 1), (u'every', 1), (u'where', 1), (u'that', 1),
(u'Mary', 1), (u'went', 1), (u'the', 1), (u'Lamb', 1), (u'was', 1),
(u'sure', 1), (u'to', 1), (u'go', 1), (u'', 1)]
Clearly the values are not being merged, even though there are multiple identical keys (such as 'Mary', 'had', etc.).
Can anyone explain why this happens? And what should I do to overcome it?
EDIT:
This is what my console looks like, in case it helps:
>>> testsortbykey=sc.textFile("file:///opt/hadoop/spark-1.6.0/TESTSortbykey.md").flatMap(lambda x: x.split(" ")).map(lambda x: (x,1))
17/06/01 11:44:48 INFO storage.MemoryStore: Block broadcast_103 stored as values in memory (estimated size 228.9 KB, free 4.4 MB)
17/06/01 11:44:48 INFO storage.MemoryStore: Block broadcast_103_piece0 stored as bytes in memory (estimated size 19.5 KB, free 4.4 MB)
17/06/01 11:44:48 INFO storage.BlockManagerInfo: Added broadcast_103_piece0 in memory on localhost:57701 (size: 19.5 KB, free: 511.1 MB)
17/06/01 11:44:48 INFO spark.SparkContext: Created broadcast 103 from textFile at null:-1
>>> testsortbykey.sortByKey()
17/06/01 11:45:48 INFO mapred.FileInputFormat: Total input paths to process : 1
17/06/01 11:45:48 INFO spark.SparkContext: Starting job: sortByKey at <stdin>:1
17/06/01 11:45:48 INFO scheduler.DAGScheduler: Got job 74 (sortByKey at <stdin>:1) with 2 output partitions
17/06/01 11:45:48 INFO scheduler.DAGScheduler: Final stage: ResultStage 89 (sortByKey at <stdin>:1)
17/06/01 11:45:48 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/06/01 11:45:48 INFO scheduler.DAGScheduler: Missing parents: List()
17/06/01 11:45:48 INFO scheduler.DAGScheduler: Submitting ResultStage 89 (PythonRDD[200] at sortByKey at <stdin>:1), which has no missing parents
17/06/01 11:45:48 INFO storage.MemoryStore: Block broadcast_104 stored as values in memory (estimated size 6.2 KB, free 4.4 MB)
17/06/01 11:45:48 INFO storage.MemoryStore: Block broadcast_104_piece0 stored as bytes in memory (estimated size 3.9 KB, free 4.4 MB)
17/06/01 11:45:48 INFO storage.BlockManagerInfo: Added broadcast_104_piece0 in memory on localhost:57701 (size: 3.9 KB, free: 511.1 MB)
17/06/01 11:45:48 INFO spark.SparkContext: Created broadcast 104 from broadcast at DAGScheduler.scala:1006
17/06/01 11:45:48 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 89 (PythonRDD[200] at sortByKey at <stdin>:1)
17/06/01 11:45:48 INFO scheduler.TaskSchedulerImpl: Adding task set 89.0 with 2 tasks
17/06/01 11:45:48 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 89.0 (TID 183, localhost, partition 0,PROCESS_LOCAL, 2147 bytes)
17/06/01 11:45:48 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 89.0 (TID 184, localhost, partition 1,PROCESS_LOCAL, 2147 bytes)
17/06/01 11:45:48 INFO executor.Executor: Running task 0.0 in stage 89.0 (TID 183)
17/06/01 11:45:48 INFO executor.Executor: Running task 1.0 in stage 89.0 (TID 184)
17/06/01 11:45:48 INFO rdd.HadoopRDD: Input split: file:/opt/hadoop/spark-1.6.0/TESTSortbykey.md:55+55
17/06/01 11:45:48 INFO rdd.HadoopRDD: Input split: file:/opt/hadoop/spark-1.6.0/TESTSortbykey.md:0+55
17/06/01 11:45:48 INFO python.PythonRunner: Times: total = 3, boot = 1, init = 1, finish = 1
17/06/01 11:45:48 INFO executor.Executor: Finished task 0.0 in stage 89.0 (TID 183). 2124 bytes result sent to driver
17/06/01 11:45:48 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 89.0 (TID 183) in 9 ms on localhost (1/2)
17/06/01 11:45:48 INFO python.PythonRunner: Times: total = 7, boot = 3, init = 4, finish = 0
17/06/01 11:45:48 INFO executor.Executor: Finished task 1.0 in stage 89.0 (TID 184). 2124 bytes result sent to driver
17/06/01 11:45:48 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 89.0 (TID 184) in 13 ms on localhost (2/2)
17/06/01 11:45:48 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 89.0, whose tasks have all completed, from pool
17/06/01 11:45:48 INFO scheduler.DAGScheduler: ResultStage 89 (sortByKey at <stdin>:1) finished in 0.013 s
17/06/01 11:45:48 INFO scheduler.DAGScheduler: Job 74 finished: sortByKey at <stdin>:1, took 0.017325 s
17/06/01 11:45:48 INFO spark.SparkContext: Starting job: sortByKey at <stdin>:1
17/06/01 11:45:48 INFO scheduler.DAGScheduler: Got job 75 (sortByKey at <stdin>:1) with 2 output partitions
17/06/01 11:45:48 INFO scheduler.DAGScheduler: Final stage: ResultStage 90 (sortByKey at <stdin>:1)
17/06/01 11:45:48 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/06/01 11:45:48 INFO scheduler.DAGScheduler: Missing parents: List()
17/06/01 11:45:48 INFO scheduler.DAGScheduler: Submitting ResultStage 90 (PythonRDD[201] at sortByKey at <stdin>:1), which has no missing parents
17/06/01 11:45:48 INFO storage.MemoryStore: Block broadcast_105 stored as values in memory (estimated size 6.0 KB, free 4.4 MB)
17/06/01 11:45:48 INFO storage.MemoryStore: Block broadcast_105_piece0 stored as bytes in memory (estimated size 3.9 KB, free 4.4 MB)
17/06/01 11:45:48 INFO storage.BlockManagerInfo: Added broadcast_105_piece0 in memory on localhost:57701 (size: 3.9 KB, free: 511.1 MB)
17/06/01 11:45:48 INFO spark.SparkContext: Created broadcast 105 from broadcast at DAGScheduler.scala:1006
17/06/01 11:45:48 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 90 (PythonRDD[201] at sortByKey at <stdin>:1)
17/06/01 11:45:48 INFO scheduler.TaskSchedulerImpl: Adding task set 90.0 with 2 tasks
17/06/01 11:45:48 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 90.0 (TID 185, localhost, partition 0,PROCESS_LOCAL, 2147 bytes)
17/06/01 11:45:48 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 90.0 (TID 186, localhost, partition 1,PROCESS_LOCAL, 2147 bytes)
17/06/01 11:45:48 INFO executor.Executor: Running task 1.0 in stage 90.0 (TID 186)
17/06/01 11:45:48 INFO executor.Executor: Running task 0.0 in stage 90.0 (TID 185)
17/06/01 11:45:48 INFO rdd.HadoopRDD: Input split: file:/opt/hadoop/spark-1.6.0/TESTSortbykey.md:0+55
17/06/01 11:45:48 INFO rdd.HadoopRDD: Input split: file:/opt/hadoop/spark-1.6.0/TESTSortbykey.md:55+55
17/06/01 11:45:48 INFO python.PythonRunner: Times: total = 42, boot = -8, init = 49, finish = 1
17/06/01 11:45:48 INFO python.PythonRunner: Times: total = 41, boot = -6, init = 47, finish = 0
17/06/01 11:45:48 INFO executor.Executor: Finished task 0.0 in stage 90.0 (TID 185). 2382 bytes result sent to driver
17/06/01 11:45:48 INFO executor.Executor: Finished task 1.0 in stage 90.0 (TID 186). 2223 bytes result sent to driver
17/06/01 11:45:48 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 90.0 (TID 185) in 49 ms on localhost (1/2)
17/06/01 11:45:48 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 90.0 (TID 186) in 51 ms on localhost (2/2)
17/06/01 11:45:48 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 90.0, whose tasks have all completed, from pool
17/06/01 11:45:48 INFO scheduler.DAGScheduler: ResultStage 90 (sortByKey at <stdin>:1) finished in 0.051 s
17/06/01 11:45:48 INFO scheduler.DAGScheduler: Job 75 finished: sortByKey at <stdin>:1, took 0.055618 s
PythonRDD[206] at RDD at PythonRDD.scala:43
>>> testsortbykey.collect()
17/06/01 11:46:04 INFO spark.SparkContext: Starting job: collect at <stdin>:1
17/06/01 11:46:04 INFO scheduler.DAGScheduler: Got job 76 (collect at <stdin>:1) with 2 output partitions
17/06/01 11:46:04 INFO scheduler.DAGScheduler: Final stage: ResultStage 91 (collect at <stdin>:1)
17/06/01 11:46:04 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/06/01 11:46:04 INFO scheduler.DAGScheduler: Missing parents: List()
17/06/01 11:46:04 INFO scheduler.DAGScheduler: Submitting ResultStage 91 (PythonRDD[207] at collect at <stdin>:1), which has no missing parents
17/06/01 11:46:04 INFO storage.MemoryStore: Block broadcast_106 stored as values in memory (estimated size 5.3 KB, free 4.4 MB)
17/06/01 11:46:04 INFO storage.MemoryStore: Block broadcast_106_piece0 stored as bytes in memory (estimated size 3.3 KB, free 4.4 MB)
17/06/01 11:46:04 INFO storage.BlockManagerInfo: Added broadcast_106_piece0 in memory on localhost:57701 (size: 3.3 KB, free: 511.1 MB)
17/06/01 11:46:04 INFO spark.SparkContext: Created broadcast 106 from broadcast at DAGScheduler.scala:1006
17/06/01 11:46:04 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 91 (PythonRDD[207] at collect at <stdin>:1)
17/06/01 11:46:04 INFO scheduler.TaskSchedulerImpl: Adding task set 91.0 with 2 tasks
17/06/01 11:46:04 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 91.0 (TID 187, localhost, partition 0,PROCESS_LOCAL, 2147 bytes)
17/06/01 11:46:04 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 91.0 (TID 188, localhost, partition 1,PROCESS_LOCAL, 2147 bytes)
17/06/01 11:46:04 INFO executor.Executor: Running task 0.0 in stage 91.0 (TID 187)
17/06/01 11:46:04 INFO executor.Executor: Running task 1.0 in stage 91.0 (TID 188)
17/06/01 11:46:04 INFO rdd.HadoopRDD: Input split: file:/opt/hadoop/spark-1.6.0/TESTSortbykey.md:0+55
17/06/01 11:46:04 INFO rdd.HadoopRDD: Input split: file:/opt/hadoop/spark-1.6.0/TESTSortbykey.md:55+55
17/06/01 11:46:04 INFO python.PythonRunner: Times: total = 41, boot = -16016, init = 16056, finish = 1
17/06/01 11:46:04 INFO python.PythonRunner: Times: total = 41, boot = -16017, init = 16057, finish = 1
17/06/01 11:46:04 INFO executor.Executor: Finished task 0.0 in stage 91.0 (TID 187). 2451 bytes result sent to driver
17/06/01 11:46:04 INFO executor.Executor: Finished task 1.0 in stage 91.0 (TID 188). 2252 bytes result sent to driver
17/06/01 11:46:04 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 91.0 (TID 187) in 48 ms on localhost (1/2)
17/06/01 11:46:04 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 91.0 (TID 188) in 49 ms on localhost (2/2)
17/06/01 11:46:04 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 91.0, whose tasks have all completed, from pool
17/06/01 11:46:04 INFO scheduler.DAGScheduler: ResultStage 91 (collect at <stdin>:1) finished in 0.051 s
17/06/01 11:46:04 INFO scheduler.DAGScheduler: Job 76 finished: collect at <stdin>:1, took 0.055614 s
[(u'Mary', 1), (u'had', 1), (u'a', 1), (u'little', 1), (u'lamb', 1), (u'whose', 1), (u'fleece', 1), (u'was', 1), (u'white', 1), (u'as', 1), (u'snow', 1), (u'and', 1), (u'every', 1), (u'where', 1), (u'that', 1), (u'Mary', 1), (u'went', 1), (u'the', 1), (u'Lamb', 1), (u'was', 1), (u'sure', 1), (u'to', 1), (u'go', 1), (u'', 1)]
>>> testsortbykey.reduceByKey(lambda x,y: x+y)
PythonRDD[212] at RDD at PythonRDD.scala:43
>>> testsortbykey.collect()
17/06/01 11:47:06 INFO spark.SparkContext: Starting job: collect at <stdin>:1
17/06/01 11:47:06 INFO scheduler.DAGScheduler: Got job 77 (collect at <stdin>:1) with 2 output partitions
17/06/01 11:47:06 INFO scheduler.DAGScheduler: Final stage: ResultStage 92 (collect at <stdin>:1)
17/06/01 11:47:06 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/06/01 11:47:06 INFO scheduler.DAGScheduler: Missing parents: List()
17/06/01 11:47:06 INFO scheduler.DAGScheduler: Submitting ResultStage 92 (PythonRDD[207] at collect at <stdin>:1), which has no missing parents
17/06/01 11:47:06 INFO storage.MemoryStore: Block broadcast_107 stored as values in memory (estimated size 5.3 KB, free 4.5 MB)
17/06/01 11:47:06 INFO storage.MemoryStore: Block broadcast_107_piece0 stored as bytes in memory (estimated size 3.3 KB, free 4.5 MB)
17/06/01 11:47:06 INFO storage.BlockManagerInfo: Added broadcast_107_piece0 in memory on localhost:57701 (size: 3.3 KB, free: 511.1 MB)
17/06/01 11:47:06 INFO spark.SparkContext: Created broadcast 107 from broadcast at DAGScheduler.scala:1006
17/06/01 11:47:06 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 92 (PythonRDD[207] at collect at <stdin>:1)
17/06/01 11:47:06 INFO scheduler.TaskSchedulerImpl: Adding task set 92.0 with 2 tasks
17/06/01 11:47:06 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 92.0 (TID 189, localhost, partition 0,PROCESS_LOCAL, 2147 bytes)
17/06/01 11:47:06 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 92.0 (TID 190, localhost, partition 1,PROCESS_LOCAL, 2147 bytes)
17/06/01 11:47:06 INFO executor.Executor: Running task 0.0 in stage 92.0 (TID 189)
17/06/01 11:47:06 INFO executor.Executor: Running task 1.0 in stage 92.0 (TID 190)
17/06/01 11:47:06 INFO rdd.HadoopRDD: Input split: file:/opt/hadoop/spark-1.6.0/TESTSortbykey.md:55+55
17/06/01 11:47:06 INFO rdd.HadoopRDD: Input split: file:/opt/hadoop/spark-1.6.0/TESTSortbykey.md:0+55
17/06/01 11:47:06 INFO python.PythonRunner: Times: total = 3, boot = 2, init = 1, finish = 0
17/06/01 11:47:06 INFO executor.Executor: Finished task 1.0 in stage 92.0 (TID 190). 2252 bytes result sent to driver
17/06/01 11:47:06 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 92.0 (TID 190) in 13 ms on localhost (1/2)
17/06/01 11:47:06 INFO python.PythonRunner: Times: total = 11, boot = 3, init = 7, finish = 1
17/06/01 11:47:06 INFO executor.Executor: Finished task 0.0 in stage 92.0 (TID 189). 2451 bytes result sent to driver
17/06/01 11:47:06 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 92.0 (TID 189) in 16 ms on localhost (2/2)
17/06/01 11:47:06 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 92.0, whose tasks have all completed, from pool
17/06/01 11:47:06 INFO scheduler.DAGScheduler: ResultStage 92 (collect at <stdin>:1) finished in 0.017 s
17/06/01 11:47:06 INFO scheduler.DAGScheduler: Job 77 finished: collect at <stdin>:1, took 0.020758 s
[(u'Mary', 1), (u'had', 1), (u'a', 1), (u'little', 1), (u'lamb', 1), (u'whose', 1), (u'fleece', 1), (u'was', 1), (u'white', 1), (u'as', 1), (u'snow', 1), (u'and', 1), (u'every', 1), (u'where', 1), (u'that', 1), (u'Mary', 1), (u'went', 1), (u'the', 1), (u'Lamb', 1), (u'was', 1), (u'sure', 1), (u'to', 1), (u'go', 1), (u'', 1)]
>>>
Your first step is correct:
>>> rdd = sc.textFile("./yourFile.md").flatMap(lambda x: x.split(" ")).map(lambda x: (x,1))
>>> rdd.collect()
[(u'Mary', 1), (u'had', 1), (u'a', 1), (u'little', 1), (u'lamb', 1),
(u"It's", 1), (u'fleece', 1), (u'was', 1), (u'white', 1), (u'as', 1),
(u'snow,', 1), (u'yeah', 1), (u'Everywhere', 1), (u'the', 1), (u'child', 1),
(u'went', 1), (u'The', 1), (u'lamb,', 1), (u'the', 1), (u'lamb', 1),
(u'was', 1), (u'sure', 1), (u'to', 1), (u'go,', 1), (u'yeah', 1)]
So what is the problem?
If you do this:
>>> rdd.reduceByKey(lambda x,y: x+y)
and then this:
>>> rdd.collect()
[(u'Mary', 1), (u'had', 1), (u'a', 1), (u'little', 1), (u'lamb', 1),
(u"It's", 1), (u'fleece', 1), (u'was', 1), (u'white', 1), (u'as', 1),
(u'snow,', 1), (u'yeah', 1), (u'Everywhere', 1), (u'the', 1), (u'child', 1),
(u'went', 1), (u'The', 1), (u'lamb,', 1), (u'the', 1), (u'lamb', 1),
(u'was', 1), (u'sure', 1), (u'to', 1), (u'go,', 1), (u'yeah', 1)]
you have only applied a transformation; you have not changed your starting RDD. Transformations such as reduceByKey() and sortByKey() never modify the RDD they are called on: they return a new RDD, so if you discard the return value, rdd.collect() still shows the original pairs.
But you have two ways to get what you want.
First option (if you just want to see the result of the transformation):
>>> rdd.reduceByKey(lambda x,y: x+y).collect()
[(u'a', 1), (u'lamb', 2), (u'little', 1), (u'white', 1), (u'had', 1),
(u'fleece', 1), (u'The', 1), (u'snow,', 1), (u'Everywhere', 1), (u'went', 1), (u'was', 2),
(u'the', 2), (u'as', 1), (u'go,', 1), (u'sure', 1), (u'lamb,', 1),
(u"It's", 1), (u'yeah', 2), (u'to', 1), (u'child', 1), (u'Mary', 1)]
Second option (if you want to keep the result of the transformation in a new RDD):
If you do this:
>>> rddReduced = rdd.reduceByKey(lambda x,y: x+y)
and then this:
>>> rddReduced.collect()
[(u'a', 1), (u'lamb', 2), (u'little', 1), (u'white', 1), (u'had', 1),
(u'fleece', 1), (u'The', 1), (u'snow,', 1), (u'Everywhere', 1), (u'went', 1),
(u'was', 2), (u'the', 2), (u'as', 1), (u'go,', 1), (u'sure', 1), (u'lamb,', 1),
(u"It's", 1), (u'yeah', 2), (u'to', 1), (u'child', 1), (u'Mary', 1)]
you have applied and saved your transformation, and the result is what you were looking for.
The same concept applies if you want to use sortByKey().
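As a minimal sketch of that (reusing the testsortbykey pair RDD from the question; rddSorted is just an illustrative name), you can either chain the action onto the transformation or keep the returned RDD in a new variable:
>>> testsortbykey.sortByKey().collect()   # just view the pairs sorted by key
>>> rddSorted = testsortbykey.reduceByKey(lambda x, y: x + y).sortByKey()
>>> rddSorted.collect()                   # (word, count) pairs, one per word, sorted by key
As with reduceByKey(), sortByKey() returns a new RDD, so you have to collect from the chained expression or from the variable you assigned it to; testsortbykey itself stays unsorted.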