如何使用 Akka Stream 中的值来实例化 GooglePubSub Flow?
How can I use a value in an Akka Stream to instantiate a GooglePubSub Flow?
我正在尝试创建一个 Flow
以与 Source
队列一起使用。我希望它能与 Alpakka Google PubSub 连接器一起使用:https://doc.akka.io/docs/alpakka/current/google-cloud-pub-sub.html
为了使用此连接器,我需要创建一个 Flow
,它取决于作为 String
提供的主题名称,如上文 link 和代码片段。
val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
GooglePubSub.publish(topic, config)
问题
我希望能够设置一个 Source
队列来接收发布消息所需的主题和消息。我首先从消息 String
中创建必要的 PublishRequest
。然后我想通过 运行ning GooglePubSub.publish(topic, config)
实例化的 Flow
运行 这个。但是,我不知道如何将主题转到流程的那一部分。
val gcFlow: Flow[(String, String), PublishRequest, NotUsed] = Flow[(String, String)]
.map(messageData => {
PublishRequest(Seq(
PubSubMessage(new String(Base64.getEncoder.encode(messageData._1.getBytes))))
)
})
.via(GooglePubSub.publish(topic, config))
val bufferSize = 10
val elementsToProcess = 5
// newSource is a Source[PublishRequest, NotUsed]
val (queue, newSource) = Source
.queue[(String, String)](bufferSize, OverflowStrategy.backpressure)
.via(gcFlow)
.preMaterialize()
我不确定是否有办法在主题不属于初始数据流的情况下将其放入队列。而且我不知道如何将流值放入动态 Flow
.
如果我不正确地使用了一些术语,请记住我是新手。
您可以通过使用 flatMapConcat
并在其中生成一个新的 Source
来实现它:
// using tuple assuming (Topic, Message)
val gcFlow: Flow[(String, String), (String, PublishRequest), NotUsed] = Flow[(String, String)]
.map(messageData => {
val pr = PublishRequest(immutable.Seq(
PubSubMessage(new String(Base64.getEncoder.encode(messageData._2.getBytes)))))
// output flow shape of (String, PublishRequest)
(messageData._1, pr)
})
val publishFlow: Flow[(String, PublishRequest), Seq[String], NotUsed] =
Flow[(String, PublishRequest)].flatMapConcat {
case (topic: String, pr: PublishRequest) =>
// Create a Source[PublishRequest]
Source.single(pr).via(GooglePubSub.publish(topic, config))
}
// wire it up
val (queue, newSource) = Source
.queue[(String, String)](bufferSize, OverflowStrategy.backpressure)
.via(gcFlow)
.via(publishFlow)
.preMaterialize()
您可以选择将元组替换为 class 到 document 更好
case class Something(topic: String, payload: PublishRequest)
// output flow shape of Something[String, PublishRequest]
Something(messageData._1, pr)
Flow[Something[String, PublishRequest]].flatMapConcat { s =>
Source.single(s.payload)... // etc
}
解释:
在gcFlow
中我们输出通过publishFlow
传递的元组(String, PublishRequest)
的FlowShape。输入是元组 (String, PublishRequest)
并且在 flatMapConcat
中我们生成新的 Source[PublishRequest]
流经 GooglePubSub.publish
为每个项目创建新的源会有轻微的开销。这不应该对性能产生可衡量的影响
我正在尝试创建一个 Flow
以与 Source
队列一起使用。我希望它能与 Alpakka Google PubSub 连接器一起使用:https://doc.akka.io/docs/alpakka/current/google-cloud-pub-sub.html
为了使用此连接器,我需要创建一个 Flow
,它取决于作为 String
提供的主题名称,如上文 link 和代码片段。
val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
GooglePubSub.publish(topic, config)
问题
我希望能够设置一个 Source
队列来接收发布消息所需的主题和消息。我首先从消息 String
中创建必要的 PublishRequest
。然后我想通过 运行ning GooglePubSub.publish(topic, config)
实例化的 Flow
运行 这个。但是,我不知道如何将主题转到流程的那一部分。
val gcFlow: Flow[(String, String), PublishRequest, NotUsed] = Flow[(String, String)]
.map(messageData => {
PublishRequest(Seq(
PubSubMessage(new String(Base64.getEncoder.encode(messageData._1.getBytes))))
)
})
.via(GooglePubSub.publish(topic, config))
val bufferSize = 10
val elementsToProcess = 5
// newSource is a Source[PublishRequest, NotUsed]
val (queue, newSource) = Source
.queue[(String, String)](bufferSize, OverflowStrategy.backpressure)
.via(gcFlow)
.preMaterialize()
我不确定是否有办法在主题不属于初始数据流的情况下将其放入队列。而且我不知道如何将流值放入动态 Flow
.
如果我不正确地使用了一些术语,请记住我是新手。
您可以通过使用 flatMapConcat
并在其中生成一个新的 Source
来实现它:
// using tuple assuming (Topic, Message)
val gcFlow: Flow[(String, String), (String, PublishRequest), NotUsed] = Flow[(String, String)]
.map(messageData => {
val pr = PublishRequest(immutable.Seq(
PubSubMessage(new String(Base64.getEncoder.encode(messageData._2.getBytes)))))
// output flow shape of (String, PublishRequest)
(messageData._1, pr)
})
val publishFlow: Flow[(String, PublishRequest), Seq[String], NotUsed] =
Flow[(String, PublishRequest)].flatMapConcat {
case (topic: String, pr: PublishRequest) =>
// Create a Source[PublishRequest]
Source.single(pr).via(GooglePubSub.publish(topic, config))
}
// wire it up
val (queue, newSource) = Source
.queue[(String, String)](bufferSize, OverflowStrategy.backpressure)
.via(gcFlow)
.via(publishFlow)
.preMaterialize()
您可以选择将元组替换为 class 到 document 更好
case class Something(topic: String, payload: PublishRequest)
// output flow shape of Something[String, PublishRequest]
Something(messageData._1, pr)
Flow[Something[String, PublishRequest]].flatMapConcat { s =>
Source.single(s.payload)... // etc
}
解释:
在gcFlow
中我们输出通过publishFlow
传递的元组(String, PublishRequest)
的FlowShape。输入是元组 (String, PublishRequest)
并且在 flatMapConcat
中我们生成新的 Source[PublishRequest]
流经 GooglePubSub.publish
为每个项目创建新的源会有轻微的开销。这不应该对性能产生可衡量的影响