Spring Cloud Messaging Source 未向 Kafka 代理发送消息
Spring Cloud Messaging Source is not sending messages to Kafka broker
我正在关注 'Spring Microservices In Action' 这本书,与作者选择的格式有一些小偏差。也就是说,我使用的是 Kotlin 和 Gradle 而不是 Java 和 Maven。除此之外,我主要遵循所提供的代码。
在有关消息传递的章节中,我 运行 遇到了一个问题 - 我无法使用 Source 发布消息 class 我正在自动装配到我的 SimpleSourceBean 中。
我知道一般设置没问题,因为创建了 Kafka 主题,并且在应用程序启动时我看到了相应的日志消息。我已经尝试在 class 主体以及构造函数中显式自动装配源代码,但在任何一种情况下都没有成功
申请class
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(Source::class)
@EnableCircuitBreaker
class OrganizationServiceApplication {
@Bean
@LoadBalanced
fun getRestTemplate(): RestTemplate {
val restTemplate = RestTemplate()
val interceptors = restTemplate.interceptors
interceptors.add(UserContextInterceptor())
restTemplate.interceptors = interceptors
return restTemplate
}
}
fun main(args: Array<String>) {
runApplication<OrganizationServiceApplication>(*args)
}
这是 SimpleSourceBean 实现:
@Component
class SimpleSourceBean {
@Autowired
lateinit var source: Source
val logger = LoggerFactory.getLogger(this.javaClass)
fun publishOrgChange(action: String, orgId: String) {
logger.debug("Sending Kafka message $action for Organization $orgId on source ${source}")
val change = OrganizationChangeModel(
OrganizationChangeModel::class.java.typeName,
action,
orgId,
UserContext.correlationId!!)
logger.debug("change message: $change")
source.output()
.send(MessageBuilder
.withPayload(change)
.build())
logger.debug("Sent Kafka message $action for Organization $orgId successfully")
}
}
这是使用 SimpleSourceBean 将消息发送到 Kafka 的服务 class:
@Component
class OrganizationService {
@Autowired
lateinit var organizationRepository: OrganizationRepository
@Autowired
lateinit var simpleSourceBean: SimpleSourceBean
val logger = LoggerFactory.getLogger(OrganizationService::class.java)
// some omissions for brevity
@HystrixCommand(
fallbackMethod = "fallbackUpdate",
commandKey = "updateOrganizationCommandKey",
threadPoolKey = "updateOrganizationThreadPool")
fun updateOrganization(organizationId: String, organization: Organization): Organization {
val updatedOrg = organizationRepository.save(organization)
simpleSourceBean.publishOrgChange("UPDATE", organizationId)
return updatedOrg
}
private fun fallbackUpdate(organizationId: String, organization: Organization) =
Organization(id = "000-000-00", name = "update not saved", contactEmail = "", contactName = "", contactPhone = "")
@HystrixCommand
fun saveOrganization(organization: Organization): Organization {
val orgToSave = organization.copy(id = UUID.randomUUID().toString())
val savedOrg = organizationRepository.save(orgToSave)
simpleSourceBean.publishOrgChange("SAVE", savedOrg.id)
return savedOrg
}
}
日志消息
organizationservice_1 | 2019-08-23 23:15:33.939 DEBUG 18 --- [ionThreadPool-2] S.O.events.source.SimpleSourceBean : Sending Kafka message UPDATE for Organization e254f8c-c442-4ebe-a82a-e2fc1d1ff78a on source null
organizationservice_1 | 2019-08-23 23:15:33.940 DEBUG 18 --- [ionThreadPool-2] S.O.events.source.SimpleSourceBean : change message: OrganizationChangeModel(type=SpringMicroservicesInAction.OrganizationService.events.source.OrganizationChangeModel, action=UPDATE, organizationId=e254f8c-c442-4ebe-a82a-e2fc1d1ff78a, correlationId=c84d288f-bfd6-4217-9026-8a45eab058e1)
organizationservice_1 | 2019-08-23 23:15:33.941 DEBUG 18 --- [ionThreadPool-2] o.s.c.s.m.DirectWithAttributesChannel : preSend on channel 'output', message: GenericMessage [payload=OrganizationChangeModel(type=SpringMicroservicesInAction.OrganizationService.events.source.OrganizationChangeModel, action=UPDATE, organizationId=e254f8c-c442-4ebe-a82a-e2fc1d1ff78a, correlationId=c84d288f-bfd6-4217-9026-8a45eab058e1), headers={id=05799740-f8cf-85f8-54f8-74fce2679909, timestamp=1566602133941}]
organizationservice_1 | 2019-08-23 23:15:33.945 DEBUG 18 --- [ionThreadPool-2] tractMessageChannelBinder$SendingHandler : org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@38675bb5 received message: GenericMessage [payload=byte[224], headers={contentType=application/json, id=64e1e8f1-45f4-b5e6-91d7-c2df28b3d6cc, timestamp=1566602133943}]
organizationservice_1 | 2019-08-23 23:15:33.946 DEBUG 18 --- [ionThreadPool-2] nder$ProducerConfigurationMessageHandler : org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@763a88a received message: GenericMessage [payload=byte[224], headers={contentType=application/json, id=7be5d188-5309-cba9-8297-74431c410152, timestamp=1566602133945}]
没有进一步的消息记录,其中包括 SimpleSourceBEan 的最终 DEBUG 日志语句
检查 Kafka 容器内部是否有任何关于 'orgChangeTopic' 主题的消息,它是空的:
root@99442804288f:/opt/kafka_2.11-0.10.1.0/bin# ./kafka-console-consumer.sh --from-beginning --topic orgChangeTopic --bootstrap-server 0.0.0.0:9092
Processed a total of 0 messages
非常感谢任何指出我的问题所在的地方
编辑:
添加 application.yml:
spring:
cloud:
stream:
bindings:
output:
destination: orgChangeTopic
content-type: application/json
kafka:
binder:
zkNodes: "http://kafkaserver:2181"
brokers: "http://kafkaserver:9092"
// omitting some irrelevant config
logging:
level:
org.apache.kafka: DEBUG
org.springframework.cloud: DEBUG
org.springframework.web: WARN
springmicroservicesinaction.organizationservice: DEBUG
具有相关依赖项的 build.gradle 文件的摘录:
dependencies {
// kotlin, spring boot, etc
implementation("org.springframework.cloud:spring-cloud-stream:2.2.0.RELEASE")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:2.2.0.RELEASE")
}
您还需要显示您的应用程序属性。你的kafka版本很旧; 0.10.x.x 不支持 headers。您使用的 spring-cloud-stream 是什么版本?现代版本需要支持 headers 的 Kafka(0.11 或更高版本 - 当前版本为 2.3),除非您将 headerMode
设置为 none
。
也就是说,如果我们尝试将 headers 发送到不支持它们的版本,我希望看到一条错误消息。
implementation("org.springframework.cloud:spring-cloud-stream:2.2.0.RELEASE")
另请注意,对于现代版本,您不再需要
zkNodes: "http://kafkaserver:2181"
2.2.0使用的kafka-clients版本支持直接通过Kafka broker提供topic,不再需要连接zookeeper
我正在关注 'Spring Microservices In Action' 这本书,与作者选择的格式有一些小偏差。也就是说,我使用的是 Kotlin 和 Gradle 而不是 Java 和 Maven。除此之外,我主要遵循所提供的代码。
在有关消息传递的章节中,我 运行 遇到了一个问题 - 我无法使用 Source 发布消息 class 我正在自动装配到我的 SimpleSourceBean 中。
我知道一般设置没问题,因为创建了 Kafka 主题,并且在应用程序启动时我看到了相应的日志消息。我已经尝试在 class 主体以及构造函数中显式自动装配源代码,但在任何一种情况下都没有成功
申请class
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(Source::class)
@EnableCircuitBreaker
class OrganizationServiceApplication {
@Bean
@LoadBalanced
fun getRestTemplate(): RestTemplate {
val restTemplate = RestTemplate()
val interceptors = restTemplate.interceptors
interceptors.add(UserContextInterceptor())
restTemplate.interceptors = interceptors
return restTemplate
}
}
fun main(args: Array<String>) {
runApplication<OrganizationServiceApplication>(*args)
}
这是 SimpleSourceBean 实现:
@Component
class SimpleSourceBean {
@Autowired
lateinit var source: Source
val logger = LoggerFactory.getLogger(this.javaClass)
fun publishOrgChange(action: String, orgId: String) {
logger.debug("Sending Kafka message $action for Organization $orgId on source ${source}")
val change = OrganizationChangeModel(
OrganizationChangeModel::class.java.typeName,
action,
orgId,
UserContext.correlationId!!)
logger.debug("change message: $change")
source.output()
.send(MessageBuilder
.withPayload(change)
.build())
logger.debug("Sent Kafka message $action for Organization $orgId successfully")
}
}
这是使用 SimpleSourceBean 将消息发送到 Kafka 的服务 class:
@Component
class OrganizationService {
@Autowired
lateinit var organizationRepository: OrganizationRepository
@Autowired
lateinit var simpleSourceBean: SimpleSourceBean
val logger = LoggerFactory.getLogger(OrganizationService::class.java)
// some omissions for brevity
@HystrixCommand(
fallbackMethod = "fallbackUpdate",
commandKey = "updateOrganizationCommandKey",
threadPoolKey = "updateOrganizationThreadPool")
fun updateOrganization(organizationId: String, organization: Organization): Organization {
val updatedOrg = organizationRepository.save(organization)
simpleSourceBean.publishOrgChange("UPDATE", organizationId)
return updatedOrg
}
private fun fallbackUpdate(organizationId: String, organization: Organization) =
Organization(id = "000-000-00", name = "update not saved", contactEmail = "", contactName = "", contactPhone = "")
@HystrixCommand
fun saveOrganization(organization: Organization): Organization {
val orgToSave = organization.copy(id = UUID.randomUUID().toString())
val savedOrg = organizationRepository.save(orgToSave)
simpleSourceBean.publishOrgChange("SAVE", savedOrg.id)
return savedOrg
}
}
日志消息
organizationservice_1 | 2019-08-23 23:15:33.939 DEBUG 18 --- [ionThreadPool-2] S.O.events.source.SimpleSourceBean : Sending Kafka message UPDATE for Organization e254f8c-c442-4ebe-a82a-e2fc1d1ff78a on source null
organizationservice_1 | 2019-08-23 23:15:33.940 DEBUG 18 --- [ionThreadPool-2] S.O.events.source.SimpleSourceBean : change message: OrganizationChangeModel(type=SpringMicroservicesInAction.OrganizationService.events.source.OrganizationChangeModel, action=UPDATE, organizationId=e254f8c-c442-4ebe-a82a-e2fc1d1ff78a, correlationId=c84d288f-bfd6-4217-9026-8a45eab058e1)
organizationservice_1 | 2019-08-23 23:15:33.941 DEBUG 18 --- [ionThreadPool-2] o.s.c.s.m.DirectWithAttributesChannel : preSend on channel 'output', message: GenericMessage [payload=OrganizationChangeModel(type=SpringMicroservicesInAction.OrganizationService.events.source.OrganizationChangeModel, action=UPDATE, organizationId=e254f8c-c442-4ebe-a82a-e2fc1d1ff78a, correlationId=c84d288f-bfd6-4217-9026-8a45eab058e1), headers={id=05799740-f8cf-85f8-54f8-74fce2679909, timestamp=1566602133941}]
organizationservice_1 | 2019-08-23 23:15:33.945 DEBUG 18 --- [ionThreadPool-2] tractMessageChannelBinder$SendingHandler : org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler@38675bb5 received message: GenericMessage [payload=byte[224], headers={contentType=application/json, id=64e1e8f1-45f4-b5e6-91d7-c2df28b3d6cc, timestamp=1566602133943}]
organizationservice_1 | 2019-08-23 23:15:33.946 DEBUG 18 --- [ionThreadPool-2] nder$ProducerConfigurationMessageHandler : org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@763a88a received message: GenericMessage [payload=byte[224], headers={contentType=application/json, id=7be5d188-5309-cba9-8297-74431c410152, timestamp=1566602133945}]
没有进一步的消息记录,其中包括 SimpleSourceBEan 的最终 DEBUG 日志语句
检查 Kafka 容器内部是否有任何关于 'orgChangeTopic' 主题的消息,它是空的:
root@99442804288f:/opt/kafka_2.11-0.10.1.0/bin# ./kafka-console-consumer.sh --from-beginning --topic orgChangeTopic --bootstrap-server 0.0.0.0:9092
Processed a total of 0 messages
非常感谢任何指出我的问题所在的地方
编辑:
添加 application.yml:
spring:
cloud:
stream:
bindings:
output:
destination: orgChangeTopic
content-type: application/json
kafka:
binder:
zkNodes: "http://kafkaserver:2181"
brokers: "http://kafkaserver:9092"
// omitting some irrelevant config
logging:
level:
org.apache.kafka: DEBUG
org.springframework.cloud: DEBUG
org.springframework.web: WARN
springmicroservicesinaction.organizationservice: DEBUG
具有相关依赖项的 build.gradle 文件的摘录:
dependencies {
// kotlin, spring boot, etc
implementation("org.springframework.cloud:spring-cloud-stream:2.2.0.RELEASE")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:2.2.0.RELEASE")
}
您还需要显示您的应用程序属性。你的kafka版本很旧; 0.10.x.x 不支持 headers。您使用的 spring-cloud-stream 是什么版本?现代版本需要支持 headers 的 Kafka(0.11 或更高版本 - 当前版本为 2.3),除非您将 headerMode
设置为 none
。
也就是说,如果我们尝试将 headers 发送到不支持它们的版本,我希望看到一条错误消息。
implementation("org.springframework.cloud:spring-cloud-stream:2.2.0.RELEASE")
另请注意,对于现代版本,您不再需要
zkNodes: "http://kafkaserver:2181"
2.2.0使用的kafka-clients版本支持直接通过Kafka broker提供topic,不再需要连接zookeeper