Nifi 自定义 Kafka 处理器代码限时运行
Nifi custom Kafka processor code works for a limited time
这是我的自定义 kafka 处理器的代码,它只使用 kafka 主题并生成一些数据
ConsumerRecords<byte[],byte[]> records = consumer.poll(1000);
records.forEach(record -> {
FlowFile flowFile = session.create();
if (flowFile == null) {
return;
}
try {
byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT :
genericData.toString(record.value()).getBytes(StandardCharsets.UTF_8);
flowFile = session.write(flowFile, rawOut -> {
rawOut.write(outputBytes);
consumer.commitSync();
});
} catch (ProcessException pe) {
getLogger().error("Failed to deserialize {}", new Object[]{flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
return;
}
flowFile = session.putAttribute(flowFile, "topic", record.topic());
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
getLogger().info("flowFile id " + flowFile.getId());
session.transfer(flowFile, REL_SUCCESS);
});
此代码获取一批大约 500 条 kakfa 消息并生成一些 flowFile 用于输出。我需要的显然是把它放在一个 while 循环中,一遍又一遍地做同样的事情。但是,当我这样做时,处理器什么也没有。尽管如此,信息日志显示 flowFile id 已递增,并且似乎已生成实际的 flowFile。我测试的一件事是这只发生在无限循环中。当我使用有限的 for 循环时,处理器工作正常。我想知道可能有一些我不知道的关于 nifi 流内部的东西。
问题是我没有手动提交会话。所以只有当方法返回时它才会被提交,这在无限 while 循环的情况下从未发生过。人为的解决方案最终变成了这样。
while(true)
ConsumerRecords<byte[],byte[]> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
FlowFile flowFile = session.create();
if (flowFile == null) {
return;
}
try {
byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT :
genericData.toString(record.value()).getBytes(StandardCharsets.UTF_8);
flowFile = session.write(flowFile, rawOut -> {
rawOut.write(outputBytes);
consumer.commitSync();
});
} catch (ProcessException pe) {
getLogger().error("Failed to deserialize {}", new Object[]{flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
return;
}
flowFile = session.putAttribute(flowFile, "topic", record.topic());
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
getLogger().info("flowFile id " + flowFile.getId());
session.transfer(flowFile, REL_SUCCESS);
session.commit();
});
}
这是我的自定义 kafka 处理器的代码,它只使用 kafka 主题并生成一些数据
ConsumerRecords<byte[],byte[]> records = consumer.poll(1000);
records.forEach(record -> {
FlowFile flowFile = session.create();
if (flowFile == null) {
return;
}
try {
byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT :
genericData.toString(record.value()).getBytes(StandardCharsets.UTF_8);
flowFile = session.write(flowFile, rawOut -> {
rawOut.write(outputBytes);
consumer.commitSync();
});
} catch (ProcessException pe) {
getLogger().error("Failed to deserialize {}", new Object[]{flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
return;
}
flowFile = session.putAttribute(flowFile, "topic", record.topic());
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
getLogger().info("flowFile id " + flowFile.getId());
session.transfer(flowFile, REL_SUCCESS);
});
此代码获取一批大约 500 条 kakfa 消息并生成一些 flowFile 用于输出。我需要的显然是把它放在一个 while 循环中,一遍又一遍地做同样的事情。但是,当我这样做时,处理器什么也没有。尽管如此,信息日志显示 flowFile id 已递增,并且似乎已生成实际的 flowFile。我测试的一件事是这只发生在无限循环中。当我使用有限的 for 循环时,处理器工作正常。我想知道可能有一些我不知道的关于 nifi 流内部的东西。
问题是我没有手动提交会话。所以只有当方法返回时它才会被提交,这在无限 while 循环的情况下从未发生过。人为的解决方案最终变成了这样。
while(true)
ConsumerRecords<byte[],byte[]> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
FlowFile flowFile = session.create();
if (flowFile == null) {
return;
}
try {
byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT :
genericData.toString(record.value()).getBytes(StandardCharsets.UTF_8);
flowFile = session.write(flowFile, rawOut -> {
rawOut.write(outputBytes);
consumer.commitSync();
});
} catch (ProcessException pe) {
getLogger().error("Failed to deserialize {}", new Object[]{flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
return;
}
flowFile = session.putAttribute(flowFile, "topic", record.topic());
flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
getLogger().info("flowFile id " + flowFile.getId());
session.transfer(flowFile, REL_SUCCESS);
session.commit();
});
}