Akka 流输入(`In`)作为输出(`Out`)
Akka flow Input (`In`) as Output (`Out`)
我正在尝试编写一段代码来执行以下操作:-
- 从远程源(如 s3)读取大型 csv 文件。
- 逐条处理文件。
- 向用户发送通知
- 将输出写入远程位置
输入 csv 中的示例记录:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
我的输入案例 class 表示输入 csv 中的一条记录:
case class InputRecord(recordId: String, name: String, salary: Long)
输出csv中的示例记录(需要写入):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
我的输出案例 class 表示输入 csv 中的一条记录:
case class OutputRecord(recordId: String, name: String, designation: String)
使用 akka stream csv 读取记录(使用 Alpakka reactive s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
现在我有处理记录的功能了:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
将 OutputRecord 写入 csv 的函数
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
发送电子邮件通知的功能:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
将它们拼接在一起
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
在第 15 和 16 行我收到一个错误,我可以添加第 15 行或第 16 行,但不能同时添加,因为 notify
和 writeOutput
都需要 outputRecord
。一旦通知被调用,我就会松开我的 outputRecord
.
有什么方法可以将 notify
和 writeOutput
添加到同一个图表中?
我不是在寻找并行执行,因为我想先调用 notify
,然后再调用 writeOutput
。所以这没有帮助:https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
这个用例对我来说似乎很简单,但我无法找到一个干净的解决方案。
notify
的输出是PushResult
,但writeOutput
的输入是ByteString
。一旦你改变它就会编译。如果您需要 ByteString
,请从 OutputRecord
.
获取相同的内容
顺便说一句,在您提供的示例代码中,readCSV
和process
中存在类似的错误。
我正在尝试编写一段代码来执行以下操作:-
- 从远程源(如 s3)读取大型 csv 文件。
- 逐条处理文件。
- 向用户发送通知
- 将输出写入远程位置
输入 csv 中的示例记录:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
我的输入案例 class 表示输入 csv 中的一条记录:
case class InputRecord(recordId: String, name: String, salary: Long)
输出csv中的示例记录(需要写入):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
我的输出案例 class 表示输入 csv 中的一条记录:
case class OutputRecord(recordId: String, name: String, designation: String)
使用 akka stream csv 读取记录(使用 Alpakka reactive s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
现在我有处理记录的功能了:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
将 OutputRecord 写入 csv 的函数
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
发送电子邮件通知的功能:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
将它们拼接在一起
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
在第 15 和 16 行我收到一个错误,我可以添加第 15 行或第 16 行,但不能同时添加,因为 notify
和 writeOutput
都需要 outputRecord
。一旦通知被调用,我就会松开我的 outputRecord
.
有什么方法可以将 notify
和 writeOutput
添加到同一个图表中?
我不是在寻找并行执行,因为我想先调用 notify
,然后再调用 writeOutput
。所以这没有帮助:https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
这个用例对我来说似乎很简单,但我无法找到一个干净的解决方案。
notify
的输出是PushResult
,但writeOutput
的输入是ByteString
。一旦你改变它就会编译。如果您需要 ByteString
,请从 OutputRecord
.
顺便说一句,在您提供的示例代码中,readCSV
和process
中存在类似的错误。