如何将 Scalding TypedPipe 转换为 Iterator
how to convert Scalding TypedPipe to Iterator
在我的 Scalding hadoop 作业中,我在管道上有一些分组逻辑,然后我需要处理每个组:
val georecs : TypedPipe[GeoRecord] = getRecords
georecs.map( r => (getRegion(r),r) )
.groupBy(_._1)
.mapValueStream( xs => clusterRecords(xs) )
.values
.write(out)
在 clusterRecords 内部,我需要将传入的迭代器转换为 TypedPipe,以便我可以 1) 对其进行采样和 2) 取叉积:
//turn the iterator to a pipe so we can sample it
val sample = TypedPipe.from( xs.map( x => Centroid(x._2.coreActivity)).toIterable)
.sample(0.11)
.distinct
//turn the iterator to a pipe so we can take its cross product
val records : TypedPipe[GeoRecord] = TypedPipe.from(xs.map(_._2).toIterable)
records
.cross(sample) //cartesian product of records and centroids
.groupBy( _._2) // group By the user record so we get a list of pairs (user, centroid)
.minBy( x => score( x._1.coreActivity, x._2.core) ) //find the centroid with the lowest score for each Record
.values
.groupBy( x => x._2 ) //now groupBy centroid to get the clusters
.values
问题是 mapValueStream 期望映射函数到 return 一个迭代器,但我拥有的是一个 TypedPipe。我知道如何将迭代器变成管道,但反之则不然。我是否需要执行它,将其写入磁盘,然后再读回?
如果是这样,最好的方法是什么?
看起来您可以通过 运行 将管道转换为迭代器。可以这样完成:
val georecs : TypedPipe[GeoRecord] = getRecords
val i : Iterator[GeoRecord] = georecs
.toIterableExecution
.waitFor(this.scaldingConfig,this.mode)
.get
.toIterator
(类型检查,但尚未测试)
在我的 Scalding hadoop 作业中,我在管道上有一些分组逻辑,然后我需要处理每个组:
val georecs : TypedPipe[GeoRecord] = getRecords
georecs.map( r => (getRegion(r),r) )
.groupBy(_._1)
.mapValueStream( xs => clusterRecords(xs) )
.values
.write(out)
在 clusterRecords 内部,我需要将传入的迭代器转换为 TypedPipe,以便我可以 1) 对其进行采样和 2) 取叉积:
//turn the iterator to a pipe so we can sample it
val sample = TypedPipe.from( xs.map( x => Centroid(x._2.coreActivity)).toIterable)
.sample(0.11)
.distinct
//turn the iterator to a pipe so we can take its cross product
val records : TypedPipe[GeoRecord] = TypedPipe.from(xs.map(_._2).toIterable)
records
.cross(sample) //cartesian product of records and centroids
.groupBy( _._2) // group By the user record so we get a list of pairs (user, centroid)
.minBy( x => score( x._1.coreActivity, x._2.core) ) //find the centroid with the lowest score for each Record
.values
.groupBy( x => x._2 ) //now groupBy centroid to get the clusters
.values
问题是 mapValueStream 期望映射函数到 return 一个迭代器,但我拥有的是一个 TypedPipe。我知道如何将迭代器变成管道,但反之则不然。我是否需要执行它,将其写入磁盘,然后再读回?
如果是这样,最好的方法是什么?
看起来您可以通过 运行 将管道转换为迭代器。可以这样完成:
val georecs : TypedPipe[GeoRecord] = getRecords
val i : Iterator[GeoRecord] = georecs
.toIterableExecution
.waitFor(this.scaldingConfig,this.mode)
.get
.toIterator
(类型检查,但尚未测试)