Akka Sink 永不关闭

Akka Sink never closes

我正在使用 Alpakka 将单个文件上传到 SFTP 服务器,但是一旦文件上传并且我收到成功响应,Sink 保持打开状态,我该如何排空它?

我从这个开始:

val sink = Sftp.toPath(path, settings, false)
val source = Source.single(ByteString(data))
​
source
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(sink)(Keep.both).run()
  .map(_.wasSuccessful)

但这最终永远不会离开地图步骤。 我试图添加一个 killswitch,但这似乎没有效果(无论是关闭还是中止):

val sink = Sftp.toPath(path, settings, false)
val source = Source.single(ByteString(data))
​
val (killswitch, result) = source
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(sink)(Keep.both).run()

result.map {
  killswitch.shutdown()
  _.wasSuccessful
}

我是不是做错了什么?我只想要一个结果。

编辑 发送到 toPath 的设置:

SftpSettings(InetAddress.getByName(host))
    .withCredentials(FtpCredentials.create(amexUsername, amexPassword))
    .withStrictHostKeyChecking(false)

我看不到你的整个代码,但在我看来,在你发布的代码片段中,result 值是一个 Future,它在你的程序执行结束之前没有完成,这是因为代码在map 也没有被执行。

让你把 Await.result(result, Duration.Inf) 放在最后,我想检查一下 A. Gregoris 表达的理论。因此它要么

  • 您的应用程序在 Future 完成或
  • 之前退出
  • (如果您的应用程序没有退出)执行此操作的函数会丢弃 result

如果您的应用没有退出,您可以尝试使用 result.onComplete 来完成必要的工作。