DataSet/DataStream 类型 class 接口
DataSet/DataStream of type class interface
我只是在尝试在 Flink 中使用 Scala type classes。我定义了以下类型 class 接口:
trait LikeEvent[T] {
def timestamp(payload: T): Int
}
现在,我想像这样考虑 DataSet
的 LikeEvent[_]
:
// existing classes that need to be adapted/normalized (without touching them)
case class Log(ts: Int, severity: Int, message: String)
case class Metric(ts: Int, name: String, value: Double)
// create instances for the raw events
object EventInstance {
implicit val logEvent = new LikeEvent[Log] {
def timestamp(log: Log): Int = log.ts
}
implicit val metricEvent = new LikeEvent[Metric] {
def timestamp(metric: Metric): Int = metric.ts
}
}
// add ops to the raw event classes (regular class)
object EventSyntax {
implicit class Event[T: LikeEvent](val payload: T) {
val le = implicitly[LikeEvent[T]]
def timestamp: Int = le.timestamp(payload)
}
}
以下应用程序运行良好:
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// underlying (raw) events
val events: DataSet[Event[_]] = env.fromElements(
Metric(1586736000, "cpu_usage", 0.2),
Log(1586736005, 1, "invalid login"),
Log(1586736010, 1, "invalid login"),
Log(1586736015, 1, "invalid login"),
Log(1586736030, 2, "valid login"),
Metric(1586736060, "cpu_usage", 0.8),
Log(1586736120, 0, "end of world"),
)
// count events per hour
val eventsPerHour = events
.map(new GetMinuteEventTuple())
.groupBy(0).reduceGroup { g =>
val gl = g.toList
val (hour, count) = (gl.head._1, gl.size)
(hour, count)
}
eventsPerHour.print()
正在打印预期输出
(0,5)
(1,1)
(2,1)
但是,如果我像这样修改语法对象:
// couldn't make it work with Flink!
// add ops to the raw event classes (case class)
object EventSyntax2 {
case class Event[T: LikeEvent](payload: T) {
val le = implicitly[LikeEvent[T]]
def timestamp: Int = le.timestamp(payload)
}
implicit def fromPayload[T: LikeEvent](payload: T): Event[T] = Event(payload)
}
我收到以下错误:
type mismatch;
found : org.apache.flink.api.scala.DataSet[Product with Serializable]
required: org.apache.flink.api.scala.DataSet[com.salvalcantara.fp.EventSyntax2.Event[_]]
因此,在消息的指导下,我进行了以下更改:
val events: DataSet[Event[_]] = env.fromElements[Event[_]](...)
之后,错误变为:
could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax2.Event[_]]
我不明白为什么 EventSyntax2
会导致这些错误,而 EventSyntax
编译并运行良好。为什么在 EventSyntax2
中使用大小写 class 包装器比在 EventSyntax
中使用常规 class 更成问题?
无论如何,我的问题是双重的:
- 如何解决
EventSyntax2
的问题?
- 实现我的目标最简单的方法是什么?在这里,我只是为了学习而试验类型 class 模式,但最终更面向对象的方法(基于子类型)对我来说看起来更简单。像这样:
// Define trait
trait Event {
def timestamp: Int
def payload: Product with Serializable // Any case class
}
// Metric adapter (similar for Log)
object MetricAdapter {
implicit class MetricEvent(val payload: Metric) extends Event {
def timestamp: Int = payload.ts
}
}
然后在主体中简单地使用val events: DataSet[Event] = env.fromElements(...)
。
注意 提出了类似的问题,但它考虑了一个简单的 Scala List
而不是 Flink DataSet
(或 DataStream
) .我的问题的重点是在 Flink 中使用类型 class 模式以某种方式考虑 异构 streams/datasets,以及它是否真的有意义或者应该明确支持在这种情况下是一个常规特征,并如上所述继承它。
顺便说一句,您可以在这里找到代码:https://github.com/salvalcantara/flink-events-and-polymorphism.
简答:Flink 无法在 Scala 中派生通配符类型TypeInformation
长答案:
你这两个问题其实都在问,什么是TypeInformation
,怎么用,怎么推导出来的。
TypeInformation
是 Flink 的内部类型系统,当数据通过网络洗牌并存储在状态后端时(使用 DataStream api 时),它用于序列化数据。
序列化是数据处理中的一个主要性能问题,因此 Flink 包含针对常见数据类型和模式的专用序列化程序。开箱即用,在其 Java 堆栈中,它支持所有 JVM 基元、Pojo、Flink 元组、一些常见的集合类型和 avro。 class 的类型是使用反射确定的,如果它与已知类型不匹配,它将回退到 Kryo。
在 scala api 中,类型信息是使用隐式派生的。 scala DataSet 和 DataStream api 上的所有方法都将其通用参数注释为隐式类型 class。
def map[T: TypeInformation]
这个 TypeInformation
可以像任何类型 class 一样手动提供,或者使用从 flink 导入的宏派生。
import org.apache.flink.api.scala._
这个宏修饰了 java 类型堆栈,支持 scala 元组、scala case classes 和一些常见的 scala std 库类型。我说装饰器是因为如果您的 class 不是其中一种类型,它可以并且将会退回到 java 堆栈。
那么为什么版本 1 有效?
因为它是一个普通的 class 类型堆栈无法匹配,所以它将它解析为一个通用类型并返回一个基于 kryo 的序列化程序。您可以从控制台对此进行测试,并看到它 returns 是一个通用类型。
> scala> implicitly[TypeInformation[EventSyntax.Event[_]]]
res2: org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax.Event[_]] = GenericType<com.salvalcantara.fp.EventSyntax.Event>
版本 2 不起作用,因为它将类型识别为一个案例 class,然后为它的每个成员递归派生 TypeInformation
个实例。这对于不同于 Any
的通配符类型是不可能的,因此推导失败。
一般来说,您不应将 Flink 与异构类型一起使用,因为它无法为您的工作负载派生出高效的序列化程序。
我只是在尝试在 Flink 中使用 Scala type classes。我定义了以下类型 class 接口:
trait LikeEvent[T] {
def timestamp(payload: T): Int
}
现在,我想像这样考虑 DataSet
的 LikeEvent[_]
:
// existing classes that need to be adapted/normalized (without touching them)
case class Log(ts: Int, severity: Int, message: String)
case class Metric(ts: Int, name: String, value: Double)
// create instances for the raw events
object EventInstance {
implicit val logEvent = new LikeEvent[Log] {
def timestamp(log: Log): Int = log.ts
}
implicit val metricEvent = new LikeEvent[Metric] {
def timestamp(metric: Metric): Int = metric.ts
}
}
// add ops to the raw event classes (regular class)
object EventSyntax {
implicit class Event[T: LikeEvent](val payload: T) {
val le = implicitly[LikeEvent[T]]
def timestamp: Int = le.timestamp(payload)
}
}
以下应用程序运行良好:
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// underlying (raw) events
val events: DataSet[Event[_]] = env.fromElements(
Metric(1586736000, "cpu_usage", 0.2),
Log(1586736005, 1, "invalid login"),
Log(1586736010, 1, "invalid login"),
Log(1586736015, 1, "invalid login"),
Log(1586736030, 2, "valid login"),
Metric(1586736060, "cpu_usage", 0.8),
Log(1586736120, 0, "end of world"),
)
// count events per hour
val eventsPerHour = events
.map(new GetMinuteEventTuple())
.groupBy(0).reduceGroup { g =>
val gl = g.toList
val (hour, count) = (gl.head._1, gl.size)
(hour, count)
}
eventsPerHour.print()
正在打印预期输出
(0,5)
(1,1)
(2,1)
但是,如果我像这样修改语法对象:
// couldn't make it work with Flink!
// add ops to the raw event classes (case class)
object EventSyntax2 {
case class Event[T: LikeEvent](payload: T) {
val le = implicitly[LikeEvent[T]]
def timestamp: Int = le.timestamp(payload)
}
implicit def fromPayload[T: LikeEvent](payload: T): Event[T] = Event(payload)
}
我收到以下错误:
type mismatch;
found : org.apache.flink.api.scala.DataSet[Product with Serializable]
required: org.apache.flink.api.scala.DataSet[com.salvalcantara.fp.EventSyntax2.Event[_]]
因此,在消息的指导下,我进行了以下更改:
val events: DataSet[Event[_]] = env.fromElements[Event[_]](...)
之后,错误变为:
could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax2.Event[_]]
我不明白为什么 EventSyntax2
会导致这些错误,而 EventSyntax
编译并运行良好。为什么在 EventSyntax2
中使用大小写 class 包装器比在 EventSyntax
中使用常规 class 更成问题?
无论如何,我的问题是双重的:
- 如何解决
EventSyntax2
的问题? - 实现我的目标最简单的方法是什么?在这里,我只是为了学习而试验类型 class 模式,但最终更面向对象的方法(基于子类型)对我来说看起来更简单。像这样:
// Define trait
trait Event {
def timestamp: Int
def payload: Product with Serializable // Any case class
}
// Metric adapter (similar for Log)
object MetricAdapter {
implicit class MetricEvent(val payload: Metric) extends Event {
def timestamp: Int = payload.ts
}
}
然后在主体中简单地使用val events: DataSet[Event] = env.fromElements(...)
。
注意 List
而不是 Flink DataSet
(或 DataStream
) .我的问题的重点是在 Flink 中使用类型 class 模式以某种方式考虑 异构 streams/datasets,以及它是否真的有意义或者应该明确支持在这种情况下是一个常规特征,并如上所述继承它。
顺便说一句,您可以在这里找到代码:https://github.com/salvalcantara/flink-events-and-polymorphism.
简答:Flink 无法在 Scala 中派生通配符类型TypeInformation
长答案:
你这两个问题其实都在问,什么是TypeInformation
,怎么用,怎么推导出来的。
TypeInformation
是 Flink 的内部类型系统,当数据通过网络洗牌并存储在状态后端时(使用 DataStream api 时),它用于序列化数据。
序列化是数据处理中的一个主要性能问题,因此 Flink 包含针对常见数据类型和模式的专用序列化程序。开箱即用,在其 Java 堆栈中,它支持所有 JVM 基元、Pojo、Flink 元组、一些常见的集合类型和 avro。 class 的类型是使用反射确定的,如果它与已知类型不匹配,它将回退到 Kryo。
在 scala api 中,类型信息是使用隐式派生的。 scala DataSet 和 DataStream api 上的所有方法都将其通用参数注释为隐式类型 class。
def map[T: TypeInformation]
这个 TypeInformation
可以像任何类型 class 一样手动提供,或者使用从 flink 导入的宏派生。
import org.apache.flink.api.scala._
这个宏修饰了 java 类型堆栈,支持 scala 元组、scala case classes 和一些常见的 scala std 库类型。我说装饰器是因为如果您的 class 不是其中一种类型,它可以并且将会退回到 java 堆栈。
那么为什么版本 1 有效?
因为它是一个普通的 class 类型堆栈无法匹配,所以它将它解析为一个通用类型并返回一个基于 kryo 的序列化程序。您可以从控制台对此进行测试,并看到它 returns 是一个通用类型。
> scala> implicitly[TypeInformation[EventSyntax.Event[_]]]
res2: org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax.Event[_]] = GenericType<com.salvalcantara.fp.EventSyntax.Event>
版本 2 不起作用,因为它将类型识别为一个案例 class,然后为它的每个成员递归派生 TypeInformation
个实例。这对于不同于 Any
的通配符类型是不可能的,因此推导失败。
一般来说,您不应将 Flink 与异构类型一起使用,因为它无法为您的工作负载派生出高效的序列化程序。