适用于 RDD 和 Seq 的通用方法

Generic method which works with RDD and Seq

我想编写一个方法来接受 RDDSeq 而无需复制我的代码。

def myMethod[F[_]](input: F[InputClass]): F[OutputClass] = {
    // do something here like
    input.map{ i => 
       // transformed input OutputClass
    }
}

F 可以是 SeqRDD 因为它们都实现了 map 方法。

对于像 countcache 这样更独特的方法,我可以让 Seqcache 不做任何事情,而对 [=17= 使用 length ]?

你要的是Type Class。 如果你只需要 mapflatMap 方法,我建议你使用 Monad (也许 Cats 一个) 并提供 RDD.

的实现

现在,如果您想要更多的方法,您可以实现自己的类型 Class。

import scala.language.higherKinds

trait DataCollection[F[_]] {
  def map[A, B](col: F[A])(f: A => B): F[B]
  def cache[A](col: F[A]): F[A]
  def count[A](col: F[A]): Long
}

object DataCollection { 
  implicit val RddDataCollection: DataCollection[RDD] = new DataCollection[RDD] {
    override def map[A, B](rdd: RDD[A])(f: A => B): RDD[B] = rdd.map(f)
    override def cache[A](rdd: RDD[A]): RDD[A] = rdd.cache()
    override def count[A](rdd: RDD[A]): Long = rdd.count()
  }

  implicit val SeqDataCollection: DataCollection[Seq] = new DataCollection[Seq] {
    override def map[A, B](seq: Seq[A])(f: A => B): Seq[B] = seq.map(f)
    override def cache[A](seq: Seq[A]): Seq[A] = seq
    override def count[A](seq: Seq[A]): Long = seq.length
  }

  implicit class Ops[F[_], A](val col: F[A]) extends AnyVal {
    @inline
    def map[B](f: A => B)(implicit DC: DataCollection[F]): F[B] = DC.map(col)(f)

    @inline
    def cache()(implicit DC: DataCollection[F]): F[A] = DC.cache(col)

    @inline
    def count()(implicit DC: DataCollection[F]): Long = DC.count(col)
  }
}

def myGenericMethod[F[_]: DataCollection, T](col: F[T]): Long = {
  import DataCollection.Ops
  col.map(x => x).cache().count()
}