Kafka Stream with Avro in JAVA , schema.registry.url" 没有默认值
Kafka Stream with Avro in JAVA , schema.registry.url" which has no default value
我的 Kafka Stream 应用程序有以下配置
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG,this.applicaionId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,svrConfig.getBootstrapServers());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig());
// Exactly once processing!!
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
我收到以下错误:
Exception in thread "main" io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
我已经尝试替换行
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
与
config.put("schema.registry.url","http://localhost:8081");
但同样的错误
我在准备 Stream 应用程序时遵循了 this url 的说明。
有什么建议吗?
如果您有 Avro 格式的键和值,以下几行应该可以解决问题,
config.put("key.converter.schema.registry.url", "http://localhost:8081");
config.put("value.converter.schema.registry.url", "http://localhost:8081");
如果这似乎不起作用,您可以明确覆盖 Serdes
。例如,如果您有 Avro 密钥:
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
"http://localhost:8081");
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true); // true for record keys
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false); // false for record values
StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> textLines =
builder.stream(keyGenericAvroSerde, valueGenericAvroSerde, "my-avro-topic");
// Do whatever you like
如果您使用的是 application.yml
,您可以将 属性 设置为:
spring:
kafka:
properties:
schema.registry.url: your-schema-registy-url
consumer:
auto-offset-reset: latest
group-id: simple-consumer
的教程中找到了它
显然答案已经过时了。
它使用此代码片段对我有用(此处的示例只是将一个主题传递到另一个主题而没有任何更改)。
final Map<String, String> serdeConfig = Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true); // true for record keys
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false); // false for record values
StreamsBuilder builder = new StreamsBuilder();
builder.stream("my-avro-topic",Consumed.with(keyGenericAvroSerde, valueGenericAvroSerde))
.peek((k, v) -> log.info("Processed a new record")).to(outputTopic,Produced.with(keyGenericAvroSerde, valueGenericAvroSerde));
我的 Kafka Stream 应用程序有以下配置
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG,this.applicaionId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,svrConfig.getBootstrapServers());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig());
// Exactly once processing!!
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
我收到以下错误:
Exception in thread "main" io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
我已经尝试替换行
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
与
config.put("schema.registry.url","http://localhost:8081");
但同样的错误
我在准备 Stream 应用程序时遵循了 this url 的说明。
有什么建议吗?
如果您有 Avro 格式的键和值,以下几行应该可以解决问题,
config.put("key.converter.schema.registry.url", "http://localhost:8081");
config.put("value.converter.schema.registry.url", "http://localhost:8081");
如果这似乎不起作用,您可以明确覆盖 Serdes
。例如,如果您有 Avro 密钥:
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
"http://localhost:8081");
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true); // true for record keys
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false); // false for record values
StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> textLines =
builder.stream(keyGenericAvroSerde, valueGenericAvroSerde, "my-avro-topic");
// Do whatever you like
如果您使用的是 application.yml
,您可以将 属性 设置为:
spring:
kafka:
properties:
schema.registry.url: your-schema-registy-url
consumer:
auto-offset-reset: latest
group-id: simple-consumer
的教程中找到了它
显然答案已经过时了。
它使用此代码片段对我有用(此处的示例只是将一个主题传递到另一个主题而没有任何更改)。
final Map<String, String> serdeConfig = Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true); // true for record keys
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false); // false for record values
StreamsBuilder builder = new StreamsBuilder();
builder.stream("my-avro-topic",Consumed.with(keyGenericAvroSerde, valueGenericAvroSerde))
.peek((k, v) -> log.info("Processed a new record")).to(outputTopic,Produced.with(keyGenericAvroSerde, valueGenericAvroSerde));