如何让 Vertica-Kafka 调度程序与 Avro 模式注册表一起工作?
How to get Vertica-Kafka scheduler to work with Avro schema registry?
如何设置 Vertica 调度程序以使用来自 Kafka 的 Avro 数据
您好,我正在尝试使用 Vertica Scheduler 从 Kafka 主题中使用 Avro 数据。
我写了一个小的 java 代码来生成一个简单的对象作为 Avro 数据到 Kafka 中。
我还将模式推送到 kafka-schema-registry 中,我可以从浏览器中看到它。
我正在尝试设置 Vertica 的调度程序,所有命令都已无误地传递。
我还启动了一个微批处理,但是 table 是空的,我的调度程序不使用来自 Kafka 主题的数据
首先我配置了一个名为 schduler.properties:
的 conf 文件
config-schema=person100_sched
username=dbadmin
dbhost=10.50.50.16
dbport=5433
比在 Vertica DB 上我创建了一个资源池
CREATE RESOURCE POOL person100_pool MEMORYSIZE '10%' PLANNEDCONCURRENCY 1 QUEUETIMEOUT 0;
我的下一步是创建一个调度程序:
sudo ./vkconfig scheduler --create --operator dbadmin --frame-duration '00:00:02' --resource-pool person100_pool --conf scheduler.properties
我的下一步是创建集群
sudo ./vkconfig cluster --create --cluster person100_cluster --hosts kafka:9092 --conf scheduler.properties
我的下一步是创建源:
sudo ./vkconfig source --create --cluster person100_cluster --source person-avro --partitions 1 --conf scheduler.properties
我的下一步是创建一个目标 table:
sudo ./vkconfig target --create --target-schema public --target-table person100 --conf scheduler.properties
- 然后我在 Vertica 中创建了一个 table,其类型与 Person 模式相同
我的下一步是创建一个 AvroParser
sudo ./vkconfig load-spec --create --parser KafkaAvroParser --load-spec person_load --conf scheduler.properties
我的下一步是创建微批次
sudo ./vkconfig microbatch --create --microbatch personBatch --target-schema public --target-table person100 --load-spec person_load --add-source person-avro --add-source-cluster person100_cluster --conf scheduler.properties
我的下一步是启动微批处理
sudo ./vkconfig launch --conf scheduler.properties
看起来一切顺利,但是table是空的
创建 load-spec
时,您需要指定 Vertica 将用于解析 Avro 消息的架构注册表选项。
--parser-parameters "schema_registry_url='schema-registry:8081',schema_registry_subject='somesubject',schema_registry_version='1',flatten_arrays=TRUE,flatten_maps=TRUE,flatten_records=TRUE"
而不是 schema-registry:8081
和 somesubject
使用与您在浏览器中连接到 schema-registry 时相同的信息。
注意: 如果 table person100
不是 flex table,那么您需要包括 flatten_arrays=TRUE,flatten_maps=TRUE,flatten_records=TRUE
除了架构注册表信息。
创建 load-spec 的完整命令应如下所示。
sudo ./vkconfig load-spec --create --parser KafkaAvroParser --load-spec person_load --parser-parameters "schema_registry_url='schema-registry:8081',schema_registry_subject='somesubject',schema_registry_version='1',flatten_arrays=TRUE,flatten_maps=TRUE,flatten_records=TRUE" --conf scheduler.properties
如何设置 Vertica 调度程序以使用来自 Kafka 的 Avro 数据
您好,我正在尝试使用 Vertica Scheduler 从 Kafka 主题中使用 Avro 数据。 我写了一个小的 java 代码来生成一个简单的对象作为 Avro 数据到 Kafka 中。 我还将模式推送到 kafka-schema-registry 中,我可以从浏览器中看到它。 我正在尝试设置 Vertica 的调度程序,所有命令都已无误地传递。 我还启动了一个微批处理,但是 table 是空的,我的调度程序不使用来自 Kafka 主题的数据 首先我配置了一个名为 schduler.properties:
的 conf 文件config-schema=person100_sched
username=dbadmin
dbhost=10.50.50.16
dbport=5433
比在 Vertica DB 上我创建了一个资源池
CREATE RESOURCE POOL person100_pool MEMORYSIZE '10%' PLANNEDCONCURRENCY 1 QUEUETIMEOUT 0;
我的下一步是创建一个调度程序:
sudo ./vkconfig scheduler --create --operator dbadmin --frame-duration '00:00:02' --resource-pool person100_pool --conf scheduler.properties
我的下一步是创建集群
sudo ./vkconfig cluster --create --cluster person100_cluster --hosts kafka:9092 --conf scheduler.properties
我的下一步是创建源:
sudo ./vkconfig source --create --cluster person100_cluster --source person-avro --partitions 1 --conf scheduler.properties
我的下一步是创建一个目标 table:
sudo ./vkconfig target --create --target-schema public --target-table person100 --conf scheduler.properties
- 然后我在 Vertica 中创建了一个 table,其类型与 Person 模式相同
我的下一步是创建一个 AvroParser
sudo ./vkconfig load-spec --create --parser KafkaAvroParser --load-spec person_load --conf scheduler.properties
我的下一步是创建微批次
sudo ./vkconfig microbatch --create --microbatch personBatch --target-schema public --target-table person100 --load-spec person_load --add-source person-avro --add-source-cluster person100_cluster --conf scheduler.properties
我的下一步是启动微批处理
sudo ./vkconfig launch --conf scheduler.properties
看起来一切顺利,但是table是空的
创建 load-spec
时,您需要指定 Vertica 将用于解析 Avro 消息的架构注册表选项。
--parser-parameters "schema_registry_url='schema-registry:8081',schema_registry_subject='somesubject',schema_registry_version='1',flatten_arrays=TRUE,flatten_maps=TRUE,flatten_records=TRUE"
而不是 schema-registry:8081
和 somesubject
使用与您在浏览器中连接到 schema-registry 时相同的信息。
注意: 如果 table person100
不是 flex table,那么您需要包括 flatten_arrays=TRUE,flatten_maps=TRUE,flatten_records=TRUE
除了架构注册表信息。
创建 load-spec 的完整命令应如下所示。
sudo ./vkconfig load-spec --create --parser KafkaAvroParser --load-spec person_load --parser-parameters "schema_registry_url='schema-registry:8081',schema_registry_subject='somesubject',schema_registry_version='1',flatten_arrays=TRUE,flatten_maps=TRUE,flatten_records=TRUE" --conf scheduler.properties