高级消费者块上的 commitOffsets 是否?
Does commitOffsets on high-level consumer block?
在 Java 客户端 (http://kafka.apache.org/documentation.html#highlevelconsumerapi) 中,是在高级消费者块上执行 commitOffsets 直到偏移量成功提交,还是即发即弃?
Does commitOffsets on the high-level consumer block until offsets are successfully committed?
看起来 commitOffsets()
循环遍历每个消费者并在其偏移量发生变化时调用 updatePersistentPath
,如果发生变化则通过 zkClient.writeData(path, getBytes(data))
写入数据。似乎 commitOffsets()
会阻塞 直到提交所有偏移量。
这是 commitOffsets()
(ref) 的源代码:
public void commitOffsets() {
if (zkClient == null) {
logger.error("zk client is null. Cannot commit offsets");
return;
}
for (Entry<String, Pool<Partition, PartitionTopicInfo>> e : topicRegistry.entrySet()) {
ZkGroupTopicDirs topicDirs = new ZkGroupTopicDirs(config.getGroupId(), e.getKey());
for (PartitionTopicInfo info : e.getValue().values()) {
final long lastChanged = info.getConsumedOffsetChanged().get();
if (lastChanged == 0) {
logger.trace("consume offset not changed");
continue;
}
final long newOffset = info.getConsumedOffset();
//path: /consumers/<group>/offsets/<topic>/<brokerid-partition>
final String path = topicDirs.consumerOffsetDir + "/" + info.partition.getName();
try {
ZkUtils.updatePersistentPath(zkClient, path, "" + newOffset);
} catch (Throwable t) {
logger.warn("exception during commitOffsets, path=" + path + ",offset=" + newOffset, t);
} finally {
info.resetComsumedOffsetChanged(lastChanged);
if (logger.isDebugEnabled()) {
logger.debug("Committed [" + path + "] for topic " + info);
}
}
}
}
}
和 updatePersistentPath(...)
(ref):
public static void updatePersistentPath(ZkClient zkClient, String path, String data) {
try {
zkClient.writeData(path, getBytes(data));
} catch (ZkNoNodeException e) {
createParentPath(zkClient, path);
try {
zkClient.createPersistent(path, getBytes(data));
} catch (ZkNodeExistsException e2) {
zkClient.writeData(path, getBytes(data));
}
}
}
在 Java 客户端 (http://kafka.apache.org/documentation.html#highlevelconsumerapi) 中,是在高级消费者块上执行 commitOffsets 直到偏移量成功提交,还是即发即弃?
Does commitOffsets on the high-level consumer block until offsets are successfully committed?
看起来 commitOffsets()
循环遍历每个消费者并在其偏移量发生变化时调用 updatePersistentPath
,如果发生变化则通过 zkClient.writeData(path, getBytes(data))
写入数据。似乎 commitOffsets()
会阻塞 直到提交所有偏移量。
这是 commitOffsets()
(ref) 的源代码:
public void commitOffsets() {
if (zkClient == null) {
logger.error("zk client is null. Cannot commit offsets");
return;
}
for (Entry<String, Pool<Partition, PartitionTopicInfo>> e : topicRegistry.entrySet()) {
ZkGroupTopicDirs topicDirs = new ZkGroupTopicDirs(config.getGroupId(), e.getKey());
for (PartitionTopicInfo info : e.getValue().values()) {
final long lastChanged = info.getConsumedOffsetChanged().get();
if (lastChanged == 0) {
logger.trace("consume offset not changed");
continue;
}
final long newOffset = info.getConsumedOffset();
//path: /consumers/<group>/offsets/<topic>/<brokerid-partition>
final String path = topicDirs.consumerOffsetDir + "/" + info.partition.getName();
try {
ZkUtils.updatePersistentPath(zkClient, path, "" + newOffset);
} catch (Throwable t) {
logger.warn("exception during commitOffsets, path=" + path + ",offset=" + newOffset, t);
} finally {
info.resetComsumedOffsetChanged(lastChanged);
if (logger.isDebugEnabled()) {
logger.debug("Committed [" + path + "] for topic " + info);
}
}
}
}
}
和 updatePersistentPath(...)
(ref):
public static void updatePersistentPath(ZkClient zkClient, String path, String data) {
try {
zkClient.writeData(path, getBytes(data));
} catch (ZkNoNodeException e) {
createParentPath(zkClient, path);
try {
zkClient.createPersistent(path, getBytes(data));
} catch (ZkNodeExistsException e2) {
zkClient.writeData(path, getBytes(data));
}
}
}