尝试计算 Spark 中两个类别之间的唯一用户数
Attempting to count unique users between two categories in Spark
我在 Spark 中有一个包含两列的数据集结构,一列称为 user
,另一列称为 category
。这样 table 看起来像这样:
+---------------+---------------+
| user| category|
+---------------+---------------+
| garrett| syncopy|
| garrison| musictheory|
| marta| sheetmusic|
| garrett| orchestration|
| harold| chopin|
| marta| russianmusic|
| niko| piano|
| james| sheetmusic|
| manny| violin|
| charles| gershwin|
| dawson| cello|
| bob| cello|
| george| cello|
| george| americanmusic|
| bob| personalcompos|
| george| sheetmusic|
| fred| sheetmusic|
| bob| sheetmusic|
| garrison| sheetmusic|
| george| musictheory|
+---------------+---------------+
only showing top 20 rows
table 中的每一行都是唯一的,但用户和类别可以出现多次。 objective是统计两个分类共有的用户数。例如 cello
和 americanmusic
共享一个名为 george
的用户,而 musictheory
和 sheetmusic
共享用户 george
和 garrison
。目标是获得 n 个类别之间的不同用户的数量,这意味着类别之间最多有 n 个平方边。我部分了解如何执行此操作,但我正在努力将我的想法转换为 Spark Java。
我的想法是,我需要在 user
上进行自连接以获得结构如下的 table:
+---------------+---------------+---------------+
| user| category| category|
+---------------+---------------+---------------+
| garrison| musictheory| sheetmusic|
| george| musictheory| sheetmusic|
| garrison| musictheory| musictheory|
| george| musictheory| musicthoery|
| garrison| sheetmusic| musictheory|
| george| sheetmusic| musictheory|
+---------------+---------------+---------------+
Spark中的self join操作(Java代码)并不难:
Dataset<Row> newDataset = allUsersToCategories.join(allUsersToCategories, "users");
这已经到了某个程度,但是我得到了与上例中第 3 行和第 4 行中相同类别的映射,并且我得到了向后映射,其中类别被颠倒了,这样基本上就像在行中一样重复计算每个用户交互上面例子的5和6.
我认为我需要做的是在我的连接中设置某种条件,按照 X < Y
的方式说明一些内容,以便丢弃相同的类别和重复项。最后,我需要计算 n 平方组合的不同行数,其中 n 是类别数。
有人可以解释一下如何在 Spark 中执行此操作,特别是 Spark Java 因为我对 Scala 语法有点不熟悉吗?
感谢您的帮助。
我不确定我是否正确理解了您的要求,但我会尽力提供帮助。
根据我的理解,上述数据的预期结果应如下所示。如果这不是真的,请告诉我,我会尝试进行必要的修改。
+--------------+--------------+-+
|_1 |_2 |
+--------------+--------------+-+
|personalcompos|sheetmusic |1|
|cello |musictheory |1|
|americanmusic |cello |1|
|cello |sheetmusic |2|
|cello |personalcompos|1|
|russianmusic |sheetmusic |1|
|americanmusic |sheetmusic |1|
|americanmusic |musictheory |1|
|musictheory |sheetmusic |2|
|orchestration |syncopy |1|
+--------------+--------------+-+
在这种情况下,您可以使用以下 Scala 代码解决您的问题:
allUsersToCategories
.groupByKey(_.user)
.flatMapGroups{case (user, userCategories) =>
val categories = userCategories.map(uc => uc.category).toSeq
for {
c1 <- categories
c2 <- categories
if c1 < c2
} yield (c1, c2)
}
.groupByKey(x => x)
.count()
.show()
如果您需要对称结果,您可以将 flatMapGroups
转换中的 if 语句更改为 if c1 != c2
.
请注意,在上面的示例中,我使用了数据集 API,出于测试目的,它是使用以下代码创建的:
case class UserCategory(user: String, category: String)
val allUsersToCategories = session.createDataset(Seq(
UserCategory("garrett", "syncopy"),
UserCategory("garrison", "musictheory"),
UserCategory("marta", "sheetmusic"),
UserCategory("garrett", "orchestration"),
UserCategory("harold", "chopin"),
UserCategory("marta", "russianmusic"),
UserCategory("niko", "piano"),
UserCategory("james", "sheetmusic"),
UserCategory("manny", "violin"),
UserCategory("charles", "gershwin"),
UserCategory("dawson", "cello"),
UserCategory("bob", "cello"),
UserCategory("george", "cello"),
UserCategory("george", "americanmusic"),
UserCategory("bob", "personalcompos"),
UserCategory("george", "sheetmusic"),
UserCategory("fred", "sheetmusic"),
UserCategory("bob", "sheetmusic"),
UserCategory("garrison", "sheetmusic"),
UserCategory("george", "musictheory")
))
我试图在 Java 中提供示例,但我对 Java+Spark 没有任何经验,将上面的示例从 Scala 迁移到 Java...
几个小时前我使用 spark sql
:
找到了答案
Dataset<Row> connection per shared user = spark.sql("SELECT a.user as user, "
+ "a.category as categoryOne, "
+ "b.category as categoryTwo "
+ "FROM allTable as a INNER JOIN allTable as b "
+ "ON a.user = b.user AND a.user < b.user");
这将创建一个包含三列 user
、categoryOne
和 categoryTwo
的数据集。每行都将是唯一的,并且将指示用户何时存在于两个类别中。
我在 Spark 中有一个包含两列的数据集结构,一列称为 user
,另一列称为 category
。这样 table 看起来像这样:
+---------------+---------------+
| user| category|
+---------------+---------------+
| garrett| syncopy|
| garrison| musictheory|
| marta| sheetmusic|
| garrett| orchestration|
| harold| chopin|
| marta| russianmusic|
| niko| piano|
| james| sheetmusic|
| manny| violin|
| charles| gershwin|
| dawson| cello|
| bob| cello|
| george| cello|
| george| americanmusic|
| bob| personalcompos|
| george| sheetmusic|
| fred| sheetmusic|
| bob| sheetmusic|
| garrison| sheetmusic|
| george| musictheory|
+---------------+---------------+
only showing top 20 rows
table 中的每一行都是唯一的,但用户和类别可以出现多次。 objective是统计两个分类共有的用户数。例如 cello
和 americanmusic
共享一个名为 george
的用户,而 musictheory
和 sheetmusic
共享用户 george
和 garrison
。目标是获得 n 个类别之间的不同用户的数量,这意味着类别之间最多有 n 个平方边。我部分了解如何执行此操作,但我正在努力将我的想法转换为 Spark Java。
我的想法是,我需要在 user
上进行自连接以获得结构如下的 table:
+---------------+---------------+---------------+
| user| category| category|
+---------------+---------------+---------------+
| garrison| musictheory| sheetmusic|
| george| musictheory| sheetmusic|
| garrison| musictheory| musictheory|
| george| musictheory| musicthoery|
| garrison| sheetmusic| musictheory|
| george| sheetmusic| musictheory|
+---------------+---------------+---------------+
Spark中的self join操作(Java代码)并不难:
Dataset<Row> newDataset = allUsersToCategories.join(allUsersToCategories, "users");
这已经到了某个程度,但是我得到了与上例中第 3 行和第 4 行中相同类别的映射,并且我得到了向后映射,其中类别被颠倒了,这样基本上就像在行中一样重复计算每个用户交互上面例子的5和6.
我认为我需要做的是在我的连接中设置某种条件,按照 X < Y
的方式说明一些内容,以便丢弃相同的类别和重复项。最后,我需要计算 n 平方组合的不同行数,其中 n 是类别数。
有人可以解释一下如何在 Spark 中执行此操作,特别是 Spark Java 因为我对 Scala 语法有点不熟悉吗?
感谢您的帮助。
我不确定我是否正确理解了您的要求,但我会尽力提供帮助。
根据我的理解,上述数据的预期结果应如下所示。如果这不是真的,请告诉我,我会尝试进行必要的修改。
+--------------+--------------+-+
|_1 |_2 |
+--------------+--------------+-+
|personalcompos|sheetmusic |1|
|cello |musictheory |1|
|americanmusic |cello |1|
|cello |sheetmusic |2|
|cello |personalcompos|1|
|russianmusic |sheetmusic |1|
|americanmusic |sheetmusic |1|
|americanmusic |musictheory |1|
|musictheory |sheetmusic |2|
|orchestration |syncopy |1|
+--------------+--------------+-+
在这种情况下,您可以使用以下 Scala 代码解决您的问题:
allUsersToCategories
.groupByKey(_.user)
.flatMapGroups{case (user, userCategories) =>
val categories = userCategories.map(uc => uc.category).toSeq
for {
c1 <- categories
c2 <- categories
if c1 < c2
} yield (c1, c2)
}
.groupByKey(x => x)
.count()
.show()
如果您需要对称结果,您可以将 flatMapGroups
转换中的 if 语句更改为 if c1 != c2
.
请注意,在上面的示例中,我使用了数据集 API,出于测试目的,它是使用以下代码创建的:
case class UserCategory(user: String, category: String)
val allUsersToCategories = session.createDataset(Seq(
UserCategory("garrett", "syncopy"),
UserCategory("garrison", "musictheory"),
UserCategory("marta", "sheetmusic"),
UserCategory("garrett", "orchestration"),
UserCategory("harold", "chopin"),
UserCategory("marta", "russianmusic"),
UserCategory("niko", "piano"),
UserCategory("james", "sheetmusic"),
UserCategory("manny", "violin"),
UserCategory("charles", "gershwin"),
UserCategory("dawson", "cello"),
UserCategory("bob", "cello"),
UserCategory("george", "cello"),
UserCategory("george", "americanmusic"),
UserCategory("bob", "personalcompos"),
UserCategory("george", "sheetmusic"),
UserCategory("fred", "sheetmusic"),
UserCategory("bob", "sheetmusic"),
UserCategory("garrison", "sheetmusic"),
UserCategory("george", "musictheory")
))
我试图在 Java 中提供示例,但我对 Java+Spark 没有任何经验,将上面的示例从 Scala 迁移到 Java...
几个小时前我使用 spark sql
:
Dataset<Row> connection per shared user = spark.sql("SELECT a.user as user, "
+ "a.category as categoryOne, "
+ "b.category as categoryTwo "
+ "FROM allTable as a INNER JOIN allTable as b "
+ "ON a.user = b.user AND a.user < b.user");
这将创建一个包含三列 user
、categoryOne
和 categoryTwo
的数据集。每行都将是唯一的,并且将指示用户何时存在于两个类别中。