如何在 OSGI 框架中配置 Kafka 0.9.0.0 消费者?
How to configure Kafka 0.9.0.0 consumer in OSGI framework?
我有 OSGI 框架,其中,我在一个包中接受 REST 调用,其余调用中收到的数据被发送到 KAFKA broker。还有另一个 bundle 将使用来自 broker 的消息。
如果我在 REST bundle 之前初始化 KAFKA Consumer bundle,REST bundleActivator 永远不会被调用,因为代码在 KAFKA Consumer 代码的 while 循环中运行。如果我在消费者包之前初始化 REST 包,消费者包永远不会启动。
KAFKA Bundle Activator代码如下:
public class KafkaConsumerActivator implements BundleActivator {
private static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
private static final String GROUP_ID = "group.id";
private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
private static final String KEY_DESERIALIZER = "key.deserializer";
private ConsumerConnector consumerConnector;
private KafkaConsumer<String, String> consumer;
private static final String VALUE_DESERIALIZER = "value.deserializer";
public void start(BundleContext context) throws Exception {
Properties properties = new Properties();
properties.put(ZOOKEEPER_CONNECT,
MosaicThingsConstant.KAFKA_BROCKER_IP + ":" + MosaicThingsConstant.ZOOKEEPER_PORT);
properties.put(GROUP_ID, MosaicThingsConstant.KAFKA_GROUP_ID);
properties.put(BOOTSTRAP_SERVERS,
MosaicThingsConstant.KAFKA_BROCKER_IP + ":" + MosaicThingsConstant.KAFKA_BROCKER_PORT);
properties.put(KEY_DESERIALIZER, StringDeserializer.class.getName());
properties.put(VALUE_DESERIALIZER, StringDeserializer.class.getName());
consumer = new KafkaConsumer<>(properties);
try {
consumer.subscribe(Arrays.asList(MosaicThingsConstant.KAFKA_TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out.println(": " + data);
}
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}
}
}
你永远不应该在 Activator 的启动方法中做一些花费很长时间的事情。它将阻止整个 OSGi 框架。
您最好在一个额外的线程中执行整个连接和循环。在停止方法中,您可以告诉该线程退出。
我有 OSGI 框架,其中,我在一个包中接受 REST 调用,其余调用中收到的数据被发送到 KAFKA broker。还有另一个 bundle 将使用来自 broker 的消息。
如果我在 REST bundle 之前初始化 KAFKA Consumer bundle,REST bundleActivator 永远不会被调用,因为代码在 KAFKA Consumer 代码的 while 循环中运行。如果我在消费者包之前初始化 REST 包,消费者包永远不会启动。
KAFKA Bundle Activator代码如下:
public class KafkaConsumerActivator implements BundleActivator {
private static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
private static final String GROUP_ID = "group.id";
private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
private static final String KEY_DESERIALIZER = "key.deserializer";
private ConsumerConnector consumerConnector;
private KafkaConsumer<String, String> consumer;
private static final String VALUE_DESERIALIZER = "value.deserializer";
public void start(BundleContext context) throws Exception {
Properties properties = new Properties();
properties.put(ZOOKEEPER_CONNECT,
MosaicThingsConstant.KAFKA_BROCKER_IP + ":" + MosaicThingsConstant.ZOOKEEPER_PORT);
properties.put(GROUP_ID, MosaicThingsConstant.KAFKA_GROUP_ID);
properties.put(BOOTSTRAP_SERVERS,
MosaicThingsConstant.KAFKA_BROCKER_IP + ":" + MosaicThingsConstant.KAFKA_BROCKER_PORT);
properties.put(KEY_DESERIALIZER, StringDeserializer.class.getName());
properties.put(VALUE_DESERIALIZER, StringDeserializer.class.getName());
consumer = new KafkaConsumer<>(properties);
try {
consumer.subscribe(Arrays.asList(MosaicThingsConstant.KAFKA_TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out.println(": " + data);
}
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}
}
}
你永远不应该在 Activator 的启动方法中做一些花费很长时间的事情。它将阻止整个 OSGi 框架。
您最好在一个额外的线程中执行整个连接和循环。在停止方法中,您可以告诉该线程退出。