使用 Akka Actor 将附件发送到电子邮件网关
Send attachment to email Gateway using Akka Actors
我是 Akka、Scala 的新手。
我必须构建一个服务,发送带有给定 emailIds 附件的电子邮件。我正在使用 Sendgrid 作为网关。
对于附件,我在 S3 中上传了一个大小为 28KB 的文件。
我有 REST 服务,我可以将文档 ID 传递到该服务,通过它我可以将文档作为 InputStream 获取。现在这个输入流必须发送到许多电子邮件 ID。所有这些下载文件都是由我在下面创建的名为 "attachmentActor" 的演员处理的。
现在假设我有两个 emailId,我需要将附件发送到,我面临的问题是它没有向两者发送完整的文件,实际上 28KB 的文件被分成 16KB 和 12KB,最后发送到 emailIds .
所以 emailId 1 会收到 16KB //它实际上应该有 28KB
电子邮件 2 将收到 12KB //它实际上应该有 28KB
代码如下:
class SendgridConsumer{
def receive(request: EmailRequest) = {
val service = Sendgrid(username , password)
val logData = request.logData
var errorMessage = new String
val attachmentRef = system.actorOf(Props[AttachmentRequestConsumer], "attachmentActor")
val future = attachmentRef ? AttachmentRequest(request.documentId.get)
var targetStream = Await.result(future, timeout.duration).asInstanceOf[InputStream]
val results = request.emailContacts.par.map( emailContact => {
val email=postData(new Email(),request , emailContact, targetStream,request.documentName.get)
val sendGridResponse=service.send(email)
}
}
// postData() creates an Email Object
// This is my Attachment Actor
class AttachmentRequestConsumer extends Actor with ActorLogging {
def receive = {
case request:AttachmentRequest => {
log.info(" inside Attachment RequestConsumer with document Id:" + request.documentId)
val req: HttpRequest = Http(url)
val response = req.asBytes
val targetStream = ByteSource.wrap(response.body).openStream()
log.info("response body :" + response.body)
sender ! targetStream
targetStream.close()
}
}
}
关于 actor,您应该了解的一件事是您不应该将可变对象(例如 InputStream)作为消息发送(技术上您可以,只要您不改变它们)。另一件事是消息的发送是异步的。这意味着 targetStream.close()
在其他参与者收到消息之前被调用。这可能是您收到截断附件的原因。
您可以做的一件事是发送数据而不是 InputStream
。像
def receive = {
case request:AttachmentRequest => {
log.info(" inside Attachment RequestConsumer with document Id:" + request.documentId)
val req: HttpRequest = Http(url)
val response = req.asBytes
val data = ByteSource.wrap(response.body).read.toVector
log.info("response body :" + response.body)
sender ! data
}
}
也就是说,如果您可以轻松地将附件的内容放入内存中。如果不是这样,您可以尝试 break it into chunks or something.
附带说明一下,您不应该阻塞接收(Await.result
)。更好的方法是只向 AttachmentRequestConsumer 发送一条消息,然后在 SendgridConsumer
的接收中返回 Seq[Byte]
类型的消息(或者更好的一些包装器,如 AttachmentResponse
)。
我是 Akka、Scala 的新手。
我必须构建一个服务,发送带有给定 emailIds 附件的电子邮件。我正在使用 Sendgrid 作为网关。
对于附件,我在 S3 中上传了一个大小为 28KB 的文件。
我有 REST 服务,我可以将文档 ID 传递到该服务,通过它我可以将文档作为 InputStream 获取。现在这个输入流必须发送到许多电子邮件 ID。所有这些下载文件都是由我在下面创建的名为 "attachmentActor" 的演员处理的。
现在假设我有两个 emailId,我需要将附件发送到,我面临的问题是它没有向两者发送完整的文件,实际上 28KB 的文件被分成 16KB 和 12KB,最后发送到 emailIds .
所以 emailId 1 会收到 16KB //它实际上应该有 28KB
电子邮件 2 将收到 12KB //它实际上应该有 28KB
代码如下:
class SendgridConsumer{
def receive(request: EmailRequest) = {
val service = Sendgrid(username , password)
val logData = request.logData
var errorMessage = new String
val attachmentRef = system.actorOf(Props[AttachmentRequestConsumer], "attachmentActor")
val future = attachmentRef ? AttachmentRequest(request.documentId.get)
var targetStream = Await.result(future, timeout.duration).asInstanceOf[InputStream]
val results = request.emailContacts.par.map( emailContact => {
val email=postData(new Email(),request , emailContact, targetStream,request.documentName.get)
val sendGridResponse=service.send(email)
}
}
// postData() creates an Email Object
// This is my Attachment Actor
class AttachmentRequestConsumer extends Actor with ActorLogging {
def receive = {
case request:AttachmentRequest => {
log.info(" inside Attachment RequestConsumer with document Id:" + request.documentId)
val req: HttpRequest = Http(url)
val response = req.asBytes
val targetStream = ByteSource.wrap(response.body).openStream()
log.info("response body :" + response.body)
sender ! targetStream
targetStream.close()
}
}
}
关于 actor,您应该了解的一件事是您不应该将可变对象(例如 InputStream)作为消息发送(技术上您可以,只要您不改变它们)。另一件事是消息的发送是异步的。这意味着 targetStream.close()
在其他参与者收到消息之前被调用。这可能是您收到截断附件的原因。
您可以做的一件事是发送数据而不是 InputStream
。像
def receive = {
case request:AttachmentRequest => {
log.info(" inside Attachment RequestConsumer with document Id:" + request.documentId)
val req: HttpRequest = Http(url)
val response = req.asBytes
val data = ByteSource.wrap(response.body).read.toVector
log.info("response body :" + response.body)
sender ! data
}
}
也就是说,如果您可以轻松地将附件的内容放入内存中。如果不是这样,您可以尝试 break it into chunks or something.
附带说明一下,您不应该阻塞接收(Await.result
)。更好的方法是只向 AttachmentRequestConsumer 发送一条消息,然后在 SendgridConsumer
的接收中返回 Seq[Byte]
类型的消息(或者更好的一些包装器,如 AttachmentResponse
)。