In Scala, how do you get an Encoder to operate on a type that is passed into a generic function which only constrains it to certain traits?

I have a function named createTimeLineDS that takes another function as input and places that function inside an internal Dataset map call. createTimeLineDS only enforces traits on the input function's type signature, while map requires an Encoder for the type the function returns.

For some reason, when I pass a function that returns a case class into this function, it throws this error:

    Unable to find encoder for type TIMELINE. An implicit Encoder[TIMELINE] is needed to store TIMELINE instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    [error]       .map({ case ((event, team), user) =>
                    convertEventToTimeLineFunction(event, team, user)})

The code is below; I have defined all the traits and case classes. The problem is the last function: calling it produces the error above. I have imported sparkSession.implicits._, so I'm not sure how to do this correctly.

The traits, case classes, and the function used as a parameter:

trait Event {
  val teamId: String
  val actorId: String
}

trait TimeLine {
  val teamDomain: Option[String]
  val teamName: Option[String]
  val teamIsTest: Option[Boolean]
  val actorEmail: Option[String]
  val actorName: Option[String]
}  

case class JobEventTimeline(
                         jobId: String,
                         jobType: Option[String],
                         inPlanning: Option[Boolean],

                         teamId: String,
                         actorId: String,
                         adminActorId: Option[String],
                         sessionId: String,
                         clientSessionId: Option[String],
                         clientCreatedAt: Long,
                         seqId: Long,
                         isSideEffect: Option[Boolean],

                         opAction: String,
                         stepId: Option[String],
                         jobBaseStepId: Option[String],
                         fieldId: Option[String],

                         serverReceivedAt: Option[Long],

                         // "Enriched" data. Data is pulled in from other sources during stream processing
                         teamDomain: Option[String] = None,
                         teamName: Option[String] = None,
                         teamIsTest: Option[Boolean] = None,

                         actorEmail: Option[String] = None,
                         actorName: Option[String] = None
                       ) extends TimeLine


def createJobEventTimeLine(jobEvent: CaseClassJobEvent, team: Team, user: User): JobEventTimeline = {
    JobEventTimeline(
      jobEvent.jobId,
      jobEvent.jobType,
      jobEvent.inPlanning,
      jobEvent.teamId,
      jobEvent.actorId,
      jobEvent.adminActorId,
      jobEvent.sessionId,
      jobEvent.clientSessionId,
      jobEvent.clientCreatedAt,
      jobEvent.seqId,
      jobEvent.isSideEffect,
      jobEvent.opAction,
      jobEvent.stepId,
      jobEvent.jobBaseStepId,
      jobEvent.fieldId,
      jobEvent.serverReceivedAt,
      Some(team.domain),
      Some(team.name),
      Some(team.is_test),
      Some(user.email),
      Some(user.name)
    )
  }

The problem function and the function call:

def createTimeLineDS[EVENT <: Event with Serializable, TIMELINE <: TimeLine]
  (convertEventToTimeLineFunction: (EVENT, Team, User) => TIMELINE)
  (sparkSession: SparkSession)
  (jobEventDS: Dataset[EVENT]): Dataset[TIMELINE] = {
    import sparkSession.implicits._
    val teamDS = FuncUtils.createDSFromPostgresql[Team](sparkSession)
    val userDS = FuncUtils.createDSFromPostgresql[User](sparkSession)
    jobEventDS
      .joinWith(teamDS, jobEventDS("teamId") === teamDS("id"), "left_outer")
      .joinWith(userDS, $"_1.actorId" === userDS("id"), "left_outer")
      .map({ case ((event, team), user) => convertEventToTimeLineFunction(event, team, user) })
  }

The function call:

val jobEventTimeLine = FuncUtils.createTimeLineDS(JobEventTimeline.createJobEventTimeLine)(sparkSession)(jobEventDS)

The simplest solution is to do this instead:

def createTimeLineDS[EVENT <: Event, TIMELINE <: TimeLine : Encoder](...)

You probably don't need the sparkSession parameter, nor the import sparkSession.implicits._ line.
(But you may need a few more changes; keep reading.)

So, the problem is that the map method on a Dataset needs an implicit Encoder for its output type. Thus, what that funny syntax (called a context bound) says is that your method also requires such an implicit, so the compiler will be happy as long as the caller of your method provides one (usually by an earlier import spark.implicits._ somewhere).
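To see the mechanism in isolation, here is a minimal sketch of how a context bound desugars, using a hypothetical Show typeclass as a stand-in for Spark's Encoder:

```scala
// Stand-in typeclass (hypothetical; plays the role of Spark's Encoder).
trait Show[T] { def show(t: T): String }

object Show {
  // An instance the compiler can find in implicit scope.
  implicit val intShow: Show[Int] = (t: Int) => s"Int($t)"
}

// The context-bound form [T : Show]...
def render[T : Show](t: T): String = implicitly[Show[T]].show(t)

// ...is just shorthand for an extra implicit parameter list:
def renderExplicit[T](t: T)(implicit ev: Show[T]): String = ev.show(t)

println(render(42))          // Int(42)
println(renderExplicit(42))  // Int(42)
```

This is exactly what `TIMELINE : Encoder` does: it pushes the obligation to supply an `Encoder[TIMELINE]` out to the call site, where `import spark.implicits._` can satisfy it.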

For more information about implicits and where the compiler searches for them, read the linked article.


Now, after reading all of that, I hope you understand what the problem is and how to fix it.
However, you may still need to explicitly import sparkSession.implicits._ inside your method. That is probably because FuncUtils.createDSFromPostgresql[Team](sparkSession) does the same thing, but you now know how to refactor it.

Also, since Team and User are concrete classes you control, you can add something like this to their companion objects, so you never need to ask for their encoders; they will always be in implicit scope.

object Team {
  // https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Encoders$@product[T%3C:Product](implicitevidence:reflect.runtime.universe.TypeTag[T]):org.apache.spark.sql.Encoder[T]
  implicit final val TeamEncoder: Encoder[Team] = Encoders.product
}
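The reason this works is that the compiler searches a type's companion object as part of the implicit scope for that type, so no import is needed at the call site. A minimal runnable sketch of the same pattern, with a hypothetical Codec typeclass standing in for Spark's Encoder:

```scala
// Hypothetical stand-in for Spark's Encoder.
trait Codec[T] { def encode(t: T): String }

final case class Team(domain: String, name: String)

object Team {
  // Lives in Team's companion object, so it is found automatically
  // wherever a Codec[Team] is requested -- no import required.
  implicit val teamCodec: Codec[Team] = (t: Team) => s"${t.domain}/${t.name}"
}

def persist[T : Codec](t: T): String = implicitly[Codec[T]].encode(t)

println(persist(Team("acme.io", "Acme")))  // acme.io/Acme
```

Doing the same for User (an `implicit final val UserEncoder: Encoder[User] = Encoders.product` in its companion object) would let both joins in createTimeLineDS resolve their encoders without any import.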