spring kafka,MessageDeliveryException:无法将消息发送到频道
spring kafka, MessageDeliveryException: failed to send Message to channel
我正在努力解决有关源、进程和接收器的问题。我有两个项目,应用程序。一个是source,一个是process和sink。
让我分享每个组件以了解发生了什么。
@EnableBinding(MultiProducerChannel.class)
public class RealTimeDataSource{
@Autowired
RealTimeProductionService realTimeProductionService;
@InboundChannelAdapter(value = MultiProducerChannel.SOURCEPRODUCTION, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
public JSONArray productionMessageSource() throws Exception {
long currentTime = System.currentTimeMillis();
JSONArray realTimeProductionList = realTimeProductionService.getNewProductionTime();
System.out.println(currentTime + " : Running source...");
return realTimeProductionList;
}
}
@EnableBinding(MultiChannel.class)
public class RealTimeDataProcessor {
@Autowired
RealTimeProductionService realTimeProductionService;
@Transformer(inputChannel = MultiChannel.PROCESSPRODUCTION, outputChannel = MultiChannel.SAVEPRODUCTION)
public JSONObject productionMessageProcessor(List<RealTimeProduction> realTimeProductionList) throws Exception {
JSONObject jsonObject = null;
if(realTimeProductionList != null) {
jsonObject = new JSONObject(realTimeProductionService.getNewProductionTime(realTimeProductionList));
System.out.println("PROCESSOR RUNNING...");
}
return jsonObject;
}
}
@EnableBinding(MultiChannel.class)
public class RealTimeDataSink {
private static final String INDEX_NAME = "c000001_kr_50879_f01";
@Autowired
private JestClient jestClient;
@StreamListener(MultiChannel.SAVEFINALPRODUCTION)
public void productionMessageSink(JSONObject outputs) throws Exception {
if (outputs != null) {
boolean indexExists = jestClient.execute(new IndicesExists.Builder(INDEX_NAME).build()).isSucceeded();
JestResult jestResult = jestClient.execute(new Index.Builder(outputs).index(INDEX_NAME).type("production").build());
System.out.println("SINK RUNNING...");
}
}
}
public interface MultiChannel {
String SOURCEPRODUCTION = "production-source";
String PROCESSPRODUCTION = "production-process";
String SAVEPRODUCTION = "production-save";
String SAVEFINALPRODUCTION = "production-save-final";
@Output(SOURCEPRODUCTION)
MessageChannel sourceproduction();
@Input(PROCESSPRODUCTION)
SubscribableChannel processorproduction();
@Output(SAVEPRODUCTION)
MessageChannel saveproduction();
@Input(SAVEFINALPRODUCTION)
SubscribableChannel savefinalproduction();
}
我认为源代码运行良好。但是我不知道要找到有关此错误的问题。我花了整整三天....还是...不行。
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'production-save'; nested exception is java.lang.IllegalArgumentException: payload must not be null
有什么想法吗?
考虑使用 @StreamListener
和 @SendTo
而不是 @Transformer
进行自动内容类型处理:https://docs.spring.io/spring-cloud-stream/docs/Ditmars.RC1/reference/htmlsingle/#_using_streamlistener_for_automatic_content_type_handling.
如果这没有帮助,请考虑为 production-save
绑定配置 contentType: application/json
:https://docs.spring.io/spring-cloud-stream/docs/Ditmars.RC1/reference/htmlsingle/#_properties_for_use_of_spring_cloud_stream
我正在努力解决有关源、进程和接收器的问题。我有两个项目,应用程序。一个是source,一个是process和sink。
让我分享每个组件以了解发生了什么。
@EnableBinding(MultiProducerChannel.class)
public class RealTimeDataSource{
@Autowired
RealTimeProductionService realTimeProductionService;
@InboundChannelAdapter(value = MultiProducerChannel.SOURCEPRODUCTION, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
public JSONArray productionMessageSource() throws Exception {
long currentTime = System.currentTimeMillis();
JSONArray realTimeProductionList = realTimeProductionService.getNewProductionTime();
System.out.println(currentTime + " : Running source...");
return realTimeProductionList;
}
}
@EnableBinding(MultiChannel.class)
public class RealTimeDataProcessor {
@Autowired
RealTimeProductionService realTimeProductionService;
@Transformer(inputChannel = MultiChannel.PROCESSPRODUCTION, outputChannel = MultiChannel.SAVEPRODUCTION)
public JSONObject productionMessageProcessor(List<RealTimeProduction> realTimeProductionList) throws Exception {
JSONObject jsonObject = null;
if(realTimeProductionList != null) {
jsonObject = new JSONObject(realTimeProductionService.getNewProductionTime(realTimeProductionList));
System.out.println("PROCESSOR RUNNING...");
}
return jsonObject;
}
}
@EnableBinding(MultiChannel.class)
public class RealTimeDataSink {
private static final String INDEX_NAME = "c000001_kr_50879_f01";
@Autowired
private JestClient jestClient;
@StreamListener(MultiChannel.SAVEFINALPRODUCTION)
public void productionMessageSink(JSONObject outputs) throws Exception {
if (outputs != null) {
boolean indexExists = jestClient.execute(new IndicesExists.Builder(INDEX_NAME).build()).isSucceeded();
JestResult jestResult = jestClient.execute(new Index.Builder(outputs).index(INDEX_NAME).type("production").build());
System.out.println("SINK RUNNING...");
}
}
}
public interface MultiChannel {
String SOURCEPRODUCTION = "production-source";
String PROCESSPRODUCTION = "production-process";
String SAVEPRODUCTION = "production-save";
String SAVEFINALPRODUCTION = "production-save-final";
@Output(SOURCEPRODUCTION)
MessageChannel sourceproduction();
@Input(PROCESSPRODUCTION)
SubscribableChannel processorproduction();
@Output(SAVEPRODUCTION)
MessageChannel saveproduction();
@Input(SAVEFINALPRODUCTION)
SubscribableChannel savefinalproduction();
}
我认为源代码运行良好。但是我不知道要找到有关此错误的问题。我花了整整三天....还是...不行。
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'production-save'; nested exception is java.lang.IllegalArgumentException: payload must not be null
有什么想法吗?
考虑使用 @StreamListener
和 @SendTo
而不是 @Transformer
进行自动内容类型处理:https://docs.spring.io/spring-cloud-stream/docs/Ditmars.RC1/reference/htmlsingle/#_using_streamlistener_for_automatic_content_type_handling.
如果这没有帮助,请考虑为 production-save
绑定配置 contentType: application/json
:https://docs.spring.io/spring-cloud-stream/docs/Ditmars.RC1/reference/htmlsingle/#_properties_for_use_of_spring_cloud_stream