Spark program running infinitely
Scenario: I am trying to read an Avro file stored in S3 and create a DataFrame from it using the Databricks spark-avro library. This is the code I am using:
package org.myorg.dataframe;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class S3DataFrame {
    public static void main(String[] args) {
        System.out.println("START...");
        SparkConf conf = new SparkConf().setAppName("S3DataFrame").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        Configuration config = sc.hadoopConfiguration();

        // FOR s3a
        config.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        config.set("fs.s3a.access.key", "****************");
        config.set("fs.s3a.secret.key", "********************");
        config.set("fs.s3a.endpoint", "s3-us-west-2.amazonaws.com");

        SQLContext sqlContext = new SQLContext(sc);
        DataFrame df = sqlContext.load("s3a://bucket-name/employees.avro", "com.databricks.spark.avro");
        df.show();
        df.printSchema();
        df.select("name").show();
        System.out.println("DONE");
        // df.save("/Users/miqbal1/avvvvvv.avro/", "com.databricks.spark.avro");
    }
}
Problem: The program appears to run indefinitely. It does not throw any exception, but it keeps running and logging the following trace:
.
.
.
.
15/05/18 17:35:44 INFO HttpServer: Starting HTTP Server
15/05/18 17:35:44 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/18 17:35:44 INFO AbstractConnector: Started SocketConnector@0.0.0.0:60316
15/05/18 17:35:44 INFO Utils: Successfully started service 'HTTP file server' on port 60316.
15/05/18 17:35:44 INFO SparkEnv: Registering OutputCommitCoordinator
15/05/18 17:35:44 INFO Server: jetty-8.y.z-SNAPSHOT
15/05/18 17:35:44 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/05/18 17:35:44 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/05/18 17:35:44 INFO SparkUI: Started SparkUI at http://172.28.210.74:4040
15/05/18 17:35:44 INFO Executor: Starting executor ID <driver> on host localhost
15/05/18 17:35:44 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@172.28.210.74:60315/user/HeartbeatReceiver
15/05/18 17:35:44 INFO NettyBlockTransferService: Server created on 60317
15/05/18 17:35:44 INFO BlockManagerMaster: Trying to register BlockManager
15/05/18 17:35:44 INFO BlockManagerMasterActor: Registering block manager localhost:60317 with 66.9 MB RAM, BlockManagerId(<driver>, localhost, 60317)
15/05/18 17:35:44 INFO BlockManagerMaster: Registered BlockManager
15/05/18 17:35:45 WARN AmazonHttpClient: Detected a possible problem with the current JVM version (1.6.0_65). If you experience XML parsing problems using the SDK, try upgrading to a more recent JVM update.
15/05/18 17:35:47 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:47 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:47 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:48 INFO S3AFileSystem: Opening 's3a://bucket-name/avro_data/episodes.avro' for reading
15/05/18 17:35:48 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:48 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:48 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -4
15/05/18 17:35:48 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(230868) called with curMem=0, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 225.5 KB, free 66.7 MB)
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(31491) called with curMem=230868, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 30.8 KB, free 66.7 MB)
15/05/18 17:35:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:60317 (size: 30.8 KB, free: 66.9 MB)
15/05/18 17:35:50 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/05/18 17:35:50 INFO SparkContext: Created broadcast 0 from hadoopFile at AvroRelation.scala:82
15/05/18 17:35:50 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:50 INFO FileInputFormat: Total input paths to process : 1
15/05/18 17:35:50 INFO SparkContext: Starting job: runJob at SparkPlan.scala:122
15/05/18 17:35:50 INFO DAGScheduler: Got job 0 (runJob at SparkPlan.scala:122) with 1 output partitions (allowLocal=false)
15/05/18 17:35:50 INFO DAGScheduler: Final stage: Stage 0(runJob at SparkPlan.scala:122)
15/05/18 17:35:50 INFO DAGScheduler: Parents of final stage: List()
15/05/18 17:35:50 INFO DAGScheduler: Missing parents: List()
15/05/18 17:35:50 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at map at SparkPlan.scala:97), which has no missing parents
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(3448) called with curMem=262359, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.4 KB, free 66.7 MB)
15/05/18 17:35:50 INFO MemoryStore: ensureFreeSpace(2386) called with curMem=265807, maxMem=70177259
15/05/18 17:35:50 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.3 KB, free 66.7 MB)
15/05/18 17:35:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:60317 (size: 2.3 KB, free: 66.9 MB)
15/05/18 17:35:50 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/05/18 17:35:50 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839
15/05/18 17:35:50 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[2] at map at SparkPlan.scala:97)
15/05/18 17:35:50 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/05/18 17:35:50 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1306 bytes)
15/05/18 17:35:50 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/05/18 17:35:50 INFO HadoopRDD: Input split: s3a://bucket-name/avro_data/episodes.avro:0+1
15/05/18 17:35:50 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/05/18 17:35:50 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/05/18 17:35:50 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/05/18 17:35:50 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/05/18 17:35:50 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/05/18 17:35:50 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:51 INFO S3AFileSystem: Opening 's3a://bucket-name/avro_data/episodes.avro' for reading
15/05/18 17:35:51 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:51 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:51 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -4
15/05/18 17:35:51 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:53 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -597
15/05/18 17:35:53 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:53 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1800 bytes result sent to driver
15/05/18 17:35:53 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2782 ms on localhost (1/1)
15/05/18 17:35:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/05/18 17:35:53 INFO DAGScheduler: Stage 0 (runJob at SparkPlan.scala:122) finished in 2.797 s
15/05/18 17:35:53 INFO DAGScheduler: Job 0 finished: runJob at SparkPlan.scala:122, took 2.974724 s
15/05/18 17:35:53 INFO SparkContext: Starting job: runJob at SparkPlan.scala:122
15/05/18 17:35:53 INFO DAGScheduler: Got job 1 (runJob at SparkPlan.scala:122) with 596 output partitions (allowLocal=false)
15/05/18 17:35:53 INFO DAGScheduler: Final stage: Stage 1(runJob at SparkPlan.scala:122)
15/05/18 17:35:53 INFO DAGScheduler: Parents of final stage: List()
15/05/18 17:35:53 INFO DAGScheduler: Missing parents: List()
15/05/18 17:35:53 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[2] at map at SparkPlan.scala:97), which has no missing parents
15/05/18 17:35:53 INFO MemoryStore: ensureFreeSpace(3448) called with curMem=268193, maxMem=70177259
15/05/18 17:35:53 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.4 KB, free 66.7 MB)
15/05/18 17:35:53 INFO MemoryStore: ensureFreeSpace(2386) called with curMem=271641, maxMem=70177259
15/05/18 17:35:53 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.3 KB, free 66.7 MB)
15/05/18 17:35:53 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:60317 (size: 2.3 KB, free: 66.9 MB)
15/05/18 17:35:53 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/05/18 17:35:53 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839
15/05/18 17:35:53 INFO DAGScheduler: Submitting 596 missing tasks from Stage 1 (MapPartitionsRDD[2] at map at SparkPlan.scala:97)
15/05/18 17:35:53 INFO TaskSchedulerImpl: Adding task set 1.0 with 596 tasks
15/05/18 17:35:53 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1306 bytes)
15/05/18 17:35:53 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
15/05/18 17:35:53 INFO HadoopRDD: Input split: s3a://bucket-name/avro_data/episodes.avro:1+1
15/05/18 17:35:53 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:54 INFO S3AFileSystem: Opening 's3a://bucket-name/avro_data/episodes.avro' for reading
15/05/18 17:35:54 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:54 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:54 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -4
15/05/18 17:35:54 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:55 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -596
15/05/18 17:35:55 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 1
15/05/18 17:35:56 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1800 bytes result sent to driver
15/05/18 17:35:56 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2, localhost, PROCESS_LOCAL, 1306 bytes)
15/05/18 17:35:56 INFO Executor: Running task 1.0 in stage 1.0 (TID 2)
15/05/18 17:35:56 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 2224 ms on localhost (1/596)
15/05/18 17:35:56 INFO HadoopRDD: Input split: s3a://bucket-name/avro_data/episodes.avro:2+1
15/05/18 17:35:56 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:56 INFO BlockManager: Removing broadcast 1
15/05/18 17:35:56 INFO BlockManager: Removing block broadcast_1_piece0
15/05/18 17:35:56 INFO MemoryStore: Block broadcast_1_piece0 of size 2386 dropped from memory (free 69905618)
15/05/18 17:35:56 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:60317 in memory (size: 2.3 KB, free: 66.9 MB)
15/05/18 17:35:56 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/05/18 17:35:56 INFO BlockManager: Removing block broadcast_1
15/05/18 17:35:56 INFO MemoryStore: Block broadcast_1 of size 3448 dropped from memory (free 69909066)
15/05/18 17:35:56 INFO ContextCleaner: Cleaned broadcast 1
15/05/18 17:35:56 INFO S3AFileSystem: Opening 's3a://bucket-name/avro_data/episodes.avro' for reading
15/05/18 17:35:56 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:56 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:57 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -4
15/05/18 17:35:57 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
15/05/18 17:35:58 INFO S3AFileSystem: Reopening avro_data/episodes.avro to seek to new offset -595
15/05/18 17:35:58 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 2
15/05/18 17:35:58 INFO Executor: Finished task 1.0 in stage 1.0 (TID 2). 1800 bytes result sent to driver
15/05/18 17:35:58 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 1306 bytes)
15/05/18 17:35:58 INFO Executor: Running task 2.0 in stage 1.0 (TID 3)
15/05/18 17:35:58 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 2655 ms on localhost (2/596)
15/05/18 17:35:58 INFO HadoopRDD: Input split: s3a://bucket-name/avro_data/episodes.avro:3+1
15/05/18 17:35:58 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:58 INFO S3AFileSystem: Opening 's3a://bucket-name/avro_data/episodes.avro' for reading
15/05/18 17:35:58 INFO S3AFileSystem: Getting path status for s3a://bucket-name/avro_data/episodes.avro (avro_data/episodes.avro)
15/05/18 17:35:59 INFO S3AFileSystem: Actually opening file avro_data/episodes.avro at pos 0
.
.
.
.
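The code behaves as if it were stuck in an infinite loop, without doing anything useful. I tried Googling but found nothing helpful, and I could not get any help through the Spark mailing list either. The same code runs perfectly fine against a locally stored file. I am not sure whether I am missing something, since this is how the API says to do it. Can anyone point me in the right direction?
Any help is greatly appreciated. Thank you very much for your time.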
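One detail in the trace above is worth a closer look: the input splits are reported as `0+1`, `1+1`, `2+1`, and the second job has 596 tasks, each taking more than two seconds. That pattern would be consistent with the file being cut into one-byte splits rather than with a true infinite loop. The sketch below reproduces the split-size arithmetic used by Hadoop's `FileInputFormat`, under two assumptions that the post does not confirm: that the file is 597 bytes long (inferred from the `-597` seek offset and the 1 + 596 tasks), and that the file system reported a block size of 0. The class and method names here are illustrative, not part of any library.

```java
public class SplitSizeSketch {
    // Slack factor: the last split may be up to 10% larger than the others.
    private static final double SPLIT_SLOP = 1.1;

    // Split size is the block size, capped by the goal size and floored by the minimum size.
    static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    // Counts how many splits a file of the given length yields for a given split size.
    static long countSplits(long fileLength, long splitSize) {
        long splits = 0;
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits++; // trailing split holds whatever is left
        }
        return splits;
    }

    public static void main(String[] args) {
        long fileLength = 597L; // assumption: inferred from the trace, not stated in the post
        long goalSize = fileLength; // assumption: a single split was requested
        long minSize = 1L; // default minimum split size
        long blockSize = 0L; // assumption: block size reported as 0

        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
        System.out.println("split size = " + splitSize);
        System.out.println("splits     = " + countSplits(fileLength, splitSize));

        // Hypothetical comparison: a 64 MB minimum split size keeps the file in one split.
        long largerMin = 64L * 1024 * 1024;
        long saferSize = computeSplitSize(goalSize, largerMin, blockSize);
        System.out.println("splits with larger minimum = " + countSplits(fileLength, saferSize));
    }
}
```

With these inputs the split size collapses to the one-byte minimum, so every byte of the file becomes its own task, and each task reopens the S3 object. If that is what happened here, the job would eventually finish, only very slowly. Raising Hadoop's minimum split size (`mapreduce.input.fileinputformat.split.minsize`) might avoid the problem while staying on s3a, but that has not been tested against this setup.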
Apparently s3a does not work for this. Here is what I had to do to make it work:
SparkConf conf = new SparkConf().setAppName("S3DataFrame").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
Configuration config = sc.hadoopConfiguration();

// Use the s3n (native S3) file system instead of s3a
config.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
config.set("fs.s3n.awsAccessKeyId", "************");
config.set("fs.s3n.awsSecretAccessKey", "*************");

SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.load("s3n://input/avro_data/part-00000.avro", "com.databricks.spark.avro");