In Scala, how do you enforce an encoder on a type when it is passed into a generic function that only enforces certain traits?
I have a function called createTimeLineDS that takes another function as input and places that function inside an internal Dataset map method. createTimeLineDS only enforces traits on the input function's type signature, while map requires an Encoder for whatever type the function returns.
For some reason, when I pass in a function that returns a case class, it throws this error:
Unable to find encoder for type TIMELINE. An implicit Encoder[TIMELINE] is needed to store TIMELINE instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
[error] .map({ case ((event, team), user) =>
convertEventToTimeLineFunction(event, team, user)})
The code is below; I have defined all the traits and case classes. The problem is the last function: calling it produces the error above. I have already imported sparkSession.implicits._, so I'm not sure how to do this correctly.
The traits, case classes, and the function used as the parameter:
trait Event {
  val teamId: String
  val actorId: String
}

trait TimeLine {
  val teamDomain: Option[String]
  val teamName: Option[String]
  val teamIsTest: Option[Boolean]
  val actorEmail: Option[String]
  val actorName: Option[String]
}
case class JobEventTimeline(
  jobId: String,
  jobType: Option[String],
  inPlanning: Option[Boolean],
  teamId: String,
  actorId: String,
  adminActorId: Option[String],
  sessionId: String,
  clientSessionId: Option[String],
  clientCreatedAt: Long,
  seqId: Long,
  isSideEffect: Option[Boolean],
  opAction: String,
  stepId: Option[String],
  jobBaseStepId: Option[String],
  fieldId: Option[String],
  serverReceivedAt: Option[Long],
  // "Enriched" data. Data is pulled in from other sources during stream processing
  teamDomain: Option[String] = None,
  teamName: Option[String] = None,
  teamIsTest: Option[Boolean] = None,
  actorEmail: Option[String] = None,
  actorName: Option[String] = None
) extends TimeLine
def createJobEventTimeLine(jobEvent: CaseClassJobEvent, team: Team, user: User): JobEventTimeline = {
  JobEventTimeline(
    jobEvent.jobId,
    jobEvent.jobType,
    jobEvent.inPlanning,
    jobEvent.teamId,
    jobEvent.actorId,
    jobEvent.adminActorId,
    jobEvent.sessionId,
    jobEvent.clientSessionId,
    jobEvent.clientCreatedAt,
    jobEvent.seqId,
    jobEvent.isSideEffect,
    jobEvent.opAction,
    jobEvent.stepId,
    jobEvent.jobBaseStepId,
    jobEvent.fieldId,
    jobEvent.serverReceivedAt,
    Some(team.domain),
    Some(team.name),
    Some(team.is_test),
    Some(user.email),
    Some(user.name)
  )
}
The problem function and the function call:
def createTimeLineDS[EVENT <: Event with Serializable, TIMELINE <: TimeLine]
    (convertEventToTimeLineFunction: (EVENT, Team, User) => TIMELINE)
    (sparkSession: SparkSession)
    (jobEventDS: Dataset[EVENT]): Dataset[TIMELINE] = {
  import sparkSession.implicits._
  val teamDS = FuncUtils.createDSFromPostgresql[Team](sparkSession)
  val userDS = FuncUtils.createDSFromPostgresql[User](sparkSession)
  jobEventDS
    .joinWith(teamDS, jobEventDS("teamId") === teamDS("id"), "left_outer")
    .joinWith(userDS, $"_1.actorId" === userDS("id"), "left_outer")
    .map({ case ((event, team), user) => convertEventToTimeLineFunction(event, team, user) })
}
The function call:
val jobEventTimeLine = FuncUtils.createTimeLineDS(JobEventTimeline.createJobEventTimeLine)(sparkSession)(jobEventDS)
The simplest solution is to do this instead:
def createTimeLineDS[EVENT <: Event, TIMELINE <: TimeLine : Encoder](...)
You probably don't need the sparkSession parameter, nor the import sparkSession.implicits._ line.
(But you may need a few more changes, so keep reading.)
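Concretely, the refactored method could look like this. This is a minimal sketch: it assumes FuncUtils.createDSFromPostgresql keeps its current signature, pulls the SparkSession off the dataset itself, and swaps the $"..." interpolator for functions.col so the implicits import can be dropped entirely:

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import org.apache.spark.sql.functions.col

def createTimeLineDS[EVENT <: Event with Serializable, TIMELINE <: TimeLine : Encoder]
    (convertEventToTimeLineFunction: (EVENT, Team, User) => TIMELINE)
    (jobEventDS: Dataset[EVENT]): Dataset[TIMELINE] = {
  // Every Dataset carries its SparkSession, so it no longer needs to be a parameter.
  val sparkSession = jobEventDS.sparkSession
  val teamDS = FuncUtils.createDSFromPostgresql[Team](sparkSession)
  val userDS = FuncUtils.createDSFromPostgresql[User](sparkSession)
  jobEventDS
    .joinWith(teamDS, jobEventDS("teamId") === teamDS("id"), "left_outer")
    // col(...) replaces $"_1.actorId", which needed the implicits import.
    .joinWith(userDS, col("_1.actorId") === userDS("id"), "left_outer")
    // map now resolves Encoder[TIMELINE] from the context bound on TIMELINE.
    .map { case ((event, team), user) => convertEventToTimeLineFunction(event, team, user) }
}

Note that joinWith builds its tuple encoder from the two datasets' own encoders, so only the final map needs the extra Encoder.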
So, the problem is that the map method on a Dataset needs an implicit Encoder for the output type. Thus, what you are doing with that funny syntax (called a context bound) is saying that your method also requires such an implicit; the compiler will then be happy as long as the caller of your method provides one (usually via an import spark.implicits._ somewhere before the call).
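To make the sugar explicit, the two signatures below are exactly equivalent (the method names here are purely illustrative):

import org.apache.spark.sql.{Dataset, Encoder}

// A context bound `T : Encoder`...
def needsEncoder[T : Encoder](ds: Dataset[T]): Dataset[T] = ds

// ...desugars to an extra implicit parameter list.
def needsEncoderDesugared[T](ds: Dataset[T])(implicit ev: Encoder[T]): Dataset[T] = ds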
For more information about implicits and where the compiler searches for them, read the linked article.
Now, after you've read all of that, hopefully you understand what the problem is and how to fix it.
However, you will probably still need the explicit import sparkSession.implicits._ inside your method. That is likely because FuncUtils.createDSFromPostgresql[Team](sparkSession) does the same thing, but now you know how to refactor it too.
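A hedged sketch of that refactor, assuming createDSFromPostgresql reads a JDBC table and finishes with .as[T] (its real body isn't shown in the question, so the reader details are placeholders):

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

// Hypothetical body: the point is the `T : Encoder` context bound, which
// pushes the encoder requirement out to the caller instead of importing
// implicits locally.
def createDSFromPostgresql[T : Encoder](sparkSession: SparkSession): Dataset[T] =
  sparkSession.read
    .format("jdbc")
    // url / dbtable / user / password options omitted; assumptions only.
    .load()
    .as[T] // resolves Encoder[T] from the context bound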
Also, since Team and User are concrete classes that you control, you can add something like this to their companion objects. Then you never have to ask for their encoders explicitly, because they are always in implicit scope.
object Team {
  // https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Encoders$@product[T%3C:Product](implicitevidence:reflect.runtime.universe.TypeTag[T]):org.apache.spark.sql.Encoder[T]
  implicit final val TeamEncoder: Encoder[Team] = Encoders.product
}
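The same one-liner works for User, assuming it is also a case class (Encoders.product derives an Encoder for any Product type):

object User {
  // Same technique as Team above.
  implicit final val UserEncoder: Encoder[User] = Encoders.product
}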