How to mock a Sink in Akka Streams?
I have a simple "save" function that uses akka-stream-alpakka's multipartUpload. It looks like this:
def save(fileName: String): Future[AWSLocation] = {
  val uuid: String = s"${UUID.randomUUID()}"
  val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = s3Client.multipartUpload(s"$bucketName", s"$uuid/$fileName")
  val file = Paths.get(s"/tmp/$fileName")
  FileIO.fromPath(file).runWith(s3Sink).map(res => {
    AWSLocation(uuid, fileName, res.key)
  }).recover {
    case ex: S3Exception =>
      logger.error("Upload to S3 failed with s3 exception", ex)
      throw ex
    case ex: Throwable =>
      logger.error("Upload to S3 failed with an unknown exception", ex)
      throw ex
  }
}
I want to test this function in two cases:
- multipartUpload succeeds and I get back an AWSLocation (my case class).
- multipartUpload fails and I get an S3Exception.
So I want to stub multipartUpload and return my own sink, like this:
val mockAmazonS3ProxyService: S3ClientProxy = mock[S3ClientProxy]
val s3serviceMock: S3Service = mock[S3Service]
override val fakeApplication: Application = GuiceApplicationBuilder()
  .overrides(bind[S3ClientProxy].toInstance(mockAmazonS3ProxyService))
  .router(Router.empty).build()
"test" in {
  when(mockAmazonS3ProxyService.multipartUpload(anyString(), anyString())) thenReturn Sink(ByteString.empty, Future.successful(MultipartUploadResult(Uri(""),"","myKey123","",Some(""))))
  val res = s3serviceMock.save("someFileName").futureValue
  res.key shouldBe "myKey123"
}
The problem is that I get Error:(47, 93) akka.stream.scaladsl.Sink.type does not take parameters. I understand that I can't create a sink like this, but how should I do it?
Or is there a better way to test this?
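On the compile error itself: the message says the Sink companion object cannot be called like a constructor, so a stub has to be derived from an existing sink. A minimal sketch, assuming akka-stream is on the classpath: Sink.ignore discards the incoming bytes and mapMaterializedValue swaps in a canned Future. UploadResult, succeedingSink and failingSink are hypothetical names used only for illustration, with UploadResult standing in for MultipartUploadResult.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object StubSinkSketch {
  // Hypothetical stand-in for MultipartUploadResult, reduced to the one field used here.
  final case class UploadResult(key: String)

  // Stub that discards all input and materializes an already-successful Future.
  def succeedingSink(result: UploadResult): Sink[ByteString, Future[UploadResult]] =
    Sink.ignore.mapMaterializedValue(_ => Future.successful(result))

  // Stub that discards all input and materializes an already-failed Future.
  def failingSink(cause: Throwable): Sink[ByteString, Future[UploadResult]] =
    Sink.ignore.mapMaterializedValue(_ => Future.failed[UploadResult](cause))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("stub-sink-sketch")
    // Explicit materializer as in the Akka 2.5 style used in this thread;
    // on Akka 2.6+ the implicit ActorSystem alone would be enough.
    implicit val mat: Materializer = ActorMaterializer()
    try {
      val data = Source.single(ByteString("payload"))

      val ok = Await.result(data.runWith(succeedingSink(UploadResult("myKey123"))), 3.seconds)
      assert(ok.key == "myKey123")

      val failed = Try(Await.result(data.runWith(failingSink(new RuntimeException("boom"))), 3.seconds))
      assert(failed.isFailure)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Note that the canned Future is already completed when the stream is materialized, so it does not wait for the upstream to finish. With Mockito, a sink built this way could be passed to thenReturn in place of the Sink(...) call.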
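Consider redesigning your save method so that it is easier to test and a specific sink can be injected, producing different results for different tests (as Bennie Krijger mentioned).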
def save(fileName: String): Future[AWSLocation] = {
  val uuid: String = s"${UUID.randomUUID()}"
  save(uuid, fileName)(() => s3Client.multipartUpload(s"$bucketName", s"$uuid/$fileName"))
}

// uuid is passed in because the AWSLocation mapping below needs it; the
// different first parameter list also keeps the two overloads unambiguous.
def save(
    uuid: String,
    fileName: String
)(createS3UploadSink: () => Sink[ByteString, Future[MultipartUploadResult]]): Future[AWSLocation] = {
  val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = createS3UploadSink()
  val file = Paths.get(s"/tmp/$fileName")
  FileIO
    .fromPath(file)
    .runWith(s3Sink)
    .map(res => {
      AWSLocation(uuid, fileName, res.key)
    })
    .recover {
      case ex: S3Exception =>
        logger.error("Upload to S3 failed with s3 exception", ex)
        throw ex
      case ex: Throwable =>
        logger.error("Upload to S3 failed with an unknown exception", ex)
        throw ex
    }
}

The tests would look like this:

class MultipartUploadSpec extends TestKit(ActorSystem("multipartUpload")) with FunSpecLike {
  implicit val mat: Materializer = ActorMaterializer()

  // save is the method under test and is assumed to be in scope here.
  describe("multipartUpload") {
    it("should pass failure") {
      val result = save("some-uuid", "someFileName")(() => Sink.ignore.mapMaterializedValue(_ => Future.failed(new RuntimeException)))
      // assert result
    }
    it("should pass successfully") {
      val result = save("some-uuid", "someFileName")(() => Sink.ignore.mapMaterializedValue(_ => Future.successful(new MultipartUploadResult(???))))
      // assert result
    }
  }
}
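The assertions left open in the test can be filled in along these lines. This is a sketch under assumptions, not the original service: UploadResult and AWSLocation are stand-in case classes (the real AWSLocation fields are not shown in the question), save takes a Path instead of building one under /tmp, the recover block is omitted, and plain Await plus assert replaces the ScalaTest matchers.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{FileIO, Sink}
import akka.util.ByteString

import java.nio.file.{Files, Path}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object SaveWithStubSinkSketch {
  // Hypothetical stand-ins for MultipartUploadResult and the asker's AWSLocation.
  final case class UploadResult(key: String)
  final case class AWSLocation(uuid: String, fileName: String, key: String)

  // Simplified variant of save with the sink factory injected by the caller.
  def save(uuid: String, file: Path)(createSink: () => Sink[ByteString, Future[UploadResult]])(
      implicit mat: Materializer, ec: ExecutionContext): Future[AWSLocation] =
    FileIO
      .fromPath(file)
      .runWith(createSink())
      .map(res => AWSLocation(uuid, file.getFileName.toString, res.key))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("save-sketch")
    implicit val mat: Materializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for the map above

    // Small temporary input file so that FileIO has something to read.
    val file = Files.createTempFile("save-sketch", ".bin")
    file.toFile.deleteOnExit()
    Files.write(file, "payload".getBytes("UTF-8"))

    try {
      // Success case: the stubbed key ends up in the resulting AWSLocation.
      val ok = Await.result(
        save("uuid-1", file)(() =>
          Sink.ignore.mapMaterializedValue(_ => Future.successful(UploadResult("myKey123")))),
        3.seconds)
      assert(ok == AWSLocation("uuid-1", file.getFileName.toString, "myKey123"))

      // Failure case: the stubbed exception propagates through the returned Future.
      val failed = Try(Await.result(
        save("uuid-2", file)(() =>
          Sink.ignore.mapMaterializedValue(_ => Future.failed[UploadResult](new RuntimeException("boom")))),
        3.seconds))
      assert(failed.isFailure)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Inside a ScalaTest spec the same checks would typically be written with the ScalaFutures helpers, for example result.futureValue for the success case and result.failed.futureValue for the failure case.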