Akka 流输入(`In`)作为输出(`Out`)

Akka flow Input (`In`) as Output (`Out`)

我正在尝试编写一段代码来执行以下操作:-

  1. 从远程源(如 s3)读取大型 csv 文件。
  2. 逐条处理文件。
  3. 向用户发送通知
  4. 将输出写入远程位置

输入 csv 中的示例记录:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

我的输入案例 class 表示输入 csv 中的一条记录:

case class InputRecord(recordId: String, name: String, salary: Long)

输出csv中的示例记录(需要写入):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

我的输出案例 class 表示输入 csv 中的一条记录:

case class OutputRecord(recordId: String, name: String, designation: String)

使用 akka stream csv 读取记录(使用 Alpakka reactive s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

现在我有处理记录的功能了:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

将 OutputRecord 写入 csv 的函数

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

发送电子邮件通知的功能:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

将它们拼接在一起

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

在第 15 和 16 行我收到一个错误,我可以添加第 15 行或第 16 行,但不能同时添加,因为 notifywriteOutput 都需要 outputRecord。一旦通知被调用,我就会松开我的 outputRecord.

有什么方法可以将 notifywriteOutput 添加到同一个图表中?

我不是在寻找并行执行,因为我想先调用 notify,然后再调用 writeOutput。所以这没有帮助:https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

这个用例对我来说似乎很简单,但我无法找到一个干净的解决方案。

notify的输出是PushResult,但writeOutput的输入是ByteString。一旦你改变它就会编译。如果您需要 ByteString,请从 OutputRecord.

获取相同的内容

顺便说一句,在您提供的示例代码中,readCSVprocess中存在类似的错误。