如何使用scala akka流检测重复行
how to detect duplicated line using scala akka stream
我们有一个 scala 应用程序,它从文本文件中读取行并使用 Akka Stream 处理它们。为了获得更好的性能,我们将并行度设置为 5。问题是如果多行包含相同的电子邮件,我们只保留其中一行并将其他行视为重复并抛出错误。我尝试使用 java concurrentHashMap 来检测重复但它没有用,这是我的代码:
allIdentifiers = new ConcurrentHashMap[String, Int]()
Source(rows)
.mapAsync(config.parallelism.value) {
case (dataRow, index) => {
val eventResendResult: EitherT[Future, NonEmptyList[ResendError], ResendResult] =
for {
cleanedRow <- EitherT.cond[Future](
!allIdentifiers.containsKey(dataRow.lift(emailIndex)), {
allIdentifiers.put(dataRow.lift(emailIndex),index)
dataRow
}, {
NonEmptyList.of(
DuplicatedError(
s"Duplicated record at row $index",
List(identifier)
)
)
}
)
_ = logger.debug(
LoggingMessage(
requestId = RequestId(),
message = s"allIdentifiers: $allIdentifiers"
)
)
... more process step ...
} yield foldResponses(sent)
eventResendResult
.leftMap(errors => ResendResult(errors.toList, List.empty))
.merge
}
}
.runWith(Sink.reduce { (result1: ResendResult, result2: ResendResult) =>
ResendResult(
result1.errors ++ result2.errors,
result1.results ++ result2.results
)
})
我们将 config.parallelism.value 设置为 5,这意味着任何时候它都会同时处理多达 5 行。我观察到的是,如果彼此相邻有重复的行,它就不起作用,例如:
line 0 contains email1
line 1 contains email1
line 2 contains email2
line 3 contains email2
line 4 contains email3
从日志中我看到 concurrentHashMap 填充了条目,但所有行都通过了重复检测并移至下一个流程步骤。
那么 Akka Stream 的并行性与 java 的多线程不是一回事吗?在这种情况下我如何检测重复的行?
问题出在以下代码段中:
cleanedRow <- EitherT.cond[Future](
!allIdentifiers.containsKey(dataRow.lift(emailIndex)), {
allIdentifiers.put(dataRow.lift(emailIndex),index)
dataRow
}, {
NonEmptyList.of(
DuplicatedError(
s"Duplicated record at row $index",
List(identifier)
)
)
}
)
特别是:想象一下两个线程同时处理一封应该被删除重复的电子邮件。有可能发生以下情况(按顺序)
- 第一个线程检查
containsKey
并发现电子邮件不在地图中
- 第二个线程检查
containsKey
并发现电子邮件不在地图中
- 第一个线程将电子邮件添加到地图(基于步骤 1 的结果。)并通过
传递电子邮件
- 第二个线程将电子邮件添加到地图(基于步骤 3 的结果。)并通过
传递电子邮件
换句话说:您需要自动检查映射中的键 并 更新它。这是一种很常见的事情,所以它正是 ConcurrentHashMap
的 put
所做的:它更新键处的值和 returns 它替换的先前值,如果有一个。
我对 Cats 中的组合器不是很熟悉,所以以下内容可能不是地道的。但是,请注意它如何在一个原子步骤中插入和检查先前的值。
cleanedRow <- EitherT(Future.successful {
val previous = allIdentifiers.put(dataRow.lift(emailIndex), index)
Either.cond(
previous != null,
dataRow,
NonEmptyList.of(
DuplicatedError(
s"Duplicated record at row $index",
List(identifier)
)
)
)
})
我们有一个 scala 应用程序,它从文本文件中读取行并使用 Akka Stream 处理它们。为了获得更好的性能,我们将并行度设置为 5。问题是如果多行包含相同的电子邮件,我们只保留其中一行并将其他行视为重复并抛出错误。我尝试使用 java concurrentHashMap 来检测重复但它没有用,这是我的代码:
allIdentifiers = new ConcurrentHashMap[String, Int]()
Source(rows)
.mapAsync(config.parallelism.value) {
case (dataRow, index) => {
val eventResendResult: EitherT[Future, NonEmptyList[ResendError], ResendResult] =
for {
cleanedRow <- EitherT.cond[Future](
!allIdentifiers.containsKey(dataRow.lift(emailIndex)), {
allIdentifiers.put(dataRow.lift(emailIndex),index)
dataRow
}, {
NonEmptyList.of(
DuplicatedError(
s"Duplicated record at row $index",
List(identifier)
)
)
}
)
_ = logger.debug(
LoggingMessage(
requestId = RequestId(),
message = s"allIdentifiers: $allIdentifiers"
)
)
... more process step ...
} yield foldResponses(sent)
eventResendResult
.leftMap(errors => ResendResult(errors.toList, List.empty))
.merge
}
}
.runWith(Sink.reduce { (result1: ResendResult, result2: ResendResult) =>
ResendResult(
result1.errors ++ result2.errors,
result1.results ++ result2.results
)
})
我们将 config.parallelism.value 设置为 5,这意味着任何时候它都会同时处理多达 5 行。我观察到的是,如果彼此相邻有重复的行,它就不起作用,例如:
line 0 contains email1
line 1 contains email1
line 2 contains email2
line 3 contains email2
line 4 contains email3
从日志中我看到 concurrentHashMap 填充了条目,但所有行都通过了重复检测并移至下一个流程步骤。 那么 Akka Stream 的并行性与 java 的多线程不是一回事吗?在这种情况下我如何检测重复的行?
问题出在以下代码段中:
cleanedRow <- EitherT.cond[Future](
!allIdentifiers.containsKey(dataRow.lift(emailIndex)), {
allIdentifiers.put(dataRow.lift(emailIndex),index)
dataRow
}, {
NonEmptyList.of(
DuplicatedError(
s"Duplicated record at row $index",
List(identifier)
)
)
}
)
特别是:想象一下两个线程同时处理一封应该被删除重复的电子邮件。有可能发生以下情况(按顺序)
- 第一个线程检查
containsKey
并发现电子邮件不在地图中 - 第二个线程检查
containsKey
并发现电子邮件不在地图中 - 第一个线程将电子邮件添加到地图(基于步骤 1 的结果。)并通过 传递电子邮件
- 第二个线程将电子邮件添加到地图(基于步骤 3 的结果。)并通过 传递电子邮件
换句话说:您需要自动检查映射中的键 并 更新它。这是一种很常见的事情,所以它正是 ConcurrentHashMap
的 put
所做的:它更新键处的值和 returns 它替换的先前值,如果有一个。
我对 Cats 中的组合器不是很熟悉,所以以下内容可能不是地道的。但是,请注意它如何在一个原子步骤中插入和检查先前的值。
cleanedRow <- EitherT(Future.successful {
val previous = allIdentifiers.put(dataRow.lift(emailIndex), index)
Either.cond(
previous != null,
dataRow,
NonEmptyList.of(
DuplicatedError(
s"Duplicated record at row $index",
List(identifier)
)
)
)
})