Convert Akka Graph DSL Stream return type to Future[T] Instead of T (Wait)
Consider the following class:
class MongoDumpService @Inject()(eventsDao: EventDAO)(implicit val ec: ExecutionContext, mat: Materializer) extends LazyLogging {
private[services] def toAssetsWriterSink: Sink[List[Asset], FileDetails] = ParquetService.toParquetSingleFile[List[Asset]](AppConfig.AssetsFileName)
private[services] def toExpenseWriterSink: Sink[List[Expense], FileDetails] = ParquetService.toParquetSingleFile[List[Expense]](AppConfig.ExpensesFileName)
private[services] def toReportsWriterSink: Sink[List[Report], FileDetails] = ParquetService.toParquetSingleFile[List[Report]](AppConfig.ReportsFileName)
private[services] def toTransactionsWriterSink: Sink[List[Transaction], FileDetails] = ParquetService.toParquetSingleFile[List[Transaction]](AppConfig.TransactionsFileName)
private[services] def toEventsWriterSink: Sink[PacificOriginalEvent, FileDetails] = ParquetService.toParquetSingleFile[PacificOriginalEvent](AppConfig.PacificOriginalEventFileName)
  def createMongoDump(recordingId: BSONObjectID, maxDocs: Option[Int] = None): List[FileDetails] =
    RunnableGraph.fromGraph(
      GraphDSL.create(toAssetsWriterSink, toExpenseWriterSink, toReportsWriterSink, toTransactionsWriterSink, toEventsWriterSink, sharedKillSwitch.flow[Event])((f1, f2, f3, f4, f5, _) => List(f1, f2, f3, f4, f5)) {
        implicit builder => (writeAssets, writeExpenses, writeReports, writeTransactions, writerEvents, sw) =>
          import GraphDSL.Implicits._
          val source    = builder.add(eventsDao.getEventsSource(recordingId.stringify, maxDocs))
          val broadcast = builder.add(Broadcast[Event](5))
          source ~> sw ~> broadcast
          broadcast.out(Write.PacificEvents).map(_.pacificEvent) ~> writerEvents
          broadcast.out(Write.Expenses).filter(_.expenses.isDefined).map(_.expenses.get) ~> writeExpenses
          broadcast.out(Write.Assets).filter(_.assets.isDefined).map(_.assets.get) ~> writeAssets
          broadcast.out(Write.Reports).filter(_.reports.isDefined).map(_.reports.get) ~> writeReports
          broadcast.out(Write.Transactions).filter(_.transactions.isDefined).map(_.transactions.get) ~> writeTransactions
          ClosedShape
      }).run()
}
This code returns a List[FileDetails]. It takes Event objects, some of whose fields are Option[List[T]], and writes each field to its corresponding file, e.g. fieldA ~> writerFieldA, and so on.
The problem is this: I want to wait until the operation completes before uploading to S3, because otherwise the uploaded files are 0 KB:
private[actors] def uploadDataToS3(recording: Recording) = {
logger.info(s"Uploading data to S3 with recordingId: ${recording._id.stringify}")
val details = mongoDumpService.createMongoDump(recording._id, recording.limit)
s3Service.uploadFiles(recording._id.stringify, details)
}
Without the Graph DSL I can use runWith, which returns a Future[..]. How can I achieve the same thing with the Graph DSL? (I want to return Future[List[FileDetails]].)
Edit: added toParquetSingleFile:
def toParquetSingleFile[In](fileName: String)(implicit
ec: ExecutionContext,
mat: Materializer,
writes: Writes[In]): Sink[In, FileDetails] = {
val absolutePath = TEMP_DIRECTORY + File.separator + s"$fileName.${FileExtension.PARQUET.toSuffix}"
toJsString[In]
.log(s"ParquetService", _ => s"[✍️] - Writing element toParquetSingleFile for path: $absolutePath ...")
.withAttributes(Attributes.logLevels(onFailure = LogLevels.Error, onFinish = LogLevels.Off, onElement = LogLevels.Info))
.to(
ParquetStreams.toParquetSingleFile(
path = absolutePath,
options = ParquetWriter.Options(
writeMode = ParquetFileWriter.Mode.OVERWRITE,
compressionCodecName = CompressionCodecName.GZIP))
).mapMaterializedValue(_ => FileDetails(absolutePath, FileExtension.PARQUET))
}
Solution:
def toParquetSingleFile[In](fileName: String)(implicit ec: ExecutionContext, mat: Materializer, writes: Writes[In]): Sink[In, Future[Option[FileDetails]]] = {
val absolutePath = TEMP_DIRECTORY + File.separator + s"$fileName.${FileExtension.PARQUET.toSuffix}"
toJsString[In]
.toMat(
Sink.lazySink(() => ParquetStreams.toParquetSingleFile(
path = absolutePath,
options = ParquetWriter.Options(
writeMode = ParquetFileWriter.Mode.OVERWRITE,
compressionCodecName = CompressionCodecName.GZIP))
)
)(Keep.right)
.mapMaterializedValue(_.flatten
.map { _ =>
logger.info(s"[ParquetService] - [✍️] Writing file: [$absolutePath] Finished!")
Some(FileDetails(absolutePath, FileExtension.PARQUET))
}
.recover {
case _: NeverMaterializedException => Option.empty[FileDetails]
}
)
}
As far as I can see, this toParquetSingleFile creates a Sink whose materialized value is a Future[Done]. However, in your function you are returning a FileDetails instance through mapMaterializedValue. The mapMaterializedValue you are calling takes a function of this shape:
mapMaterializedValue(mat: Future[Done] => Mat2)
So if you map the Future[Done] to a Future[FileDetails], the graph will materialize a List[Future[FileDetails]], which you can flatten with Future.sequence (or a similar operation) to obtain a Future[List[FileDetails]].
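The mapping step can be sketched with plain scala.concurrent types. Done and FileDetails below are stand-ins (akka.Done and the real case class from the question are not on the classpath here), and the path is made up:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Stand-ins for akka.Done and the FileDetails case class from the question.
case object Done
case class FileDetails(absolutePath: String, fileExtension: String)

// What the sink materializes: a Future that completes when the write finishes.
val written: Future[Done.type] = Future.successful(Done)

// Map completion to the file metadata, as mapMaterializedValue would.
val details: Future[FileDetails] =
  written.map(_ => FileDetails("/tmp/assets.parquet", "parquet"))

println(Await.result(details, 1.second)) // FileDetails(/tmp/assets.parquet,parquet)
```

The same transformation applied to each of the five sinks is what turns the combined materialized value into a List[Future[FileDetails]].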
To simulate your scenario: you have a function that creates a Sink that writes the file and materializes a Future[Done]:
case class FileDetails(absPath: String, fileExtension: Int)
def sink[In]: Sink[In, Future[Done]] = ???
If you remove the mapMaterializedValue call from your function, you get something similar to the above.
Then create a function that maps that materialized value:
def mapMatValue[In](in: Sink[In, Future[Done]])(implicit ec: ExecutionContext): Sink[In, Future[FileDetails]] =
  in.mapMaterializedValue(result => result.map(_ => FileDetails("path", 0)))
Using that, your createMongoDump should materialize a List[Future[FileDetails]].
Finally, use Future.sequence(list) to get a Future[List[FileDetails]]. You could also use Future.traverse.
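The final flattening step can be sketched with plain Scala futures. FileDetails is again a stand-in case class and the paths are invented:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

case class FileDetails(absolutePath: String, fileExtension: String)

// What the graph would materialize: one Future per writer sink.
val materialized: List[Future[FileDetails]] = List(
  Future.successful(FileDetails("/tmp/assets.parquet", "parquet")),
  Future.successful(FileDetails("/tmp/expenses.parquet", "parquet"))
)

// Future.sequence turns List[Future[A]] into Future[List[A]],
// failing fast if any element fails.
val all: Future[List[FileDetails]] = Future.sequence(materialized)

val result = Await.result(all, 1.second)
println(result.map(_.absolutePath)) // List(/tmp/assets.parquet, /tmp/expenses.parquet)
```

With this in place, uploadDataToS3 can map over the resulting Future[List[FileDetails]] and upload only once every file has been fully written.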