Google Pub/Sub Subscriber does not receive messages
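First of all, I have no experience with Akka, so I am struggling to debug this on my own. I tried the example from here. Publishing messages works (which means the credentials are valid), but the subscriber never emits any messages. The service account has been granted all permissions.
My code looks like this, and it is essentially identical to the example: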
package com.example.google.pubsub

import java.io.FileInputStream
import java.security.spec.PKCS8EncodedKeySpec
import java.security.{KeyFactory, PrivateKey}
import java.util.Base64

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
import akka.stream.alpakka.googlecloud.pubsub.scaladsl.GooglePubSub
import akka.stream.alpakka.googlecloud.pubsub._
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic}
import akka.{Done, NotUsed}
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential

import scala.concurrent.duration._
import scala.concurrent.Future

object SubscriberMain extends App {
  println("#### SUBSCRIBER ####")

  val privateKey: PrivateKey = {
    import java.io.FileInputStream
    val credential = GoogleCredential.fromStream(new FileInputStream("mycredentials.json"))
    val privateKey = credential.getServiceAccountPrivateKey
    privateKey
  }

  val clientEmail = "main-19@weirdproject.iam.gserviceaccount.com"
  val projectId = "weirdproject"
  val apiKey = "xxxx"
  val subscription = "somesubscription"

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

  val subscriptionSource: Source[ReceivedMessage, NotUsed] =
    GooglePubSub.subscribe(projectId, apiKey, clientEmail, privateKey, subscription)

  val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
    GooglePubSub.acknowledge(projectId, apiKey, clientEmail, privateKey, subscription)

  subscriptionSource
    .map { message =>
      val data = message.message.data
      println(s"received a message: $data")
      message.ackId
    }
    .groupedWithin(1000, 1.minute)
    .map(AcknowledgeRequest.apply)
    .to(ackSink)
}
I found that akka.stream.alpakka.googlecloud.pubsub.GooglePubSubSource.createLogic is never executed, which seems to be why no messages are fetched.
What you have is the definition of a stream, but you are not running it. Call run():
subscriptionSource
  .map { message =>
    val data = message.message.data
    println(s"received a message: $data")
    message.ackId
  }
  .groupedWithin(1000, 1.minute)
  .map(AcknowledgeRequest.apply)
  .to(ackSink)
  .run() // <---
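To see the difference between defining and running a stream in isolation, here is a minimal sketch that does not touch Pub/Sub at all. The object name `BlueprintDemo`, the counter, and the latch are illustrative only, and the code assumes the same Akka version style as the question (an explicit `ActorMaterializer`, which is deprecated in Akka 2.6+ where an implicit `ActorSystem` is enough).

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RunnableGraph, Sink, Source}

object BlueprintDemo {
  // Returns (elements processed before run(), elements processed after run()).
  def demo(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("blueprint-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val processed = new AtomicInteger(0)
      val finished = new CountDownLatch(1)

      // This is only a description of the stream. No stage logic is created yet.
      val graph: RunnableGraph[NotUsed] =
        Source(1 to 3)
          .map { n => processed.incrementAndGet(); n }
          .to(Sink.onComplete[Int](_ => finished.countDown()))

      // Give the (non-running) stream a moment: nothing should happen.
      Thread.sleep(200)
      val beforeRun = processed.get

      // Materialize and start the stream, then wait for it to complete.
      graph.run()
      finished.await(5, TimeUnit.SECONDS)
      val afterRun = processed.get

      (beforeRun, afterRun)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = demo()
    println(s"processed before run(): $before, after run(): $after")
  }
}
```

Until `run()` is called the counter stays at zero, which matches the observation in the question that `createLogic` was never invoked: stage logic is only created when the graph is materialized.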
Alternatively, use runWith(), a convenience method that returns the materialized value of the Sink:
val result: Future[Done] =
  subscriptionSource
    .map { message =>
      val data = message.message.data
      println(s"received a message: $data")
      message.ackId
    }
    .groupedWithin(1000, 1.minute)
    .map(AcknowledgeRequest.apply)
    .runWith(ackSink)
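The two variants differ in what they hand back. As a rough sketch, assuming a plain in-memory source and `Sink.fold` as a stand-in for the Pub/Sub sink (the names `MaterializedValueDemo`, `numbers`, and `sum` are made up for illustration), `to(...)` keeps the source's materialized value, while `toMat(...)(Keep.right)` and `runWith(...)` return the sink's:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MaterializedValueDemo {
  // Returns the sums obtained via toMat(...)(Keep.right).run() and via runWith(...).
  def demo(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("mat-value-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val numbers: Source[Int, NotUsed] = Source(1 to 4)
      val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

      // to() keeps the left (source) materialized value, so the sink's Future is dropped.
      val left: NotUsed = numbers.to(sum).run()

      // Keep.right selects the sink's materialized value instead.
      val viaToMat: Future[Int] = numbers.toMat(sum)(Keep.right).run()

      // runWith() is shorthand for the toMat(...)(Keep.right).run() line above.
      val viaRunWith: Future[Int] = numbers.runWith(sum)

      (Await.result(viaToMat, 5.seconds), Await.result(viaRunWith, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(demo())
}
```

Applied to the answer's code, `.to(ackSink).run()` returns `NotUsed`, whereas `.runWith(ackSink)` returns the `Future[Done]` of `ackSink`, which can be used to notice when the stream completes or fails.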
You can find more information about defining and running streams here.