如何使用scala akka流检测重复行

how to detect duplicated line using scala akka stream

我们有一个 scala 应用程序,它从文本文件中读取行并使用 Akka Stream 处理它们。为了获得更好的性能,我们将并行度设置为 5。问题是如果多行包含相同的电子邮件,我们只保留其中一行并将其他行视为重复并抛出错误。我尝试使用 java concurrentHashMap 来检测重复但它没有用,这是我的代码:

allIdentifiers = new ConcurrentHashMap[String, Int]()   
Source(rows)
  .mapAsync(config.parallelism.value) {
    case (dataRow, index) => {

      val eventResendResult: EitherT[Future, NonEmptyList[ResendError], ResendResult] =
        for {

          cleanedRow <- EitherT.cond[Future](
            !allIdentifiers.containsKey(dataRow.lift(emailIndex)), {
              allIdentifiers.put(dataRow.lift(emailIndex),index)
              dataRow
            }, {
              NonEmptyList.of(
                DuplicatedError(
                  s"Duplicated record at row $index",
                  List(identifier)
                )
              )
            }
          )

          _ = logger.debug(
            LoggingMessage(
              requestId = RequestId(),
              message = s"allIdentifiers: $allIdentifiers"
            )
          )

          ... more process step ...
        } yield foldResponses(sent)

      eventResendResult
        .leftMap(errors => ResendResult(errors.toList, List.empty))
        .merge
    }
  }
  .runWith(Sink.reduce { (result1: ResendResult, result2: ResendResult) =>
    ResendResult(
      result1.errors ++ result2.errors,
      result1.results ++ result2.results
    )
  })

我们将 config.parallelism.value 设置为 5,这意味着任何时候它都会同时处理多达 5 行。我观察到的是,如果彼此相邻有重复的行,它就不起作用,例如:

line 0 contains email1
line 1 contains email1
line 2 contains email2
line 3 contains email2
line 4 contains email3

从日志中我看到 concurrentHashMap 填充了条目,但所有行都通过了重复检测并移至下一个流程步骤。 那么 Akka Stream 的并行性与 java 的多线程不是一回事吗?在这种情况下我如何检测重复的行?

问题出在以下代码段中:

cleanedRow <- EitherT.cond[Future](
  !allIdentifiers.containsKey(dataRow.lift(emailIndex)), {
    allIdentifiers.put(dataRow.lift(emailIndex),index)
    dataRow
  }, {
    NonEmptyList.of(
      DuplicatedError(
        s"Duplicated record at row $index",
        List(identifier)
      )
    )
  }
)

特别是:想象一下两个线程同时处理一封应该被删除重复的电子邮件。有可能发生以下情况(按顺序)

  1. 第一个线程检查 containsKey 并发现电子邮件不在地图中
  2. 第二个线程检查 containsKey 并发现电子邮件不在地图中
  3. 第一个线程将电子邮件添加到地图(基于步骤 1 的结果。)并通过
  4. 传递电子邮件
  5. 第二个线程将电子邮件添加到地图(基于步骤 3 的结果。)并通过
  6. 传递电子邮件

换句话说:您需要自动检查映射中的键 更新它。这是一种很常见的事情,所以它正是 ConcurrentHashMapput 所做的:它更新键处的值和 returns 它替换的先前值,如果有一个。

我对 Cats 中的组合器不是很熟悉,所以以下内容可能不是地道的。但是,请注意它如何在一个原子步骤中插入和检查先前的值。

cleanedRow <- EitherT(Future.successful {
  val previous = allIdentifiers.put(dataRow.lift(emailIndex), index)
  Either.cond(
    previous != null,
    dataRow,
    NonEmptyList.of(
      DuplicatedError(
        s"Duplicated record at row $index",
        List(identifier)
      )
    )
  )
})