我怎样才能正确地将数据分发给两个消费者?
How can i properly distribute data into two consumers?
在我的应用程序中,我使用的是活动 mq,在消费者方面,我是 运行 两个实例(消费者)来处理 request.As 两个实例正在监听同一个队列,有些冲突发生了。如果多次收到相同的数据,一个请求被处理多次,为了克服这个问题并传达两个实例,我已经实现了 hazelcast 并且它运行良好,但有时数据没有正确分配到两个实例中,如果我只发送批量数据一个实例正在处理所有任务。
我在生产者端使用的代码。
public synchronized static void createMQRequestForPoster(Long zoneId, List<String> postJobs, int priority) throws JMSException {
Connection connection = null;
try {
connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("customQueue");
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
logger.info("List of jobs adding into poster queue: "+postJobs.size());
for(String str : postJobs) {
TextMessage message = session.createTextMessage();
JSONObject obj = new JSONObject();
obj.put("priority", priority);
obj.put("zoneId", zoneId);
obj.put("postJobs", str);
logger.debug(obj.toString());
message.setText(obj.toString());
message.setIntProperty(ActiveMQReqProcessor.REQ_TYPE, 0);
producer.send(message);
}
} catch (JMSException | JSONException e) {
logger.warn("Failed to add poster request to ActiveMq", e);
} finally {
if(connection != null)
connection.close();
}
}
我在消费者端使用的代码。
private static void activeMQPostProcessor() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(AppCoding.NZ_JMS_URL);
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("customQueue");
MessageConsumer consumer = session.createConsumer(queue);
MessageListener listener = new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage textMessage = (TextMessage) message;
logger.info("Received message " + textMessage.getText());
JSONObject jsonObj = new JSONObject(textMessage.getText());
HazelcastClusterInstance.getInstance().add(processOnPoster(jsonObj));
message.acknowledge();
} catch (JSONException e) {
} catch (JMSException e) {
}
logger.info("Adding Raw Message to Internal Queue");
}
}
};
consumer.setMessageListener(listener);
logger.info("Waiting for new posts from selector, scheduler.");
} catch (JMSException e) {
logger.info(e);
}
}
我只在观察这个案例 times.How 我可以处理这个吗?
您很可能发现预取消息的平衡不均衡。消费者连接并请求预取块,第一个往往会在第二个进入之前获得大量数据,因此它似乎占用了消息。
您需要针对您的用例调整预取,请参阅 prefetch documentation。
在我的应用程序中,我使用的是活动 mq,在消费者方面,我是 运行 两个实例(消费者)来处理 request.As 两个实例正在监听同一个队列,有些冲突发生了。如果多次收到相同的数据,一个请求被处理多次,为了克服这个问题并传达两个实例,我已经实现了 hazelcast 并且它运行良好,但有时数据没有正确分配到两个实例中,如果我只发送批量数据一个实例正在处理所有任务。
我在生产者端使用的代码。
public synchronized static void createMQRequestForPoster(Long zoneId, List<String> postJobs, int priority) throws JMSException {
Connection connection = null;
try {
connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("customQueue");
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
logger.info("List of jobs adding into poster queue: "+postJobs.size());
for(String str : postJobs) {
TextMessage message = session.createTextMessage();
JSONObject obj = new JSONObject();
obj.put("priority", priority);
obj.put("zoneId", zoneId);
obj.put("postJobs", str);
logger.debug(obj.toString());
message.setText(obj.toString());
message.setIntProperty(ActiveMQReqProcessor.REQ_TYPE, 0);
producer.send(message);
}
} catch (JMSException | JSONException e) {
logger.warn("Failed to add poster request to ActiveMq", e);
} finally {
if(connection != null)
connection.close();
}
}
我在消费者端使用的代码。
private static void activeMQPostProcessor() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(AppCoding.NZ_JMS_URL);
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("customQueue");
MessageConsumer consumer = session.createConsumer(queue);
MessageListener listener = new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage textMessage = (TextMessage) message;
logger.info("Received message " + textMessage.getText());
JSONObject jsonObj = new JSONObject(textMessage.getText());
HazelcastClusterInstance.getInstance().add(processOnPoster(jsonObj));
message.acknowledge();
} catch (JSONException e) {
} catch (JMSException e) {
}
logger.info("Adding Raw Message to Internal Queue");
}
}
};
consumer.setMessageListener(listener);
logger.info("Waiting for new posts from selector, scheduler.");
} catch (JMSException e) {
logger.info(e);
}
}
我只在观察这个案例 times.How 我可以处理这个吗?
您很可能发现预取消息的平衡不均衡。消费者连接并请求预取块,第一个往往会在第二个进入之前获得大量数据,因此它似乎占用了消息。
您需要针对您的用例调整预取,请参阅 prefetch documentation。