PySpark - Time Overlap for Object in RDD
My goal is to group objects based on time overlap. Each object in my rdd contains a start_time and an end_time.
I'm probably going about this inefficiently, but what I plan to do is assign an overlap id to each object based on whether it has any time overlap with any other object. I already have the time-overlap logic. Then I want to group by that overlap_id.
So, to start with:
mapped_rdd = rdd.map(assign_overlap_id)
final_rdd = mapped_rdd.reduceByKey(combine_objects)
Now for my question: how would I write the assign_overlap_id function?
def assign_overlap_id(x):
    ...
    ...
    return (overlap_id, x)
A naive solution using Spark SQL and DataFrames:
Scala:
import org.apache.spark.sql.functions.udf

case class Interval(start_time: Long, end_time: Long)

val rdd = sc.parallelize(
    Interval(0, 3) :: Interval(1, 4) ::
    Interval(2, 5) :: Interval(3, 4) ::
    Interval(5, 8) :: Interval(7, 10) :: Nil
)

val df = sqlContext.createDataFrame(rdd)

// Simple check if two given intervals overlap
def overlaps(start_first: Long, end_first: Long,
             start_second: Long, end_second: Long): Boolean = {
    (start_second > start_first & start_second < end_first) |
    (end_second > start_first & end_second < end_first)
}
// Register udf and data frame aliases
// It looks like Spark SQL doesn't support
// aliases in FROM clause [1] so we have to
// register df twice
sqlContext.udf.register("overlaps", overlaps)
df.registerTempTable("df1")
df.registerTempTable("df2")
// Join and filter
sqlContext.sql("""
SELECT * FROM df1 JOIN df2
WHERE overlaps(df1.start_time, df1.end_time, df2.start_time, df2.end_time)
""").show
And the same thing using PySpark:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

rdd = sc.parallelize([
    (0, 3), (1, 4),
    (2, 5), (3, 4),
    (5, 8), (7, 10)
])

df = sqlContext.createDataFrame(rdd, ('start_time', 'end_time'))

def overlaps(start_first, end_first, start_second, end_second):
    return ((start_first < start_second < end_first) or
            (start_first < end_second < end_first))

sqlContext.registerFunction('overlaps', overlaps, BooleanType())
df.registerTempTable("df1")
df.registerTempTable("df2")
sqlContext.sql("""
SELECT * FROM df1 JOIN df2
WHERE overlaps(df1.start_time, df1.end_time, df2.start_time, df2.end_time)
""").show()
Low-level transformations with grouping by window
A slightly smarter approach is to generate candidate pairs using a window of some specified width. Here is a relatively simple solution:
Scala:
// Generates list of "buckets" for a given interval
def genRange(interval: Interval) = interval match {
case Interval(start_time, end_time) => {
(start_time / 10L * 10L) to (((end_time / 10) + 1) * 10) by 1
}
}
// For each interval generate pairs (bucket, interval)
val pairs = rdd.flatMap( (i: Interval) => genRange(i).map((r) => (r, i)))
// Join (in the worst case scenario it is still O(n^2)
// But in practice should be better than a naive
// Cartesian product
val candidates = pairs.
join(pairs).
map({
case (k, (Interval(s1, e1), Interval(s2, e2))) => (s1, e1, s2, e2)
}).distinct
// For each candidate pair check if there is overlap
candidates.filter { case (s1, e1, s2, e2) => overlaps(s1, e1, s2, e2) }
Python:
def genRange(start_time, end_time):
    return xrange(start_time / 10L * 10L, ((end_time / 10) + 1) * 10)

pairs = rdd.flatMap(lambda (s, e): ((r, (s, e)) for r in genRange(s, e)))

candidates = (pairs
    .join(pairs)
    .map(lambda (k, ((s1, e1), (s2, e2))): (s1, e1, s2, e2))
    .distinct())

candidates.filter(lambda (s1, e1, s2, e2): overlaps(s1, e1, s2, e2))
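The snippet above uses Python 2 constructs (xrange, long literals, and tuple-unpacking lambdas, which were removed in Python 3). A rough Python 3 equivalent of the same bucketing idea could look like this (a sketch; the bucket width of 10 is kept from the original):

def gen_range(start_time, end_time):
    # Integer buckets covering the interval, rounded out to multiples of 10
    return range(start_time // 10 * 10, (end_time // 10 + 1) * 10)

pairs = rdd.flatMap(lambda se: [(r, se) for r in gen_range(se[0], se[1])])

candidates = (pairs
    .join(pairs)
    .map(lambda kv: (kv[1][0][0], kv[1][0][1], kv[1][1][0], kv[1][1][1]))
    .distinct())

candidates.filter(lambda c: overlaps(c[0], c[1], c[2], c[3]))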
While this can be enough for a production-ready solution on some datasets, you should consider implementing a state-of-the-art algorithm such as NCList.
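To connect this back to the original question of assigning an overlap_id: the candidate pairs produced above are essentially the edges of a graph whose connected components are the overlap groups. One way to finish the job is a simple union-find over those pairs. This is only a sketch, and it assumes the set of overlapping pairs is small enough to collect to the driver; for large inputs you would want a distributed connected-components implementation instead.

# Collect overlapping pairs (edges) to the driver -- assumes they fit in memory
edges = candidates.filter(lambda c: overlaps(c[0], c[1], c[2], c[3])).collect()

parent = {}

def find(x):
    # Path-compressing find
    parent.setdefault(x, x)
    while parent[x] != x:
        parent[x] = parent[parent[x]]
        x = parent[x]
    return x

def union(a, b):
    parent[find(a)] = find(b)

for s1, e1, s2, e2 in edges:
    union((s1, e1), (s2, e2))

# The overlap_id of an interval is the representative interval of its component;
# intervals that overlap nothing simply keep themselves as their id.
overlap_ids = sc.broadcast({iv: find(iv) for iv in parent})
mapped_rdd = rdd.map(lambda se: (overlap_ids.value.get(se, se), se))
final_rdd = mapped_rdd.groupByKey()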