Confluent JDBC 连接器和 Flink 消费者
Confluent JDBC connector And Flink consumer
我们正在尝试将 SQL-Server JDBC 连接器与 KafkaAvroSerializer 一起使用,并提供自定义 ProducerInterceptor 以在将数据发送到 Kafka 之前对其进行加密。
在消费者端,我们希望使用 Flink 连接器进行解密,然后使用适当的反序列化器。
为了实现这一点,我们有几个问题:
1)如果我们提供自定义的ConsumerInterceptor来解密数据,那么在Flink中创建DataStream时是否应该通过Properties文件传入?
Properties properties = new Properties();
...
`properties.setProperty("consumer.interceptor.classes": "OurCusromDecryptConsumerInterceptor")`;
...
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer011<>("sqlserver-foobar", ???, properties));
以上配置是否正确,或者我是否需要设置任何其他 属性 以便我可以将 ConsumerInterceptor 传递给 Flink?
2)另一个问题是关于Flink中的Deserializer。例如,我在网上查找了它,发现了一些代码片段,如下所示:
public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> {
private final String schemaRegistryUrl;
private final int identityMapCapacity;
private KafkaAvroDecoder kafkaAvroDecoder;
public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) {
this(schemaRegistyUrl, 1000);
}
因此,如果我们使用 JDBC 连接器将数据传递到 Kafka 而不进行任何修改(除了加密数据),那么在反序列化期间我们应该提供什么数据类型?我们将在反序列化之前解密数据。
public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> {
提前致谢,
只需添加最终结果,以便它可以帮助任何正在寻找相同结果的人:
public class ConfluentAvroDeserializationSchema
implements DeserializationSchema<GenericRecord> {
private final String schemaRegistryUrl;
private final int identityMapCapacity;
private transient KafkaAvroDecoder kafkaAvroDecoder;
public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) {
this(schemaRegistyUrl, 1000);
}
public ConfluentAvroDeserializationSchema(String schemaRegistryUrl, int
identityMapCapacity) {
this.schemaRegistryUrl = schemaRegistryUrl;
this.identityMapCapacity = identityMapCapacity;
}
@Override
public GenericRecord deserialize(byte[] bytes) throws IOException {
if (kafkaAvroDecoder == null) {
SchemaRegistryClient schemaRegistry = new
CachedSchemaRegistryClient(this.schemaRegistryUrl,
this.identityMapCapacity);
this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
}
return (GenericRecord) this.kafkaAvroDecoder.fromBytes(bytes);
}
@Override
public boolean isEndOfStream(GenericRecord string) {
return false;
}
@Override
public TypeInformation<GenericRecord> getProducedType() {
return TypeExtractor.getForClass(GenericRecord.class);
}
}
我们正在尝试将 SQL-Server JDBC 连接器与 KafkaAvroSerializer 一起使用,并提供自定义 ProducerInterceptor 以在将数据发送到 Kafka 之前对其进行加密。
在消费者端,我们希望使用 Flink 连接器进行解密,然后使用适当的反序列化器。
为了实现这一点,我们有几个问题:
1)如果我们提供自定义的ConsumerInterceptor来解密数据,那么在Flink中创建DataStream时是否应该通过Properties文件传入?
Properties properties = new Properties();
...
`properties.setProperty("consumer.interceptor.classes": "OurCusromDecryptConsumerInterceptor")`;
...
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer011<>("sqlserver-foobar", ???, properties));
以上配置是否正确,或者我是否需要设置任何其他 属性 以便我可以将 ConsumerInterceptor 传递给 Flink?
2)另一个问题是关于Flink中的Deserializer。例如,我在网上查找了它,发现了一些代码片段,如下所示:
public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> {
private final String schemaRegistryUrl;
private final int identityMapCapacity;
private KafkaAvroDecoder kafkaAvroDecoder;
public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) {
this(schemaRegistyUrl, 1000);
}
因此,如果我们使用 JDBC 连接器将数据传递到 Kafka 而不进行任何修改(除了加密数据),那么在反序列化期间我们应该提供什么数据类型?我们将在反序列化之前解密数据。
public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> {
提前致谢,
只需添加最终结果,以便它可以帮助任何正在寻找相同结果的人:
public class ConfluentAvroDeserializationSchema
implements DeserializationSchema<GenericRecord> {
private final String schemaRegistryUrl;
private final int identityMapCapacity;
private transient KafkaAvroDecoder kafkaAvroDecoder;
public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) {
this(schemaRegistyUrl, 1000);
}
public ConfluentAvroDeserializationSchema(String schemaRegistryUrl, int
identityMapCapacity) {
this.schemaRegistryUrl = schemaRegistryUrl;
this.identityMapCapacity = identityMapCapacity;
}
@Override
public GenericRecord deserialize(byte[] bytes) throws IOException {
if (kafkaAvroDecoder == null) {
SchemaRegistryClient schemaRegistry = new
CachedSchemaRegistryClient(this.schemaRegistryUrl,
this.identityMapCapacity);
this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
}
return (GenericRecord) this.kafkaAvroDecoder.fromBytes(bytes);
}
@Override
public boolean isEndOfStream(GenericRecord string) {
return false;
}
@Override
public TypeInformation<GenericRecord> getProducedType() {
return TypeExtractor.getForClass(GenericRecord.class);
}
}