Hortonworks Schema Registry + Nifi + Java:反序列化 Nifi 记录
Hortonworks Schema Registry + Nifi + Java: Deserialize Nifi Record
我正在尝试使用 Hortonworks Schema Registry 反序列化一些由 Nifi 序列化的 Kafka 消息
- 在 Nifi 端用作 RecordWriter 的处理器:AvroRecordSetWriter
- 架构写入策略:HWX 内容编码架构参考
我能够在其他 Nifi kafka 消费者中反序列化这些消息。但是我正在尝试使用 Kafka 代码从我的 Flink 应用程序中反序列化它们。
我的 Flink 应用程序的 Kafka 解串器处理程序中有以下内容:
// Names of the configuration entries, taken from SchemaRegistryClient.Configuration.
final String SCHEMA_REGISTRY_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_URL_KEY = SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name();

// Values are stored as Long/String objects; HWXSchemaRegistry copies them into a Map<String, Object>.
Properties schemaRegistryProperties = new Properties();
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_SIZE_KEY, 10L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY, 5000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY, 1000L);
// This entry is named *_EXPIRY_INTERVAL_SECS, so the value is given in seconds. The former
// 60 * 60 * 1000L looks like "one hour in milliseconds"; presumably one hour was intended.
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY, 60 * 60L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_URL_KEY, "http://schema_registry_server:7788/api/v1");

// NOTE(review): deserialize(...) is declared to return Object; this unchecked cast only succeeds
// if the deserializer really yields a Map. Confirm the runtime type (it may be an Avro record).
return (Map<String, Object>) HWXSchemaRegistry.getInstance(schemaRegistryProperties).deserialize(message);
这里是反序列化消息的 HWXSchemaRegistryCode:
import com.hortonworks.registries.schemaregistry.avro.AvroSchemaProvider;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotDeserializer;
/**
 * Lazily created singleton that wraps a {@link SchemaRegistryClient} and the Avro
 * deserializer obtained from it.
 *
 * <p>The instance is built from the configuration passed to the first
 * {@link #getInstance(Properties)} call; configurations passed to later calls are ignored.
 */
public class HWXSchemaRegistry {
    private static HWXSchemaRegistry hwxSRInstance = null;

    private final SchemaRegistryClient client;
    private final Map<String, Object> config;
    private final AvroSnapshotDeserializer deserializer;

    /**
     * Returns the shared instance, creating it on first use.
     *
     * <p>Synchronized so that concurrent first calls cannot each build their own client.
     *
     * @param schemaRegistryConfig client configuration; only used when the instance is created
     * @return the shared {@code HWXSchemaRegistry}
     */
    public static synchronized HWXSchemaRegistry getInstance(Properties schemaRegistryConfig) {
        if (hwxSRInstance == null) {
            hwxSRInstance = new HWXSchemaRegistry(schemaRegistryConfig);
        }
        return hwxSRInstance;
    }

    /**
     * Deserializes one serialized message with this instance's deserializer.
     *
     * @param message the raw message bytes
     * @return whatever the underlying deserializer produces for the payload
     * @throws IOException declared for callers; kept for interface compatibility
     */
    public Object deserialize(byte[] message) throws IOException {
        // Use this instance's deserializer rather than reaching through the static field.
        // The second argument (reader schema version) is passed as null, as before.
        return this.deserializer.deserialize(new ByteArrayInputStream(message), null);
    }

    /**
     * Copies every entry of {@code config} into a map keyed by the string form of the key.
     * Values are copied as-is (they may be non-String objects such as Long).
     */
    private static Map<String, Object> properties2Map(Properties config) {
        Map<String, Object> configMap = new HashMap<>();
        config.forEach((key, value) -> configMap.put(key.toString(), value));
        return configMap;
    }

    /** Builds the client from the given configuration and initializes its default Avro deserializer. */
    private HWXSchemaRegistry(Properties schemaRegistryConfig) {
        // NOTE(review): _log is not declared anywhere in this class as shown; it must be
        // supplied (e.g. a static logger field) for this to compile.
        _log.debug("Init SchemaRegistry Client");
        this.config = HWXSchemaRegistry.properties2Map(schemaRegistryConfig);
        this.client = new SchemaRegistryClient(this.config);
        this.deserializer = this.client.getDefaultDeserializer(AvroSchemaProvider.TYPE);
        this.deserializer.init(this.config);
    }
}
但我收到 404 HTTP 错误代码(未找到架构)。我认为这是由于 Nifi 配置和 HWX Schema Registry Client 实现之间不兼容 "protocols",因此客户端正在寻找的模式标识符字节在服务器上不存在,或者类似的东西。
有人可以帮忙吗?
谢谢。
Caused by: javax.ws.rs.NotFoundException: HTTP 404 Not Found
at org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:1069)
at org.glassfish.jersey.client.JerseyInvocation.translate(JerseyInvocation.java:866)
at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke(JerseyInvocation.java:750)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:205)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390)
at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:748)
at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:404)
at org.glassfish.jersey.client.JerseyInvocation$Builder.get(JerseyInvocation.java:300)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.run(SchemaRegistryClient.java:1054)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.run(SchemaRegistryClient.java:1051)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:360)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getEntities(SchemaRegistryClient.java:1051)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getAllVersions(SchemaRegistryClient.java:872)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getAllVersions(SchemaRegistryClient.java:676)
at HWXSchemaRegistry.(HWXSchemaRegistry.java:56)
at HWXSchemaRegistry.getInstance(HWXSchemaRegistry.java:26)
at SchemaService.deserialize(SchemaService.java:70)
at SchemaService.deserialize(SchemaService.java:26)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:712)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
我找到了一个变通办法,因为我无法让上面的代码正常工作。我使用字节数组的前几个字节对模式注册表进行多次调用,获取 avro 模式,以便稍后反序列化字节数组的其余部分。
- 第一个字节 (0) 是协议版本(我发现这是一个 Nifi 特定的字节,因为我不需要它)。
- 接下来的 8 个字节是架构 ID
- 接下来的 4 个字节是模式版本
其余字节是消息本身:
import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
// Resolves the Avro schema referenced by the message header and strips the header.
// Header layout (as described in the surrounding text): byte 0 = protocol version,
// bytes 1-8 = schema id, bytes 9-12 = schema version, remaining bytes = Avro payload.
try (SchemaRegistryClient client = new SchemaRegistryClient(this.schemaRegistryConfig)) {
    try {
        // Arrays.copyOfRange would silently zero-pad a short array, yielding a bogus
        // schema id/version, so reject truncated messages explicitly.
        if (message == null || message.length < 13) {
            throw new IOException("Message is too short to contain the 13-byte schema header");
        }
        ByteBuffer header = ByteBuffer.wrap(message);
        long schemaId = header.getLong(1);
        int schemaVersion = header.getInt(9);

        // Two registry calls: schema id -> schema name, then (name, version) -> schema text.
        SchemaMetadataInfo schemaInfo = client.getSchemaMetadataInfo(schemaId);
        String schemaName = schemaInfo.getSchemaMetadata().getName();
        SchemaVersionInfo schemaVersionInfo = client.getSchemaVersionInfo(
                new SchemaVersionKey(schemaName, schemaVersion));
        String avroSchema = schemaVersionInfo.getSchemaText();

        // A distinct name is required: redeclaring "message" here does not compile.
        byte[] payload = Arrays.copyOfRange(message, 13, message.length);
        // Deserialize [...]
    }
    catch (Exception e)
    {
        // Keep the original exception as the cause so the stack trace is not lost.
        throw new IOException(e.getMessage(), e);
    }
}
我还想过,也许必须在我的问题代码中调用 hwxSRInstance.deserializer.deserialize
之前删除第一个字节,因为这个字节似乎是 Nifi 处理器之间通信用的 Nifi 特定字节,但这样做也没有用。
下一步是使用模式文本构建缓存以避免多次调用模式注册表API。
新信息:我将扩展我的答案以包括 avro 反序列化部分,因为这对我来说是一些故障排除,我不得不检查 Nifi Avro Reader 源代码来弄清楚这部分(我正在尝试使用基本 avro 反序列化代码时出现无效 Avro 数据异常):
import org.apache.avro.Schema;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
/**
 * Decodes a single Avro binary-encoded record using the given writer schema.
 *
 * @param message    the Avro binary payload (without any schema-reference header)
 * @param schemaText the Avro schema, as JSON text, used to read the payload
 * @return the decoded record
 * @throws IOException if the payload cannot be read with the given schema
 */
private static GenericRecord deserializeMessage(byte[] message, String schemaText) throws IOException {
    Schema schema = new Schema.Parser().parse(schemaText);
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    // try-with-resources closes the stream even when decoding throws.
    try (InputStream in = new SeekableByteArrayInput(message)) {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
        // Passing null as the reuse argument, exactly as the original did.
        return datumReader.read(null, decoder);
    }
}
如果要将 GenericRecord 转为 Map,请注意字符串值并不是 String 对象(Avro 会将其映射为 Utf8 类),需要对字符串类型的键和值进行转换:
/**
 * Copies the top-level fields of an Avro record into a map keyed by field name.
 *
 * <p>Top-level values of type {@code org.apache.avro.util.Utf8} are converted to
 * {@link String}. Null values are kept as null. Values nested inside other
 * containers are not converted.
 *
 * @param record the record to copy
 * @return a new mutable map with one entry per schema field
 */
private static Map<String, Object> avroGenericRecordToMap(GenericRecord record)
{
    Map<String, Object> map = new HashMap<>();
    record.getSchema().getFields().forEach(field -> {
        Object value = record.get(field.name());
        // Convert every Utf8 value, not only a field named "value"; the instanceof test
        // is also null-safe, so a missing or null field no longer throws.
        map.put(field.name(), value instanceof org.apache.avro.util.Utf8 ? value.toString() : value);
    });
    return map;
}
我正在尝试使用 Hortonworks Schema Registry 反序列化一些由 Nifi 序列化的 Kafka 消息
- 在 Nifi 端用作 RecordWriter 的处理器:AvroRecordSetWriter
- 架构写入策略:HWX 内容编码架构参考
我能够在其他 Nifi kafka 消费者中反序列化这些消息。但是我正在尝试使用 Kafka 代码从我的 Flink 应用程序中反序列化它们。
我的 Flink 应用程序的 Kafka 解串器处理程序中有以下内容:
// Names of the configuration entries, taken from SchemaRegistryClient.Configuration.
final String SCHEMA_REGISTRY_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_URL_KEY = SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name();

// Values are stored as Long/String objects; HWXSchemaRegistry copies them into a Map<String, Object>.
Properties schemaRegistryProperties = new Properties();
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_SIZE_KEY, 10L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY, 5000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY, 1000L);
// This entry is named *_EXPIRY_INTERVAL_SECS, so the value is given in seconds. The former
// 60 * 60 * 1000L looks like "one hour in milliseconds"; presumably one hour was intended.
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY, 60 * 60L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_URL_KEY, "http://schema_registry_server:7788/api/v1");

// NOTE(review): deserialize(...) is declared to return Object; this unchecked cast only succeeds
// if the deserializer really yields a Map. Confirm the runtime type (it may be an Avro record).
return (Map<String, Object>) HWXSchemaRegistry.getInstance(schemaRegistryProperties).deserialize(message);
这里是反序列化消息的 HWXSchemaRegistryCode:
import com.hortonworks.registries.schemaregistry.avro.AvroSchemaProvider;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotDeserializer;
/**
 * Lazily created singleton that wraps a {@link SchemaRegistryClient} and the Avro
 * deserializer obtained from it.
 *
 * <p>The instance is built from the configuration passed to the first
 * {@link #getInstance(Properties)} call; configurations passed to later calls are ignored.
 */
public class HWXSchemaRegistry {
    private static HWXSchemaRegistry hwxSRInstance = null;

    private final SchemaRegistryClient client;
    private final Map<String, Object> config;
    private final AvroSnapshotDeserializer deserializer;

    /**
     * Returns the shared instance, creating it on first use.
     *
     * <p>Synchronized so that concurrent first calls cannot each build their own client.
     *
     * @param schemaRegistryConfig client configuration; only used when the instance is created
     * @return the shared {@code HWXSchemaRegistry}
     */
    public static synchronized HWXSchemaRegistry getInstance(Properties schemaRegistryConfig) {
        if (hwxSRInstance == null) {
            hwxSRInstance = new HWXSchemaRegistry(schemaRegistryConfig);
        }
        return hwxSRInstance;
    }

    /**
     * Deserializes one serialized message with this instance's deserializer.
     *
     * @param message the raw message bytes
     * @return whatever the underlying deserializer produces for the payload
     * @throws IOException declared for callers; kept for interface compatibility
     */
    public Object deserialize(byte[] message) throws IOException {
        // Use this instance's deserializer rather than reaching through the static field.
        // The second argument (reader schema version) is passed as null, as before.
        return this.deserializer.deserialize(new ByteArrayInputStream(message), null);
    }

    /**
     * Copies every entry of {@code config} into a map keyed by the string form of the key.
     * Values are copied as-is (they may be non-String objects such as Long).
     */
    private static Map<String, Object> properties2Map(Properties config) {
        Map<String, Object> configMap = new HashMap<>();
        config.forEach((key, value) -> configMap.put(key.toString(), value));
        return configMap;
    }

    /** Builds the client from the given configuration and initializes its default Avro deserializer. */
    private HWXSchemaRegistry(Properties schemaRegistryConfig) {
        // NOTE(review): _log is not declared anywhere in this class as shown; it must be
        // supplied (e.g. a static logger field) for this to compile.
        _log.debug("Init SchemaRegistry Client");
        this.config = HWXSchemaRegistry.properties2Map(schemaRegistryConfig);
        this.client = new SchemaRegistryClient(this.config);
        this.deserializer = this.client.getDefaultDeserializer(AvroSchemaProvider.TYPE);
        this.deserializer.init(this.config);
    }
}
但我收到 404 HTTP 错误代码(未找到架构)。我认为这是由于 Nifi 配置和 HWX Schema Registry Client 实现之间不兼容 "protocols",因此客户端正在寻找的模式标识符字节在服务器上不存在,或者类似的东西。
有人可以帮忙吗?
谢谢。
Caused by: javax.ws.rs.NotFoundException: HTTP 404 Not Found
at org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:1069)
at org.glassfish.jersey.client.JerseyInvocation.translate(JerseyInvocation.java:866)
at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke(JerseyInvocation.java:750)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:205)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390)
at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:748)
at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:404)
at org.glassfish.jersey.client.JerseyInvocation$Builder.get(JerseyInvocation.java:300)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.run(SchemaRegistryClient.java:1054)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.run(SchemaRegistryClient.java:1051)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:360)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getEntities(SchemaRegistryClient.java:1051)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getAllVersions(SchemaRegistryClient.java:872)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getAllVersions(SchemaRegistryClient.java:676)
at HWXSchemaRegistry.(HWXSchemaRegistry.java:56)
at HWXSchemaRegistry.getInstance(HWXSchemaRegistry.java:26)
at SchemaService.deserialize(SchemaService.java:70)
at SchemaService.deserialize(SchemaService.java:26)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:712)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
我找到了一个变通办法,因为我无法让上面的代码正常工作。我使用字节数组的前几个字节对模式注册表进行多次调用,获取 avro 模式,以便稍后反序列化字节数组的其余部分。
- 第一个字节 (0) 是协议版本(我发现这是一个 Nifi 特定的字节,因为我不需要它)。
- 接下来的 8 个字节是架构 ID
- 接下来的 4 个字节是模式版本
其余字节是消息本身:
import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;

// Resolves the Avro schema referenced by the message header and strips the header.
// Header layout (as described in the surrounding text): byte 0 = protocol version,
// bytes 1-8 = schema id, bytes 9-12 = schema version, remaining bytes = Avro payload.
try (SchemaRegistryClient client = new SchemaRegistryClient(this.schemaRegistryConfig)) {
    try {
        // Arrays.copyOfRange would silently zero-pad a short array, yielding a bogus
        // schema id/version, so reject truncated messages explicitly.
        if (message == null || message.length < 13) {
            throw new IOException("Message is too short to contain the 13-byte schema header");
        }
        ByteBuffer header = ByteBuffer.wrap(message);
        long schemaId = header.getLong(1);
        int schemaVersion = header.getInt(9);

        // Two registry calls: schema id -> schema name, then (name, version) -> schema text.
        SchemaMetadataInfo schemaInfo = client.getSchemaMetadataInfo(schemaId);
        String schemaName = schemaInfo.getSchemaMetadata().getName();
        SchemaVersionInfo schemaVersionInfo = client.getSchemaVersionInfo(
                new SchemaVersionKey(schemaName, schemaVersion));
        String avroSchema = schemaVersionInfo.getSchemaText();

        // A distinct name is required: redeclaring "message" here does not compile.
        byte[] payload = Arrays.copyOfRange(message, 13, message.length);
        // Deserialize [...]
    }
    catch (Exception e)
    {
        // Keep the original exception as the cause so the stack trace is not lost.
        throw new IOException(e.getMessage(), e);
    }
}
我还想过,也许必须在我的问题代码中调用 hwxSRInstance.deserializer.deserialize
之前删除第一个字节,因为这个字节似乎是 Nifi 处理器之间通信用的 Nifi 特定字节,但这样做也没有用。
下一步是使用模式文本构建缓存以避免多次调用模式注册表API。
新信息:我将扩展我的答案以包括 avro 反序列化部分,因为这对我来说是一些故障排除,我不得不检查 Nifi Avro Reader 源代码来弄清楚这部分(我正在尝试使用基本 avro 反序列化代码时出现无效 Avro 数据异常):
import org.apache.avro.Schema;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
/**
 * Decodes a single Avro binary-encoded record using the given writer schema.
 *
 * @param message    the Avro binary payload (without any schema-reference header)
 * @param schemaText the Avro schema, as JSON text, used to read the payload
 * @return the decoded record
 * @throws IOException if the payload cannot be read with the given schema
 */
private static GenericRecord deserializeMessage(byte[] message, String schemaText) throws IOException {
    Schema schema = new Schema.Parser().parse(schemaText);
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    // try-with-resources closes the stream even when decoding throws.
    try (InputStream in = new SeekableByteArrayInput(message)) {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
        // Passing null as the reuse argument, exactly as the original did.
        return datumReader.read(null, decoder);
    }
}
如果要将 GenericRecord 转为 Map,请注意字符串值并不是 String 对象(Avro 会将其映射为 Utf8 类),需要对字符串类型的键和值进行转换:
/**
 * Copies the top-level fields of an Avro record into a map keyed by field name.
 *
 * <p>Top-level values of type {@code org.apache.avro.util.Utf8} are converted to
 * {@link String}. Null values are kept as null. Values nested inside other
 * containers are not converted.
 *
 * @param record the record to copy
 * @return a new mutable map with one entry per schema field
 */
private static Map<String, Object> avroGenericRecordToMap(GenericRecord record)
{
    Map<String, Object> map = new HashMap<>();
    record.getSchema().getFields().forEach(field -> {
        Object value = record.get(field.name());
        // Convert every Utf8 value, not only a field named "value"; the instanceof test
        // is also null-safe, so a missing or null field no longer throws.
        map.put(field.name(), value instanceof org.apache.avro.util.Utf8 ? value.toString() : value);
    });
    return map;
}