Unable to read from GCP Pub/Sub

I am struggling to read messages from the destination channel that my Spring Cloud Stream processor writes to.

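I have the GCP Pub/Sub emulator running in a Testcontainers container. I can successfully send messages to the input queue/topic (sorry, I come from a JMS background and am not sure of the correct Pub/Sub terminology).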

Spring Cloud Stream configuration:

spring.cloud.gcp.pubsub.emulator-host=localhost:8085
spring.cloud.gcp.pubsub.project-id=project-treeline
spring.cloud.gcp.project-id=project-treeline
spring.cloud.stream.bindings.input.destination=split
spring.cloud.stream.bindings.output.destination=amount
spring.cloud.stream.gcp.pubsub.bindings.amount.consumer.auto-create-resources=true

Test case:

@ExtendWith(SpringExtension.class)
@SpringBootTest
@Testcontainers
@ActiveProfiles({"test", "gcp"})
@Slf4j
public class SpringProcessorTest {

    private static final int SUCCESS_EXPECTED_SPLIT_MSG_COUNT = 3;

    @Autowired
    private PubSubTemplate template;

    @Autowired
    private PlaylistRepository playlistRepo;

    @Autowired
    private PlaylistEntryRepository playlistEntryRepo;

    @Value("${spring.cloud.stream.bindings.input.destination}")
    private String inputTopic;

    @Value("${spring.cloud.stream.bindings.output.destination}")
    private String outputTopic;


    @Test
    public void successfullyProcessPlaylist() throws Exception {
        final Long playlistId = playlistRepo.findByFileUri(TestDataConst.PLAYLIST_URI).getId();

        playlistEntryRepo.findByPlaylist(playlistId).forEach(c -> template.publish(inputTopic, c.getId().toString()));

        Thread.sleep(4000);

        final List<AcknowledgeablePubsubMessage> msgs = template.pull("output", 3, Boolean.TRUE);
        assertNotNull(msgs);
        assertEquals(SUCCESS_EXPECTED_SPLIT_MSG_COUNT, msgs.size());

    }
}

I tried template.subscribe instead of template.pull, but I still hit the same problem. Console log:

com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Subscription does not exist
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Subscription does not exist

The logs do show that the events from the test code are being sent and that the processor is handling the messages:

14:15:24.274 [gcp-pubsub-subscriber3] INFO  treeline.service.splits.SplitService - Featured performer id: [7], name: [Robb Flynn], receiving split: [25,000000]
14:15:24.274 [gcp-pubsub-subscriber1] INFO  treeline.service.splits.SplitService - Non-featured performer id: [7], name: [Robb Flynn], receiving split: [7,000000]
14:15:24.274 [gcp-pubsub-subscriber2] INFO  treeline.service.splits.SplitService - Non-featured performer id: [7], name: [Robb Flynn], receiving split: [5,833333]
14:15:24.279 [gcp-pubsub-subscriber3] INFO  treeline.service.splits.SplitService - Featured performer id: [12], name: [Logan Mader], receiving split: [25,000000]
14:15:24.279 [gcp-pubsub-subscriber1] INFO  treeline.service.splits.SplitService - Non-featured performer id: [12], name: [Logan Mader], receiving split: [7,000000]
14:15:24.279 [gcp-pubsub-subscriber2] INFO  treeline.service.splits.SplitService - Non-featured performer id: [12], name: [Logan Mader], receiving split: [5,833333]
14:15:24.283 [gcp-pubsub-subscriber2] INFO  treeline.service.splits.SplitService - Non-featured performer id: [2], name: [Tom Araya], receiving split: [5,833333]
14:15:24.283 [gcp-pubsub-subscriber3] INFO  treeline.service.splits.SplitService - Featured performer id: [8], name: [Adam Deuce], receiving split: [25,000000]
14:15:24.283 [gcp-pubsub-subscriber1] INFO  treeline.service.splits.SplitService - Non-featured performer id: [2], name: [Tom Araya], receiving split: [7,000000]
14:15:24.286 [gcp-pubsub-subscriber3] INFO  treeline.service.splits.SplitService - Featured performer id: [10], name: [Chris Kontos], receiving split: [25,000000]
14:15:24.286 [gcp-pubsub-subscriber1] INFO  treeline.service.splits.SplitService - Featured performer id: [13], name: [Nobody 3], receiving split: [26,333333]
14:15:24.286 [gcp-pubsub-subscriber2] INFO  treeline.service.splits.SplitService - Non-featured performer id: [4], name: [Kerry King], receiving split: [5,833333]
14:15:24.290 [gcp-pubsub-subscriber2] INFO  treeline.service.splits.SplitService - Non-featured performer id: [5], name: [Gary Holt], receiving split: [5,833333]
14:15:24.290 [gcp-pubsub-subscriber1] INFO  treeline.service.splits.SplitService - Featured performer id: [14], name: [Nobody 1], receiving split: [26,333333]
14:15:24.293 [gcp-pubsub-subscriber1] INFO  treeline.service.splits.SplitService - Featured performer id: [15], name: [Nobody 2], receiving split: [26,333333]
14:15:24.293 [gcp-pubsub-subscriber2] INFO  treeline.service.splits.SplitService - Non-featured performer id: [3], name: [Jeff Hanneman], receiving split: [5,833333]
14:15:24.297 [gcp-pubsub-subscriber2] INFO  treeline.service.splits.SplitService - Featured performer id: [13], name: [Nobody 3], receiving split: [21,666667]
14:15:24.302 [gcp-pubsub-subscriber2] INFO  treeline.service.splits.SplitService - Featured performer id: [14], name: [Nobody 1], receiving split: [21,666667]
14:15:24.308 [gcp-pubsub-subscriber2] INFO  treeline.service.splits.SplitService - Featured performer id: [15], name: [Nobody 2], receiving split: [21,666667]
14:15:24.319 [gcp-pubsub-subscriber3] DEBUG o.s.c.s.m.DirectWithAttributesChannel - preSend on channel 'output', message: GenericMessage [payload=[Split(splitAmount=null, splitPercentage=25, playlistEntry=3, person=7), Split(splitAmount=null, splitPercentage=25, playlistEntry=3, person=12), Split(splitAmount=null, splitPercentage=25, playlistEntry=3, person=8), Split(splitAmount=null, splitPercentage=25, playlistEntry=3, person=10)], headers={id=dfcb46c4-7cbc-05b1-0030-219dd378141f, timestamp=1566389724287}]
14:15:24.320 [gcp-pubsub-subscriber1] DEBUG o.s.c.s.m.DirectWithAttributesChannel - preSend on channel 'output', message: GenericMessage [payload=[Split(splitAmount=null, splitPercentage=7, playlistEntry=1, person=7), Split(splitAmount=null, splitPercentage=7, playlistEntry=1, person=12), Split(splitAmount=null, splitPercentage=7, playlistEntry=1, person=2), Split(splitAmount=null, splitPercentage=26.333333333333332149095440399833023548126220703125, playlistEntry=1, person=13), Split(splitAmount=null, splitPercentage=26.333333333333332149095440399833023548126220703125, playlistEntry=1, person=14), Split(splitAmount=null, splitPercentage=26.333333333333332149095440399833023548126220703125, playlistEntry=1, person=15)], headers={id=387b1f11-14b9-34c2-0a60-054eda20dfa3, timestamp=1566389724293}]
14:15:24.320 [gcp-pubsub-subscriber2] DEBUG o.s.c.s.m.DirectWithAttributesChannel - preSend on channel 'output', message: GenericMessage [payload=[Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=7), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=12), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=2), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=4), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=5), Split(splitAmount=null, splitPercentage=5.83333333333333303727386009995825588703155517578125, playlistEntry=2, person=3), Split(splitAmount=null, splitPercentage=21.666666666666667850904559600166976451873779296875, playlistEntry=2, person=13), Split(splitAmount=null, splitPercentage=21.666666666666667850904559600166976451873779296875, playlistEntry=2, person=14), Split(splitAmount=null, splitPercentage=21.666666666666667850904559600166976451873779296875, playlistEntry=2, person=15)], headers={id=539cd992-168f-5731-ea9d-c9212f0656ee, timestamp=1566389724309}]
14:15:24.390 [gcp-pubsub-subscriber1] DEBUG o.s.c.s.b.AbstractMessageChannelBinder$SendingHandler - org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@1f1ef8b2 received message: GenericMessage [payload=byte[575], headers={contentType=application/json, id=c34eb207-1a4c-6bc9-f91e-517c983c73b8, timestamp=1566389724390}]
14:15:24.390 [gcp-pubsub-subscriber2] DEBUG o.s.c.s.b.AbstractMessageChannelBinder$SendingHandler - org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@1f1ef8b2 received message: GenericMessage [payload=byte[1091], headers={contentType=application/json, id=6d8d417d-3f45-7b83-d243-2cee0162cfb1, timestamp=1566389724390}]
14:15:24.391 [gcp-pubsub-subscriber2] DEBUG o.s.c.g.p.i.o.PubSubMessageHandler - org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler@1c84a8f5 received message: GenericMessage [payload=byte[1091], headers={contentType=application/json, id=6d696eab-b2d1-4d6c-bb45-9385c4567cf3, timestamp=1566389724391}]
14:15:24.391 [gcp-pubsub-subscriber1] DEBUG o.s.c.g.p.i.o.PubSubMessageHandler - org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler@1c84a8f5 received message: GenericMessage [payload=byte[575], headers={contentType=application/json, id=3e02b7af-cca7-9172-398f-b1bbc4245a00, timestamp=1566389724391}]
14:15:24.394 [gcp-pubsub-subscriber3] DEBUG o.s.c.s.b.AbstractMessageChannelBinder$SendingHandler - org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@1f1ef8b2 received message: GenericMessage [payload=byte[287], headers={contentType=application/json, id=077a18d2-dec6-8272-1fcf-d271d11117f5, timestamp=1566389724394}]
14:15:24.394 [gcp-pubsub-subscriber3] DEBUG o.s.c.g.p.i.o.PubSubMessageHandler - org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler@1c84a8f5 received message: GenericMessage [payload=byte[287], headers={contentType=application/json, id=6cf9bda7-2270-8892-0e2e-e402ea85f88a, timestamp=1566389724394}]
14:15:24.398 [gcp-pubsub-subscriber3] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'output', message: GenericMessage [payload=byte[287], headers={contentType=application/json, id=077a18d2-dec6-8272-1fcf-d271d11117f5, timestamp=1566389724394}]
14:15:24.399 [gcp-pubsub-subscriber3] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'input', message: GenericMessage [payload=byte[1], headers={id=52e2d9ff-2553-0f20-cd6c-1a9d6135f3cf, contentType=application/json, timestamp=1566389724166}]
14:15:24.400 [gcp-pubsub-subscriber1] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'output', message: GenericMessage [payload=byte[575], headers={contentType=application/json, id=c34eb207-1a4c-6bc9-f91e-517c983c73b8, timestamp=1566389724390}]
14:15:24.400 [gcp-pubsub-subscriber2] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'output', message: GenericMessage [payload=byte[1091], headers={contentType=application/json, id=6d8d417d-3f45-7b83-d243-2cee0162cfb1, timestamp=1566389724390}]
14:15:24.400 [gcp-pubsub-subscriber2] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'input', message: GenericMessage [payload=byte[1], headers={id=b6808f8b-0169-93e7-64b1-25d6875c14da, contentType=application/json, timestamp=1566389724166}]
14:15:24.401 [gcp-pubsub-subscriber1] DEBUG o.s.c.s.m.DirectWithAttributesChannel - postSend (sent=true) on channel 'input', message: GenericMessage [payload=byte[1], headers={id=f6869d9f-e0ea-1581-e42b-57986624861e, contentType=application/json, timestamp=1566389724166}]
14:15:24.439 [grpc-default-executor-1] DEBUG o.s.c.g.p.c.p.PubSubPublisherTemplate - Publishing to amount was successful. Message ID: 5
14:15:24.446 [grpc-default-executor-1] DEBUG o.s.c.g.p.c.p.PubSubPublisherTemplate - Publishing to amount was successful. Message ID: 4
14:15:24.446 [grpc-default-executor-1] DEBUG o.s.c.g.p.c.p.PubSubPublisherTemplate - Publishing to amount was successful. Message ID: 6

Any help would be appreciated.

PubSubTemplate is a relatively low-level API and is not aware of Spring Cloud Stream. In this call, the first argument needs to be a Pub/Sub subscription name, not the Spring Cloud Stream output channel name "output":

final List<AcknowledgeablePubsubMessage> msgs = template.pull("output", 3, Boolean.TRUE);

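But you don't know the subscription name! In fact, the subscription doesn't even exist. The GCP Spring Cloud Stream binder automatically creates an anonymous subscription for input bindings, but because publishing is done against a topic, nothing of the kind happens for the output channel.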

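Also, you subscribe to a subscription, not to a topic (a topic can have multiple subscriptions, and each of them receives the same messages). So at the start of the test you need to create a subscription to the output topic, "amount". A PubSubAdmin bean can be autowired the same way as PubSubTemplate.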

Subscription sub = pubSubAdmin.createSubscription("testSubscription", "amount");
// ... perform the logic under test ...
final List<AcknowledgeablePubsubMessage> msgs = template.pull("testSubscription", 3, Boolean.TRUE);
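
To make the topic/subscription relationship concrete, here is a toy in-memory model in plain Java. It is only a sketch of the semantics described above, not the Pub/Sub client or Spring Cloud GCP API: the class ToyPubSub and all of its methods are made up for illustration. It assumes the usual Pub/Sub behaviour that a subscription only sees messages published after it was created, which is why the subscription has to be created before the processor publishes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of topic/subscription fan-out.
// Hypothetical illustration only; NOT the Pub/Sub or Spring Cloud GCP API.
final class ToyPubSub {
    // topic name -> names of the subscriptions attached to it
    private final Map<String, List<String>> subscriptionsByTopic = new HashMap<>();
    // subscription name -> messages waiting to be pulled
    private final Map<String, List<String>> backlog = new HashMap<>();

    void createTopic(String topic) {
        subscriptionsByTopic.putIfAbsent(topic, new ArrayList<>());
    }

    void createSubscription(String subscription, String topic) {
        List<String> subs = subscriptionsByTopic.get(topic);
        if (subs == null) {
            throw new IllegalStateException("NOT_FOUND: Topic does not exist");
        }
        subs.add(subscription);
        backlog.put(subscription, new ArrayList<>());
    }

    // Publishing targets a topic. It succeeds even when no subscription is
    // attached; in that case nobody ever receives the message.
    void publish(String topic, String message) {
        List<String> subs = subscriptionsByTopic.get(topic);
        if (subs == null) {
            throw new IllegalStateException("NOT_FOUND: Topic does not exist");
        }
        for (String subscription : subs) {
            backlog.get(subscription).add(message); // every subscription gets its own copy
        }
    }

    // Pulling targets a subscription name, never a topic or channel name.
    List<String> pull(String subscription, int maxMessages) {
        List<String> pending = backlog.get(subscription);
        if (pending == null) {
            throw new IllegalStateException("NOT_FOUND: Subscription does not exist");
        }
        int n = Math.min(maxMessages, pending.size());
        List<String> pulled = new ArrayList<>(pending.subList(0, n));
        pending.subList(0, n).clear(); // remove what was handed out
        return pulled;
    }

    public static void main(String[] args) {
        ToyPubSub bus = new ToyPubSub();
        bus.createTopic("amount");
        bus.publish("amount", "early"); // no subscription yet, so this is lost
        bus.createSubscription("testSubscription", "amount");
        bus.publish("amount", "split-1");
        bus.publish("amount", "split-2");
        System.out.println(bus.pull("testSubscription", 3)); // prints [split-1, split-2]
        try {
            bus.pull("output", 3); // a channel name is not a subscription name
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints NOT_FOUND: Subscription does not exist
        }
    }
}
```

In the real test the same ordering applies: create the subscription first, then publish the input messages, then pull from the subscription name you created.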

Documentation for the GCP Pub/Sub template and admin: https://github.com/spring-cloud/spring-cloud-gcp/blob/master/docs/src/main/asciidoc/pubsub.adoc
Documentation for the GCP Spring Cloud Stream binder: https://github.com/spring-cloud/spring-cloud-gcp/blob/master/docs/src/main/asciidoc/spring-stream.adoc