ArrayIndexOutOfBoundsException with Spark, Spark-Avro and Google Analytics Data

I am trying to use spark-avro with Google Analytics avro data files from one of our clients. I'm also new to Spark/Scala, so my apologies if I've got anything wrong or done anything stupid. I'm using Spark 1.3.1.

I'm experimenting with the data in the spark-shell, which I start like this:

spark-shell --packages com.databricks:spark-avro_2.10:1.0.0

I then run the following commands:

import com.databricks.spark.avro._
import scala.collection.mutable._

val gadata = sqlContext.avroFile("[client]/data")
gadata: org.apache.spark.sql.DataFrame = [visitorId: bigint, visitNumber: bigint, visitId: bigint, visitStartTime: bigint, date: string, totals: struct<visits:bigint,hits:bigint,pageviews:bigint,timeOnSite:bigint,bounces:bigint,transactions:bigint,transactionRevenue:bigint,newVisits:bigint,screenviews:bigint,uniqueScreenviews:bigint,timeOnScreen:bigint,totalTransactionRevenue:bigint>, trafficSource: struct<referralPath:string,campaign:string,source:string,medium:string,keyword:string,adContent:string>, device: struct<browser:string,browserVersion:string,operatingSystem:string,operatingSystemVersion:string,isMobile:boolean,mobileDeviceBranding:string,flashVersion:string,javaEnabled:boolean,language:string,screenColors:string,screenResolution:string,deviceCategory:string>, geoNetwork: str...

val gaIds = gadata.map(ga => ga.getString(11)).collect()

I get the following error:

[Stage 2:=>                                                                                          (8 + 4) / 430]15/05/14 11:14:04 ERROR Executor: Exception in task 12.0 in stage 2.0 (TID 27)
java.lang.ArrayIndexOutOfBoundsException
15/05/14 11:14:04 WARN TaskSetManager: Lost task 12.0 in stage 2.0 (TID 27, localhost): java.lang.ArrayIndexOutOfBoundsException

15/05/14 11:14:04 ERROR TaskSetManager: Task 12 in stage 2.0 failed 1 times; aborting job
15/05/14 11:14:04 WARN TaskSetManager: Lost task 11.0 in stage 2.0 (TID 26, localhost): TaskKilled (killed intentionally)
15/05/14 11:14:04 WARN TaskSetManager: Lost task 10.0 in stage 2.0 (TID 25, localhost): TaskKilled (killed intentionally)
15/05/14 11:14:04 WARN TaskSetManager: Lost task 9.0 in stage 2.0 (TID 24, localhost): TaskKilled (killed intentionally)
15/05/14 11:14:04 WARN TaskSetManager: Lost task 13.0 in stage 2.0 (TID 28, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 2.0 failed 1 times, most recent failure: Lost task 12.0 in stage 2.0 (TID 27, localhost): java.lang.ArrayIndexOutOfBoundsException

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1193)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1192)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)

Though this could be related to the index I am using, the following statement works fine.

scala> gadata.first().getString(11)
res12: String = 29456309767885
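
To double-check what column ordinal 11 actually points at, one option is to read the name off the DataFrame schema rather than hard-coding the position. A minimal sketch, relying only on the schema already inferred above:

// Look up the column name behind ordinal 11, then pull that column
// through the DataFrame API instead of positional Row access.
val col11 = gadata.schema.fields(11).name
println(col11)
gadata.select(col11).show(5)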

So I thought that maybe some of the records might be empty or have a different number of columns... so I tried running the following statement to get a list of all the record lengths:

scala> gadata.map(ga => ga.length).collect()

But I got a similar error:

[Stage 4:=>                                                                                          (8 + 4) / 430]15/05/14 11:20:04 ERROR Executor: Exception in task 12.0 in stage 4.0 (TID 42)
java.lang.ArrayIndexOutOfBoundsException
15/05/14 11:20:04 WARN TaskSetManager: Lost task 12.0 in stage 4.0 (TID 42, localhost): java.lang.ArrayIndexOutOfBoundsException

15/05/14 11:20:04 ERROR TaskSetManager: Task 12 in stage 4.0 failed 1 times; aborting job
15/05/14 11:20:04 WARN TaskSetManager: Lost task 11.0 in stage 4.0 (TID 41, localhost): TaskKilled (killed intentionally)
15/05/14 11:20:04 ERROR Executor: Exception in task 13.0 in stage 4.0 (TID 43)
org.apache.spark.TaskKilledException
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/05/14 11:20:04 WARN TaskSetManager: Lost task 9.0 in stage 4.0 (TID 39, localhost): TaskKilled (killed intentionally)
15/05/14 11:20:04 WARN TaskSetManager: Lost task 10.0 in stage 4.0 (TID 40, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 4.0 failed 1 times, most recent failure: Lost task 12.0 in stage 4.0 (TID 42, localhost): java.lang.ArrayIndexOutOfBoundsException

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1193)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1192)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)
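
One way to narrow down whether individual records are the problem is to wrap the per-row access in scala.util.Try and count the failures, rather than letting a single bad row abort the whole job. This is only a sketch, and it assumes the exception is thrown when the Row is accessed rather than while the Avro file itself is being deserialized:

import scala.util.Try

// Count rows whose length cannot even be read, without aborting the job.
val badRows = gadata.map(ga => Try(ga.length)).filter(_.isFailure).count()
println("rows that failed: " + badRows)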

Is this an issue with Spark-Avro or Spark?

Not sure what the underlying issue was, but I've fixed the error by breaking my data up into monthly sets. I had 4 months' worth of GA data in a single folder and was operating on all of it at once. The data ranges from 70MB to 150MB per day.

After creating 4 folders for January, February, March and April and loading each of them separately, the maps succeed without any issue. Once loaded, I can union the datasets together (only tried two so far) and work on them without a problem.
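
For reference, here is a sketch of that workaround. The monthly sub-folder names are hypothetical; unionAll is the Spark 1.3 DataFrame method for concatenating two frames with the same schema:

// Load each month from its own folder, then stitch the frames back together.
val jan = sqlContext.avroFile("[client]/data/jan")
val feb = sqlContext.avroFile("[client]/data/feb")
val janFeb = jan.unionAll(feb)
janFeb.map(ga => ga.length).collect()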

I'm using Spark on a pseudo-distributed Hadoop installation, and I'm not sure whether that has any impact on the volume of data Spark can handle.

UPDATE:

Found the underlying cause of the error. I loaded each month's data and printed out the schema. January and February are identical, but after that one field goes missing from the March and April schemas:

root
 |-- visitorId: long (nullable = true)
 |-- visitNumber: long (nullable = true)
 |-- visitId: long (nullable = true)
 |-- visitStartTime: long (nullable = true)
 |-- date: string (nullable = true)
 |-- totals: struct (nullable = true)
 |    |-- visits: long (nullable = true)
 |    |-- hits: long (nullable = true)
 |    |-- pageviews: long (nullable = true)
 |    |-- timeOnSite: long (nullable = true)
 |    |-- bounces: long (nullable = true)
 |    |-- transactions: long (nullable = true)
 |    |-- transactionRevenue: long (nullable = true)
 |    |-- newVisits: long (nullable = true)
 |    |-- screenviews: long (nullable = true)
 |    |-- uniqueScreenviews: long (nullable = true)
 |    |-- timeOnScreen: long (nullable = true)
 |    |-- totalTransactionRevenue: long (nullable = true)
(snipped)

The totalTransactionRevenue field at the bottom is no longer present after February. So I assume this is what's causing the error, and that it's related to this issue.
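
A quick way to spot this kind of drift without eyeballing the printed trees is to diff the schemas programmatically. A rough sketch, where feb and mar are hypothetical names for the February and March DataFrames loaded as above, and treeString is the same text that printSchema prints:

// Diff the two schema trees line by line to surface fields that only
// exist on one side (e.g. totals.totalTransactionRevenue).
val febLines = feb.schema.treeString.split("\n").toSet
val marLines = mar.schema.treeString.split("\n").toSet
println("Only in Feb: " + (febLines -- marLines).mkString(", "))
println("Only in Mar: " + (marLines -- febLines).mkString(", "))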