尝试计算 Spark 中两个类别之间的唯一用户数

Attempting to count unique users between two categories in Spark

我在 Spark 中有一个包含两列的数据集结构,一列称为 user,另一列称为 category。这样 table 看起来像这样:

+---------------+---------------+
|           user|       category|
+---------------+---------------+
|        garrett|        syncopy|
|       garrison|    musictheory|
|          marta|     sheetmusic|
|        garrett|  orchestration|
|         harold|         chopin|
|          marta|   russianmusic|
|           niko|          piano|
|          james|     sheetmusic|
|          manny|         violin|
|        charles|       gershwin|
|         dawson|          cello|
|            bob|          cello|
|         george|          cello|
|         george|  americanmusic|
|            bob| personalcompos|
|         george|     sheetmusic|
|           fred|     sheetmusic|
|            bob|     sheetmusic|
|       garrison|     sheetmusic|
|         george|    musictheory|
+---------------+---------------+
only showing top 20 rows

table 中的每一行都是唯一的,但用户和类别可以出现多次。 objective是统计两个分类共有的用户数。例如 celloamericanmusic 共享一个名为 george 的用户,而 musictheorysheetmusic 共享用户 georgegarrison。目标是获得 n 个类别之间的不同用户的数量,这意味着类别之间最多有 n 个平方边。我部分了解如何执行此操作,但我正在努力将我的想法转换为 Spark Java。

我的想法是,我需要在 user 上进行自连接以获得结构如下的 table:

+---------------+---------------+---------------+
|           user|       category|       category|
+---------------+---------------+---------------+
|       garrison|    musictheory|     sheetmusic|
|         george|    musictheory|     sheetmusic|
|       garrison|    musictheory|    musictheory|
|         george|    musictheory|    musicthoery|
|       garrison|     sheetmusic|    musictheory|
|         george|     sheetmusic|    musictheory|
+---------------+---------------+---------------+

Spark中的self join操作(Java代码)并不难:

Dataset<Row> newDataset = allUsersToCategories.join(allUsersToCategories, "users");

这已经到了某个程度,但是我得到了与上例中第 3 行和第 4 行中相同类别的映射,并且我得到了向后映射,其中类别被颠倒了,这样基本上就像在行中一样重复计算每个用户交互上面例子的5和6.

我认为我需要做的是在我的连接中设置某种条件,按照 X < Y 的方式说明一些内容,以便丢弃相同的类别和重复项。最后,我需要计算 n 平方组合的不同行数,其中 n 是类别数。

有人可以解释一下如何在 Spark 中执行此操作,特别是 Spark Java 因为我对 Scala 语法有点不熟悉吗?

感谢您的帮助。

我不确定我是否正确理解了您的要求,但我会尽力提供帮助。

根据我的理解,上述数据的预期结果应如下所示。如果这不是真的,请告诉我,我会尝试进行必要的修改。

+--------------+--------------+-+
|_1            |_2            |
+--------------+--------------+-+
|personalcompos|sheetmusic    |1|
|cello         |musictheory   |1|
|americanmusic |cello         |1|
|cello         |sheetmusic    |2|
|cello         |personalcompos|1|
|russianmusic  |sheetmusic    |1|
|americanmusic |sheetmusic    |1|
|americanmusic |musictheory   |1|
|musictheory   |sheetmusic    |2|
|orchestration |syncopy       |1|
+--------------+--------------+-+

在这种情况下,您可以使用以下 Scala 代码解决您的问题:

allUsersToCategories
    .groupByKey(_.user)
    .flatMapGroups{case (user, userCategories) =>
      val categories = userCategories.map(uc => uc.category).toSeq
      for {
         c1 <- categories
         c2 <- categories
         if c1 < c2
      } yield (c1, c2)
    }
    .groupByKey(x => x)
    .count()
    .show()

如果您需要对称结果,您可以将 flatMapGroups 转换中的 if 语句更改为 if c1 != c2.

请注意,在上面的示例中,我使用了数据集 API,出于测试目的,它是使用以下代码创建的:

case class UserCategory(user: String, category: String)

val allUsersToCategories = session.createDataset(Seq(
   UserCategory("garrett", "syncopy"),
   UserCategory("garrison", "musictheory"),
   UserCategory("marta", "sheetmusic"),
   UserCategory("garrett", "orchestration"),
   UserCategory("harold", "chopin"),
   UserCategory("marta", "russianmusic"),
   UserCategory("niko", "piano"),
   UserCategory("james", "sheetmusic"),
   UserCategory("manny", "violin"),
   UserCategory("charles", "gershwin"),
   UserCategory("dawson", "cello"),
   UserCategory("bob", "cello"),
   UserCategory("george", "cello"),
   UserCategory("george", "americanmusic"),
   UserCategory("bob", "personalcompos"),
   UserCategory("george", "sheetmusic"),
   UserCategory("fred", "sheetmusic"),
   UserCategory("bob", "sheetmusic"),
   UserCategory("garrison", "sheetmusic"),
   UserCategory("george", "musictheory")
))

我试图在 Java 中提供示例,但我对 Java+Spark 没有任何经验,将上面的示例从 Scala 迁移到 Java...

几个小时前我使用 spark sql:

找到了答案
    Dataset<Row> connection per shared user = spark.sql("SELECT a.user as user, "
                                                            + "a.category as categoryOne, "
                                                            + "b.category as categoryTwo "
                                                            + "FROM allTable as a INNER JOIN allTable as b "
                                                            + "ON a.user = b.user AND a.user < b.user");

这将创建一个包含三列 usercategoryOnecategoryTwo 的数据集。每行都将是唯一的,并且将指示用户何时存在于两个类别中。