无法将 <class-name> 类型的负载转换为 byte[] 以发送到 Pub/Sub

Unable to convert payload of type <class-name> to byte[] for sending to Pub/Sub

我正在尝试使用库向 Google Cloud PubSub 发布消息: implementation "com.google.cloud:spring-cloud-gcp-starter-pubsub:2.0.0".

但是,我收到以下异常: com.google.cloud.spring.pubsub.support.converter.PubSubMessageConversionException: Unable to convert payload of type com.logistics.domain.events.items.FOItemsReturnedTrackingUpdate to byte[] for sending to Pub/Sub.

这是完整的堆栈跟踪:

com.google.cloud.spring.pubsub.support.converter.PubSubMessageConversionException: Unable to convert payload of type com.logistics.domain.events.items.FOItemsReturnedTrackingUpdate to byte[] for sending to Pub/Sub.
    at com.google.cloud.spring.pubsub.support.converter.SimplePubSubMessageConverter.toPubSubMessage(SimplePubSubMessageConverter.java:63)
    at com.google.cloud.spring.pubsub.core.publisher.PubSubPublisherTemplate.publish(PubSubPublisherTemplate.java:86)
    at com.google.cloud.spring.pubsub.core.PubSubTemplate.publish(PubSubTemplate.java:128)
    at com.logistics.adapters.publishers.FOItemsReturnedTrackingUpdateEventPublisherPubSubAdapter.publishItemsReturnedTrackingUpdate(FOItemsReturnedTrackingUpdateEventPublisherPubSubAdapter.java:46)
    at com.logistics.application.services.FOPackagesService.lambda$processFoPackageAnnulled(FOPackagesService.java:100)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at com.logistics.application.services.FOPackagesService.processFoPackageAnnulled(FOPackagesService.java:71)
    at com.logistics.adapters.controllers.DummyController.foPackagesAnnulled(DummyController.java:20)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:681)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:764)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
    at org.springframework.web.filter.AbstractRequestLoggingFilter.doFilterInternal(AbstractRequestLoggingFilter.java:289)
    at com.logistic.common.model.CustomRequestLoggingFilter.doFilterInternal(CustomRequestLoggingFilter.java:48)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:96)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
    at org.apache.catalina.valves.RemoteIpValve.invoke(RemoteIpValve.java:769)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:359)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:399)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:889)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1735)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.base/java.lang.Thread.run(Thread.java:832)

下面是发布活动的代码:

@Slf4j
@Component
public class ItemsReturnedTrackingUpdateEventPublisherPubSubAdapter implements
        ItemsReturnedTrackingUpdateEventPublisher {

    private final String topicName;
    private final PubSubTemplate pubSubTemplate;
    private final ObjectWriter objectWriter;

    public ItemsReturnedTrackingUpdateEventPublisherPubSubAdapter(String topicName,
                                                                  PubSubTemplate pubSubTemplate,
                                                                  ObjectWriter objectWriter) {
        this.topicName = topicName;
        this.pubSubTemplate = pubSubTemplate;
        this.objectWriter = objectWriter;
    }

    @Override
    public void publishItemsReturnedTrackingUpdate(ItemsReturnedTrackingUpdate data) {
        try {
            Map<String, String> attributes = MDC.getCopyOfContextMap();
            
            attributes.put("eventType", EventType.ITEMS_RETURNED_TRACKING_UPDATE.name());

            var message = objectWriter.writeValueAsString(data);
            log.debug("ItemsReturnedTrackingUpdate event is being published on topic : {} with attributes : {} and data : {}",
                    topicName, attributes, message);
            ListenableFuture<String> result = pubSubTemplate.publish(topicName, data);

            log.debug("ItemsReturnedTrackingUpdate published: {}, Message id: {}", data.getOrderId(), result.get());
        } catch (Exception e) {
            log.error("Error publishing ItemsReturnedTrackingUpdate with id: {}", data.getOrderId());
            e.printStackTrace();
            throw new PublisherException("Error publishing ItemsReturnedTrackingUpdate with id: " + data.getOrderId(), e);
        }
    }
}

查看堆栈跟踪,SimplePubSubMessageConverter 正在发布的对象令人窒息。查看 the doc 以获得 class:

... maps payloads of type byte[], ByteString, ByteBuffer, and String to Pub/Sub messages

变化:

pubSubTemplate.publish(topicName, data);

收件人:

pubSubTemplate.publish(topicName, message);

您需要像下面这样的 pubsub 消息转换器

@Bean
@Primary
public PubSubMessageConverter pubSubMessageConverter(ObjectMapper objectMapper) {
   return new JacksonPubSubMessageConverter(objectMapper);
}

还附加了对象映射器 bean 的示例

@Bean
@Primary
public ObjectMapper objectMapper() {
  ObjectMapper mapper = new ObjectMapper();
  mapper.registerModule(new JavaTimeModule());
  mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
  mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
  return mapper;
}