适用于 RDD 和 Seq 的通用方法
Generic method which works with RDD and Seq
我想编写一个方法来接受 RDD
和 Seq
而无需复制我的代码。
def myMethod[F[_]](input: F[InputClass]): F[OutputClass] = {
// do something here like
input.map{ i =>
// transformed input OutputClass
}
}
F
可以是 Seq
或 RDD
因为它们都实现了 map
方法。
对于像 count
或 cache
这样更独特的方法,我可以让 Seq
对 cache
不做任何事情,而对 [=17= 使用 length
]?
你要的是Type Class。
如果你只需要 map
和 flatMap
方法,我建议你使用 Monad (也许 Cats 一个) 并提供 RDD
.
的实现
现在,如果您想要更多的方法,您可以实现自己的类型 Class。
import scala.language.higherKinds
trait DataCollection[F[_]] {
def map[A, B](col: F[A])(f: A => B): F[B]
def cache[A](col: F[A]): F[A]
def count[A](col: F[A]): Long
}
object DataCollection {
implicit val RddDataCollection: DataCollection[RDD] = new DataCollection[RDD] {
override def map[A, B](rdd: RDD[A])(f: A => B): RDD[B] = rdd.map(f)
override def cache[A](rdd: RDD[A]): RDD[A] = rdd.cache()
override def count[A](rdd: RDD[A]): Long = rdd.count()
}
implicit val SeqDataCollection: DataCollection[Seq] = new DataCollection[Seq] {
override def map[A, B](seq: Seq[A])(f: A => B): Seq[B] = seq.map(f)
override def cache[A](seq: Seq[A]): Seq[A] = seq
override def count[A](seq: Seq[A]): Long = seq.length
}
implicit class Ops[F[_], A](val col: F[A]) extends AnyVal {
@inline
def map[B](f: A => B)(implicit DC: DataCollection[F]): F[B] = DC.map(col)(f)
@inline
def cache()(implicit DC: DataCollection[F]): F[A] = DC.cache(col)
@inline
def count()(implicit DC: DataCollection[F]): Long = DC.count(col)
}
}
def myGenericMethod[F[_]: DataCollection, T](col: F[T]): Long = {
import DataCollection.Ops
col.map(x => x).cache().count()
}
我想编写一个方法来接受 RDD
和 Seq
而无需复制我的代码。
def myMethod[F[_]](input: F[InputClass]): F[OutputClass] = {
// do something here like
input.map{ i =>
// transformed input OutputClass
}
}
F
可以是 Seq
或 RDD
因为它们都实现了 map
方法。
对于像 count
或 cache
这样更独特的方法,我可以让 Seq
对 cache
不做任何事情,而对 [=17= 使用 length
]?
你要的是Type Class。
如果你只需要 map
和 flatMap
方法,我建议你使用 Monad (也许 Cats 一个) 并提供 RDD
.
现在,如果您想要更多的方法,您可以实现自己的类型 Class。
import scala.language.higherKinds
trait DataCollection[F[_]] {
def map[A, B](col: F[A])(f: A => B): F[B]
def cache[A](col: F[A]): F[A]
def count[A](col: F[A]): Long
}
object DataCollection {
implicit val RddDataCollection: DataCollection[RDD] = new DataCollection[RDD] {
override def map[A, B](rdd: RDD[A])(f: A => B): RDD[B] = rdd.map(f)
override def cache[A](rdd: RDD[A]): RDD[A] = rdd.cache()
override def count[A](rdd: RDD[A]): Long = rdd.count()
}
implicit val SeqDataCollection: DataCollection[Seq] = new DataCollection[Seq] {
override def map[A, B](seq: Seq[A])(f: A => B): Seq[B] = seq.map(f)
override def cache[A](seq: Seq[A]): Seq[A] = seq
override def count[A](seq: Seq[A]): Long = seq.length
}
implicit class Ops[F[_], A](val col: F[A]) extends AnyVal {
@inline
def map[B](f: A => B)(implicit DC: DataCollection[F]): F[B] = DC.map(col)(f)
@inline
def cache()(implicit DC: DataCollection[F]): F[A] = DC.cache(col)
@inline
def count()(implicit DC: DataCollection[F]): Long = DC.count(col)
}
}
def myGenericMethod[F[_]: DataCollection, T](col: F[T]): Long = {
import DataCollection.Ops
col.map(x => x).cache().count()
}