Error creating hive table from avro schema
I am trying to create a Hive table by extracting the schema from Avro data stored in S3. The data was written to S3 by the Kafka S3 sink connector; I am publishing a simple POJO to the producer.
Code for extracting the schema from the Avro data:
import os

for filename in os.listdir(temp_folder_path):
    path = temp_folder_path + filename
    if path.endswith('.avro'):
        # write the extracted schema next to the data file, as .avsc
        os.system(
            'java -jar /path/to/avro-jar/avro-tools-1.8.2.jar getschema {0} > {1}'.format(
                path, path[:-len('avro')] + 'avsc'))
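If shelling out to avro-tools is inconvenient (no JVM on the box, say), the writer schema can also be read straight out of the Avro object container file header with nothing but the Python standard library. This is a minimal sketch based on the container-file spec (magic bytes, then a string-to-bytes metadata map holding `avro.schema`); the negative-block-count variant of the map encoding is handled only crudely, and the header is assumed to fit in the first 64 KiB:

```python
import json

def _read_long(buf, pos):
    """Decode one zigzag-varint-encoded Avro 'long' from buf at pos."""
    shift, acc = 0, 0
    while True:
        b = buf[pos]
        pos += 1
        acc |= (b & 0x7F) << shift
        if not (b & 0x80):
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1), pos

def extract_schema(path):
    """Return the writer schema (as a dict) from an Avro container file."""
    with open(path, 'rb') as f:
        buf = f.read(65536)  # assumption: header fits in 64 KiB
    if buf[:4] != b'Obj\x01':
        raise ValueError('not an Avro container file: %s' % path)
    pos, meta = 4, {}
    while True:
        count, pos = _read_long(buf, pos)
        if count == 0:          # end of the metadata map
            break
        if count < 0:           # block with a byte-size prefix; skip the size
            count = -count
            _, pos = _read_long(buf, pos)
        for _ in range(count):
            klen, pos = _read_long(buf, pos)
            key = buf[pos:pos + klen].decode('utf-8')
            pos += klen
            vlen, pos = _read_long(buf, pos)
            meta[key] = buf[pos:pos + vlen]
            pos += vlen
    return json.loads(meta['avro.schema'])
```

The schema dict can then be dumped back out with `json.dump` to produce the `.avsc` file to upload.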
The extracted schema is then uploaded to an S3 bucket.
Create table query:
CREATE EXTERNAL TABLE IF NOT EXISTS `db_name_service.table_name_change_log`
PARTITIONED BY (`year` bigint, `month` bigint, `day` bigint, `hour` bigint)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 's3://bucket/topics/topic_name'
TBLPROPERTIES ('avro.schema.url'='s3://bucket/schemas/topic_name.avsc')
The error:
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.RuntimeException: MetaException(message:org.apache.hadoop.hive.serde2.avro.AvroSerdeException Schema for table must be of type RECORD. Received type: BYTES)
The schema:
{
  "type": "record",
  "name": "Employee",
  "doc": "Represents an Employee at a company",
  "fields": [
    {"name": "firstName", "type": "string", "doc": "The persons given name"},
    {"name": "nickName", "type": ["null", "string"], "default": null},
    {"name": "lastName", "type": "string"},
    {"name": "age", "type": "int", "default": -1},
    {"name": "phoneNumber", "type": "string"}
  ]
}
I can see the data in the topic with this command:
./confluent-4.1.1/bin/kafka-avro-console-consumer --topic test2_singular --bootstrap-server localhost:9092 --from-beginning
{"firstName":"A:0","nickName":{"string":"C"},"lastName":"C","age":0,"phoneNumber":"123"}
{"firstName":"A:1","nickName":{"string":"C"},"lastName":"C","age":1,"phoneNumber":"123"}
Schema for table must be of type RECORD. Received type: BYTES
The only way that error can happen is if you are not using the AvroConverter in your Connect sink configuration.
You would also need to extract the schema from the S3 files.
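For reference, an S3 sink configuration that writes real Avro container files (rather than raw bytes) would look roughly like the sketch below. The connector name, bucket, region, and Schema Registry URL are placeholders, and the exact property set depends on your connector version:

```properties
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
topics=test2_singular
s3.bucket.name=bucket
s3.region=us-east-1
storage.class=io.confluent.connect.s3.storage.S3Storage
# write Avro container files, not raw/JSON output
format.class=io.confluent.connect.s3.format.avro.AvroFormat
# the value converter must be the AvroConverter, pointed at the Schema Registry
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
flush.size=1000
```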
Tip: watching the bucket for new Avro files with a Lambda function can help you grab the schema without scanning the whole bucket (or a random file), and can also be used to notify Hive/AWS Glue of table schema updates.
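That Lambda could be sketched roughly as follows. All names here are hypothetical, and the key layout assumes the S3 sink's default topics/<topic>/... directory structure; the actual schema extraction and upload are left as comments:

```python
def schema_key_for(object_key):
    """Map a data-file key such as 'topics/test2_singular/year=2018/.../part-0.avro'
    to the schema key 'schemas/test2_singular.avsc' (assumed bucket layout)."""
    topic = object_key.split('/')[1]
    return 'schemas/{0}.avsc'.format(topic)

def handler(event, context):
    """S3 ObjectCreated trigger: publish the schema of each new Avro file."""
    import boto3  # available in the AWS Lambda runtime
    s3 = boto3.client('s3')
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        if not key.endswith('.avro'):
            continue
        # Download the object, pull the writer schema out of its header
        # (e.g. with avro-tools or an in-process parser), then upload it:
        # s3.put_object(Bucket=bucket, Key=schema_key_for(key), Body=schema_json)
```

Wiring the same function (or an S3 event on the schemas/ prefix) to run `MSCK REPAIR TABLE` or a Glue crawler would keep the Hive side in sync as new partitions land.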