根据时间序列在 Spark 中创建图表 table
Creating a graph in Spark from a time series table
假设我有一个包含三列的 table; user, time, place
。如果他们之间的时间低于某个阈值,我想为每个用户创建 place
转换图;即,在按用户分组并按时间排序后,从连续行 (i, j) 创建边 place_i 到 place_j 的有向图,为每个实例增加边的权重 (place_i, place_j).来源 table 的行没有特别的顺序。 Python API 这可能吗?如果没有,我该如何在 Scala 中实现?
样本table:
user,time,place
joe,1,A
jack,1,B
joe,2,B
jack,3,C
joe,4,D
jane,5,A
jane,1,B
如果我们忽略时间阈值约束,图应该有四个顶点(A,B,C,D)和来自{(A,B),(B,C),(B,D), (B,A)}.
Advanced Analytics with Spark
中有一章是关于使用地理数据的。应该是第8话吧作者在 spark 中讨论了 Sessionization,这与您的问题有关。
为了让每个特定用户的所有转换在时间上彼此接近,Sandy Ryza 称之为 SecondarySort
这在spark core,但他提供了一些代码来做到这一点,你可以找到它 here
我相信如果您阅读该章并遵循代码,您将得到您想要的东西。
我使用了 groupBy
,然后是 flatMapGroups
。在地图内部,我将迭代器实例化为一个列表以便对其进行排序。然后我使用 sliding
成对地遍历列表并创建边缘。
ds.groupBy(_.user).flatMapGroups( (uid, iter) =>
val result = ListBuffer[MySchema]()
iter.toList.sortBy(_.time).sliding(2).foreach { case List(x,y =>
result += MySchema(uid, x.place, if (y.time - x.time < Threshold) y.place else 0)
}
result.toList
}.as[AggSchema].groupBy($"src, $"dst).count.as[Schema]
假设我有一个包含三列的 table; user, time, place
。如果他们之间的时间低于某个阈值,我想为每个用户创建 place
转换图;即,在按用户分组并按时间排序后,从连续行 (i, j) 创建边 place_i 到 place_j 的有向图,为每个实例增加边的权重 (place_i, place_j).来源 table 的行没有特别的顺序。 Python API 这可能吗?如果没有,我该如何在 Scala 中实现?
样本table:
user,time,place
joe,1,A
jack,1,B
joe,2,B
jack,3,C
joe,4,D
jane,5,A
jane,1,B
如果我们忽略时间阈值约束,图应该有四个顶点(A,B,C,D)和来自{(A,B),(B,C),(B,D), (B,A)}.
Advanced Analytics with Spark
中有一章是关于使用地理数据的。应该是第8话吧作者在 spark 中讨论了 Sessionization,这与您的问题有关。
为了让每个特定用户的所有转换在时间上彼此接近,Sandy Ryza 称之为 SecondarySort
这在spark core,但他提供了一些代码来做到这一点,你可以找到它 here
我相信如果您阅读该章并遵循代码,您将得到您想要的东西。
我使用了 groupBy
,然后是 flatMapGroups
。在地图内部,我将迭代器实例化为一个列表以便对其进行排序。然后我使用 sliding
成对地遍历列表并创建边缘。
ds.groupBy(_.user).flatMapGroups( (uid, iter) =>
val result = ListBuffer[MySchema]()
iter.toList.sortBy(_.time).sliding(2).foreach { case List(x,y =>
result += MySchema(uid, x.place, if (y.time - x.time < Threshold) y.place else 0)
}
result.toList
}.as[AggSchema].groupBy($"src, $"dst).count.as[Schema]