spring-cloud-stream kafka json content-type
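I'm trying to use spring-cloud-stream 1.0.0.M4 with the various binders. I don't share my data transfer objects between producer and consumer (who does?), so I ran into the need to include content-type configuration for the bindings.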
Producer configuration:
spring:
  cloud:
    stream:
      bindings:
        customer-save: "customer-save"
        customer-save.content-type: application/json
Producer code:
public interface CustomerChannels {
    @Output("customer-save")
    MessageChannel save();
}

@Service
public class CustomerServiceImpl implements CustomerService {
    @Autowired
    private CustomerChannels customerChannels;
    ...
    @Override
    public void insertCustomer(Customer customer) {
        // The Customer payload is sent as-is; the binding's content-type controls the conversion.
        customerChannels.save().send(MessageBuilder.withPayload(customer).build());
    }
}
Consumer configuration:
spring:
  cloud:
    stream:
      bindings:
        customer-save: "customer-save"
        customer-save.content-type: application/x-java-object;type=com.build.customer.domain.Customer
Consumer code:
public interface CustomerChannels {
    String CUSTOMER_SAVE = "customer-save";

    @Input(CUSTOMER_SAVE)
    SubscribableChannel save();
}

@MessageEndpoint
public class CustomerProcessor {
    @Autowired
    private CustomerDao customerDao;

    @ServiceActivator(inputChannel = CustomerChannels.CUSTOMER_SAVE)
    public void saveCustomer(Customer customer) {
        // Insert new customers (no id yet); update existing ones.
        if (customer.getId() == null) {
            customerDao.insertCustomer(customer);
        } else {
            customerDao.updateCustomer(customer);
        }
    }
}
I've tried this with the rabbit, redis and kafka binders and found that the json hack only works with rabbit. With kafka and redis I get the following errors in the consumers.
Kafka consumer error:
2016-03-05 21:54:43.337 ERROR 18846 --- [pool-8-thread-1] o.s.i.k.listener.LoggingErrorHandler : Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 2188302100, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=248 cap=248]), KafkaMessageMetadata [offset=4, nextOffset=5, Partition[topic='customer-save', id=0]]
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'customer-save'; nested exception is java.lang.IllegalArgumentException: Unknown type for contentType header value: class java.util.LinkedHashMap
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:468) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) ~[spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE]
Redis consumer error:
2016-03-05 21:51:04.727 ERROR 18122 --- [hannel-adapter1] o.s.c.s.b.r.RedisMessageChannelBinder : Failed to deliver message; retries exhausted; message sent to queue 'ERRORS:customer-save.anonymous.08e94dac-49fe-464e-b800-28a3844dbaf6'
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'customer-save'; nested exception is java.lang.IllegalArgumentException: Unknown type for contentType header value: class java.util.LinkedHashMap
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:468) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE]
This is a known issue; it is a side effect of having to serialize the headers for Kafka and Redis.
Try using 1.0.0.BUILD-SNAPSHOT from the snapshot repository.
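To make the "Unknown type for contentType header value: class java.util.LinkedHashMap" message in the traces above a bit more concrete, here is a rough, framework-free sketch of how such a failure could arise. It is only an illustration under assumptions, not the actual spring-cloud-stream code: `ContentTypeHeaderSketch` and `resolveContentType` are hypothetical names, and the idea that the content-type header comes back as a generic map after the headers are serialized and restored is inferred from the error text, not confirmed by the source.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ContentTypeHeaderSketch {

    // Hypothetical stand-in for a header check: accept a String content type
    // and reject any other value type with a message shaped like the one in the logs.
    static String resolveContentType(Object headerValue) {
        if (headerValue instanceof String) {
            return (String) headerValue;
        }
        throw new IllegalArgumentException(
                "Unknown type for contentType header value: "
                        + (headerValue == null ? "null" : headerValue.getClass()));
    }

    public static void main(String[] args) {
        // Case 1: the header arrives as a plain string, so the check passes.
        System.out.println(resolveContentType("application/json"));

        // Case 2 (assumption): the headers were serialized and restored, and a
        // structured content-type value came back as a generic map.
        Map<String, Object> restored = new LinkedHashMap<>();
        restored.put("type", "application");
        restored.put("subtype", "json");
        try {
            resolveContentType(restored);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If that assumption holds, it would explain why only the binders that embed headers in the message payload are affected, while rabbit is not.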