Akka - 测量消费者的时间
Akka - Measure time of consumer
我正在开发一个从 JMS(消费者)中提取消息并将其推送到 Kafka 主题(生产者)的系统。
由于我的消费者一直在等待新消息到达 JMS 队列并将其推送到 Kafka,我如何才能有效地衡量每秒可以拉取多少消息?
这是我的代码:
我的消费者:
class ActiveMqConsumerActor extends Consumer {
var startTime: Long = _
val log = Logging(context.system, this)
val producerActor = context.actorOf(Props[KafkaProducerActor])
override def autoAck = false
override def endpointUri: String = "activemq:KafkaTest"
override def receive: Receive = LoggingReceive {
case msg: CamelMessage =>
val camelMsg = msg.bodyAs[String]
producerActor ! Message(camelMsg.getBytes)
sender() ! Ack
case ex: Exception => sender() ! Failure(ex)
case _ =>
log.error("Got a message that I don't understand")
sender() ! Failure(new Exception("Got a message that I don't understand"))
}
}
主要:
object ActiveMqConsumerTest extends App {
val system = ActorSystem("KafkaSystem")
val camel = CamelExtension(system)
val camelContext = camel.context
camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent("tcp://0.0.0.0:61616"))
val consumer = system.actorOf(Props[ActiveMqConsumerActor].withRouter(FromConfig), "consumer")
val producer = system.actorOf(Props[KafkaProducerActor].withRouter(FromConfig), "producer")
}
谢谢
您可以尝试使用 "Metrics" 之类的东西。 https://dropwizard.github.io/metrics/3.1.0/manual/ 您可以定义精确的指标,包括时间并在您的 actor 内部使用它。
我正在开发一个从 JMS(消费者)中提取消息并将其推送到 Kafka 主题(生产者)的系统。
由于我的消费者一直在等待新消息到达 JMS 队列并将其推送到 Kafka,我如何才能有效地衡量每秒可以拉取多少消息?
这是我的代码:
我的消费者:
class ActiveMqConsumerActor extends Consumer {
var startTime: Long = _
val log = Logging(context.system, this)
val producerActor = context.actorOf(Props[KafkaProducerActor])
override def autoAck = false
override def endpointUri: String = "activemq:KafkaTest"
override def receive: Receive = LoggingReceive {
case msg: CamelMessage =>
val camelMsg = msg.bodyAs[String]
producerActor ! Message(camelMsg.getBytes)
sender() ! Ack
case ex: Exception => sender() ! Failure(ex)
case _ =>
log.error("Got a message that I don't understand")
sender() ! Failure(new Exception("Got a message that I don't understand"))
}
}
主要:
object ActiveMqConsumerTest extends App {
val system = ActorSystem("KafkaSystem")
val camel = CamelExtension(system)
val camelContext = camel.context
camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent("tcp://0.0.0.0:61616"))
val consumer = system.actorOf(Props[ActiveMqConsumerActor].withRouter(FromConfig), "consumer")
val producer = system.actorOf(Props[KafkaProducerActor].withRouter(FromConfig), "producer")
}
谢谢
您可以尝试使用 "Metrics" 之类的东西。 https://dropwizard.github.io/metrics/3.1.0/manual/ 您可以定义精确的指标,包括时间并在您的 actor 内部使用它。