Manage custom processor exceptions with the RabbitMQ bus

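I wrote a custom processor module that may throw an exception.

However, when the exception occurs, I can't see the stack trace in the log file or on standard output.

Here is my sample code: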

@Transformer(inputChannel = "input", outputChannel = "output")
public String transform(final String payload) {
    if (StringUtils.isEmpty(payload)) {
        throw new MessageConversionException("payload is empty");
    }

    logger.warn("log|warn: {}", payload);

    logger.debug("log|debug: {}", payload);

    return payload.toUpperCase();
}

Do you know what is wrong?

Thanks in advance,

Simon

Which version of XD are you using? I just tested with 1.3.1 and the exception shows up in the container log as expected...

2016-03-22T08:55:27-0400 1.3.1.RELEASE WARN xdbus.foo.0-1 retry.RejectAndDontRequeueRecoverer - Retries exhausted for message (Body:'2016-03-22
...
Caused by: org.springframework.amqp.support.converter.MessageConversionException: foo
...

I would suggest enabling DEBUG logging. However, mine shows up at WARN level.

Here is my sample code:

CustomProcessor.class

package com.mypackage.xdmodule;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.util.StringUtils;

@MessageEndpoint
public class CustomProcessor {

    private static final Logger logger = LoggerFactory.getLogger(CustomProcessor.class);

    @Transformer(inputChannel = "input", outputChannel = "output")
    public String transform(final String payload) {
        if (StringUtils.isEmpty(payload)) {
            logger.error("payload is empty");
            throw new MessageConversionException("payload is empty");
        }

        logger.warn("log|warn: {}", payload);

        logger.debug("log|debug: {}", payload);

        return payload.toUpperCase();
    }
}

ModuleConfiguration.class

package com.mypackage.xdmodule;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
public class ModuleConfiguration {

  @Autowired
  private CustomProcessor processor;

  @Bean
  public MessageChannel input() {
    return new DirectChannel();
  }

  @Bean
  public MessageChannel output() {
    return new DirectChannel();
  }

}

My stream

stream create --definition "http | transform --expression=payload.toUpperCase() | custom-processor | transform --expression=1/0 | log" --name custom-stream --deploy

I also added this line to the xd-container-logback.groovy file:

logger("com.mypackage.xdmodule", DEBUG, ["STDOUT", "FILE", "AMQP"], true)

The problem was that the empty string was never sent on to the next module in the stream.
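To illustrate that conclusion, here is a minimal plain-Java sketch, not the actual XD wiring. It assumes that some upstream hop drops empty payloads before they reach the processor. The class name `EmptyPayloadSketch` and the `deliver` helper are hypothetical, and `IllegalArgumentException` stands in for `MessageConversionException` so the sketch runs without Spring on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EmptyPayloadSketch {

    // Stand-in for the module's transform(): same guard, same upper-casing.
    // IllegalArgumentException replaces MessageConversionException (assumption,
    // chosen only to avoid a Spring dependency).
    static String transform(String payload) {
        if (payload == null || payload.isEmpty()) {
            throw new IllegalArgumentException("payload is empty");
        }
        return payload.toUpperCase();
    }

    // Hypothetical upstream hop: empty payloads are dropped before delivery,
    // so transform() is never invoked for them and never throws.
    static List<String> deliver(List<String> payloads) {
        List<String> results = new ArrayList<>();
        for (String p : payloads) {
            if (p == null || p.isEmpty()) {
                continue; // the empty payload never reaches the processor
            }
            results.add(transform(p));
        }
        return results;
    }

    public static void main(String[] args) {
        // The empty string is silently skipped; no exception, no stack trace.
        System.out.println(deliver(Arrays.asList("foo", "", "bar")));

        // Calling the processor directly with an empty payload does fail.
        try {
            transform("");
        } catch (IllegalArgumentException e) {
            System.out.println("direct call failed: " + e.getMessage());
        }
    }
}
```

If this matches what happens in the stream, the missing stack trace is not a logging problem: the guard in `transform` is simply never reached, so there is nothing to log.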