Apache Flink:转换广播变量失败,但我无法确定原因

Apache Flink: transforming Broadcast variables fails, but I can't determine why

我正在尝试在 Apache Flink 上准备一个小示例应用程序,主要目的是演示如何使用广播变量。此应用程序读取 CSV 文件并准备 DataSet[BuildingInformation]

case class BuildingInformation(
     buildingID: Int, buildingManager: String, buildingAge: Int,
     productID: String, country: String
)

这就是我正在创建 BuildingInformation 数据集的方法,目前:

val buildingsBroadcastSet =
      envDefault
      .fromElements(
                     readBuildingInfo(
                        envDefault,
                        "./SensorFiles/building.csv")
                   )

然后,我开始这样转变:

val hvacStream = readHVACReadings(envDefault,"./SensorFiles/HVAC.csv")

    hvacStream
      .map(new HVACToBuildingMapper)
      .withBroadcastSet(buildingsBroadcastSet,"buildingData")
      .writeAsCsv("./hvacTemp.csv")

(buildingID -> BuildingInformation) 的地图是我想要作为广播的参考数据。为了做好准备,我实现了一个 RichMapFunction:

 class HVACToBuildingMapper
    extends RichMapFunction  [HVACData,EnhancedHVACTempReading] {

    var allBuildingDetails: Map[Int, BuildingInformation] = _

    override def open(configuration: Configuration): Unit = {

      allBuildingDetails =
        getRuntimeContext
        .getBroadcastVariableWithInitializer(
          "buildingData",
          new BroadcastVariableInitializer [BuildingInformation,Map[Int,BuildingInformation]] {

            def initializeBroadcastVariable(valuesPushed:java.lang.Iterable[BuildingInformation]): Map[Int,BuildingInformation] = {
              valuesPushed
                .asScala
                .toList
              .map(nextBuilding => (nextBuilding.buildingID,nextBuilding))(breakOut)
            }
          }
        )
    }
    override def map(nextReading: HVACData): EnhancedHVACTempReading = {
      val buildingDetails = allBuildingDetails.getOrElse(nextReading.buildingID,UndefinedBuildingInformation)
      // ... more intermediate data creation logic here
      EnhancedHVACTempReading(
        nextReading.buildingID,
        rangeOfTempRecorded,
        isExtremeTempRecorded,
        buildingDetails.country,
        buildingDetails.productID,
        buildingDetails.buildingAge,
        buildingDetails.buildingManager
      )
    }

  }

在函数签名中

def initializeBroadcastVariable(valuesPushed:java.lang.Iterable[BuildingInformation]): Map[Int,BuildingInformation]

java.lang.Iterable 的资格是我的补充。没有这个,编译器会在 Intellij 中抱怨。

在运行时,应用程序在我从 Iterable[BuildingInformation] 中创建映射的位置失败,该映射由框架传递给 open() 函数:

java.lang.Exception: The user defined 'open()' method caused an exception: scala.collection.immutable.$colon$colon cannot be cast to org.nirmalya.hortonworks.tutorial.BuildingInformation
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:475)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: scala.collection.immutable.$colon$colon cannot be cast to org.nirmalya.hortonworks.tutorial.BuildingInformation
    at org.nirmalya.hortonworks.tutorial.HVACReadingsAnalysis$HVACToBuildingMapper$$anon$$anonfun$initializeBroadcastVariable.apply(HVACReadingsAnalysis.scala:139)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.nirmalya.hortonworks.tutorial.HVACReadingsAnalysis$HVACToBuildingMapper$$anon.initializeBroadcastVariable(HVACReadingsAnalysis.scala:139)
    at org.nirmalya.hortonworks.tutorial.HVACReadingsAnalysis$HVACToBuildingMapper$$anon.initializeBroadcastVariable(HVACReadingsAnalysis.scala:133)
    at org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization.getVariable(BroadcastVariableMaterialization.java:234)
    at org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext.getBroadcastVariableWithInitializer(DistributedRuntimeUDFContext.java:84)
    at org.nirmalya.hortonworks.tutorial.HVACReadingsAnalysis$HVACToBuildingMapper.open(HVACReadingsAnalysis.scala:131)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:471)
    ... 3 more
09:28:54,389 INFO  org.apache.flink.runtime.client.JobClientActor                - 04/29/2016 09:28:54  Job execution switched to status FAILED.

通过假设它可能是从 (Java) Iterable 转换 case class 失败的特定案例(尽管我自己并不相信),我尝试用一​​个替换 BuildingInformation其所有成员字段的 Tuple5。行为没有改变。

我本可以通过提供 CanBuildFrom 来尝试,但我没有这样做。我的想法拒绝了一个简单的案例 class 不能映射到另一个数据结构。有什么地方不对,我看不出来。

为了完成 post,我尝试了对应于 Scala 2 的 Flink 版本。11.x 和 Scala 2。10.x:行为是相同的。

此外,这里是 EnhancedHVACTempReading(为了更好地理解代码):

case class EnhancedHVACTempReading(buildingID: Int, rangeOfTemp: String, extremeIndicator: Boolean,country: String, productID: String,buildingAge: Int, buildingManager: String)

我有一种预感,JVM 的混乱与 Java 的 Iterable 被用作 Scala 的列表有关,但是,我当然不确定。

谁能帮我找出错误?

问题是您必须在 readBuildingInfomap 函数中 return 一些东西。此外,如果您提供 List[BuildingInformation],则不应使用 fromElements,而如果您想展平列表,则应使用 fromCollection。以下代码片段显示了必要的更改。

def main(args: Array[String]): Unit = {
    val envDefault = ExecutionEnvironment.getExecutionEnvironment

    val buildingsBroadcastSet = readBuildingInfo(envDefault,"./SensorFiles/building.csv")

    val hvacStream = readHVACReadings(envDefault,"./SensorFiles/HVAC.csv")

    hvacStream
      .map(new HVACToBuildingMapper)
      .withBroadcastSet(buildingsBroadcastSet,"buildingData")
      .writeAsCsv("./hvacTemp.csv")

    envDefault.execute("HVAC Simulation")
}

private def readBuildingInfo(env: ExecutionEnvironment, inputPath: String): DataSet[BuildingInformation] = {
    val input = Source.fromFile(inputPath).getLines.drop(1).map(datum => {

      val fields = datum.split(",")
      BuildingInformation(
          fields(0).toInt,     // buildingID
          fields(1),           // buildingManager
          fields(2).toInt,     // buildingAge
          fields(3),           // productID
          fields(4)            // Country
        )
    })
    env.fromCollection(input.toList)
}