Apache Spark 聚合函数 运行 在运行时进入 ArrayIndexOutOfBoundsException
Apache Spark Aggregate functions running into ArrayIndexOutOfBoundsException during runtime
在将我的 java spark 程序部署到集群期间,它遇到了 ArrayIndexOutOfBoundsException: 11
异常
根据我的理解,我的代码编写方式在语法上没有任何错误,索引错误并不能说明问题所在。我的程序只是应该能够采用 12 列,用空格分隔,然后它需要采用一列(命令列)并进行聚合以查看每个命令存在多少次,即
column1 column2 command column3 ect
dggd gdegdg cmd#1 533 ect
dggd gdegdg cmd#1 533 ect
dggd gdegdg cmd#2 534 ect
dggd gdegdg cmd#5 5353 ect
dggd gdegdg cmd#2 533 ect
看起来像
commmand count
command#1 5
command#2 15
command#5 514
我是 运行
火花2.1
HDP 2.6
这是我目前的代码
public class Main {
public static void main(String[] args) {
//functions fu = new functions();
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("appName").setMaster("local[*]"));
SparkSession spark = SparkSession
.builder()
.appName("Log File Reader")
.getOrCreate();
JavaRDD<String> logsRDD = spark.sparkContext()
.textFile(args[0],1)
.toJavaRDD();
String schemaString = "date time command service responseCode bytes ip dash1 dash2 dash3 num dash4";
List<StructField> fields = new ArrayList<>();
String[] fieldName = schemaString.split(" ");
for (String field : fieldName){
fields.add(DataTypes.createStructField(field, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);
JavaRDD<Row> rowRDD = logsRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(" ");
return RowFactory.create(attributes[0],attributes[1],attributes[2],attributes[3],attributes[4],attributes[5],
attributes[6],attributes[7],attributes[8],attributes[9],
attributes[10],attributes[11]);
});
Dataset<Row> dataDF = spark.createDataFrame(rowRDD, schema);
dataDF.createOrReplaceTempView("data");
//shows the top 20 rows from the dataframe including all columns
Dataset<Row> showDF = spark.sql("select * from data");
//shows the top 20 columns from the same dataframe, but only displays
//the command column
Dataset<Row> commandDF = spark.sql("select command from data");
showDF.show();
commandDF.show();
此代码工作正常,但是当我尝试使用如下代码查找最终结果时,它遇到了索引错误。
logsDF.groupBy(col("command")).count().show();
Dataset<Row> ans = spark.sql("select command, count(*) from logs group by command").show();
最后是 spark-submit 代码
spark-submit --class com.ect.java.Main /path/application.jar hdfs:///path/textfile.txt
在我看来,这是一个环境问题,但找不到与此问题相关的任何文档
问题不在于聚合函数。问题出在您的日志文件上。您遇到错误
Caused by: java.lang.ArrayIndexOutOfBoundsException: 11
这意味着您的日志文件中的某一行有 11 个条目,而不是程序要求的 12 个条目。您可以通过创建示例 log.txt 文件并在该文件中保留两行来验证这一点。
您的代码分组应该如下所示(看起来像打字错误)。在您的示例应用程序中,您有 dataDF
而不是 logsDF
。临时 table 名称是 data
而不是 logs
。
dataDF.groupBy(col("command")).count().show();
Dataset<Row> ans = spark.sql("select command, count(*) from data group by command");
ans.show();
在将我的 java spark 程序部署到集群期间,它遇到了 ArrayIndexOutOfBoundsException: 11
异常
根据我的理解,我的代码编写方式在语法上没有任何错误,索引错误并不能说明问题所在。我的程序只是应该能够采用 12 列,用空格分隔,然后它需要采用一列(命令列)并进行聚合以查看每个命令存在多少次,即
column1 column2 command column3 ect
dggd gdegdg cmd#1 533 ect
dggd gdegdg cmd#1 533 ect
dggd gdegdg cmd#2 534 ect
dggd gdegdg cmd#5 5353 ect
dggd gdegdg cmd#2 533 ect
看起来像
commmand count
command#1 5
command#2 15
command#5 514
我是 运行 火花2.1 HDP 2.6 这是我目前的代码
public class Main {
public static void main(String[] args) {
//functions fu = new functions();
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("appName").setMaster("local[*]"));
SparkSession spark = SparkSession
.builder()
.appName("Log File Reader")
.getOrCreate();
JavaRDD<String> logsRDD = spark.sparkContext()
.textFile(args[0],1)
.toJavaRDD();
String schemaString = "date time command service responseCode bytes ip dash1 dash2 dash3 num dash4";
List<StructField> fields = new ArrayList<>();
String[] fieldName = schemaString.split(" ");
for (String field : fieldName){
fields.add(DataTypes.createStructField(field, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);
JavaRDD<Row> rowRDD = logsRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(" ");
return RowFactory.create(attributes[0],attributes[1],attributes[2],attributes[3],attributes[4],attributes[5],
attributes[6],attributes[7],attributes[8],attributes[9],
attributes[10],attributes[11]);
});
Dataset<Row> dataDF = spark.createDataFrame(rowRDD, schema);
dataDF.createOrReplaceTempView("data");
//shows the top 20 rows from the dataframe including all columns
Dataset<Row> showDF = spark.sql("select * from data");
//shows the top 20 columns from the same dataframe, but only displays
//the command column
Dataset<Row> commandDF = spark.sql("select command from data");
showDF.show();
commandDF.show();
此代码工作正常,但是当我尝试使用如下代码查找最终结果时,它遇到了索引错误。
logsDF.groupBy(col("command")).count().show();
Dataset<Row> ans = spark.sql("select command, count(*) from logs group by command").show();
最后是 spark-submit 代码
spark-submit --class com.ect.java.Main /path/application.jar hdfs:///path/textfile.txt
在我看来,这是一个环境问题,但找不到与此问题相关的任何文档
问题不在于聚合函数。问题出在您的日志文件上。您遇到错误
Caused by: java.lang.ArrayIndexOutOfBoundsException: 11
这意味着您的日志文件中的某一行有 11 个条目,而不是程序要求的 12 个条目。您可以通过创建示例 log.txt 文件并在该文件中保留两行来验证这一点。
您的代码分组应该如下所示(看起来像打字错误)。在您的示例应用程序中,您有 dataDF
而不是 logsDF
。临时 table 名称是 data
而不是 logs
。
dataDF.groupBy(col("command")).count().show();
Dataset<Row> ans = spark.sql("select command, count(*) from data group by command");
ans.show();