对新列使用不同的 avro 模式
using different avro schema for new columns
我正在使用flume + kafka 将日志数据下沉到hdfs。我的接收器数据类型是 Avro。在 avro 模式 (.avsc) 中,有 80 个字段作为列。
所以我创建了一个外部 table 这样的
CREATE external TABLE pgar.tiz_biaws_fraud
PARTITIONED BY(partition_date INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/data/datapool/flume/biaws/fraud'
TBLPROPERTIES ('avro.schema.url'='hdfs://xxxx-ns/data/datapool/flume/biaws/fraud.avsc')
现在,我需要向 avro 模式添加 25 个以上的列。在那种情况下,
如果我使用具有 105 列的新模式创建一个新的 table,我将有两个 table 用于一个项目。如果我在未来几天添加或删除一些列,我必须为此创建一个新的 table。我害怕有很多 table 对同一个项目使用不同的模式。
如果我将当前 table 中的旧模式与新模式交换,我将只有一个 table 用于一个项目,但由于模式冲突,我无法再读取和获取旧数据.
在这种情况下使用 avro 模式的最佳方法是什么?
这确实很有挑战性。最好的方法是确保您所做的所有模式更改都与旧数据兼容 - 因此只删除具有默认值的列,并确保在您添加的列中提供默认值。这样您就可以安全地交换模式而不会发生冲突并继续读取旧数据。 Avro 在这方面非常聪明,它被称为 "schema evolution"(以防你想要 google 多一点)并允许 reader 和 writer 模式有点不同。
顺便说一句,我想提一下,Kafka 有一个原生的 HDFS 连接器(即没有 Flume),它使用 Confluent 的模式注册表来自动处理这些类型的模式更改——您可以使用注册表来检查如果模式兼容,并且如果它们兼容 - 只需使用新模式写入数据,Hive table 将自动进化以匹配。
我像那样向 avro 架构添加了新列
{"name":"newColumn1", "type": "string", "default": ""},
{"name":"newColumn2", "type": "string", "default": ""},
{"name":"newColumn3", "type": "string", "default": ""},
当我使用 default
属性 时,如果当前数据中不存在该列,则为 returns 默认值,如果当前数据中存在该列,则为 [=24] =] 数据值符合预期。
要将空值设置为默认值,您需要
{ "name": "newColumn4", "type": [ "string", "null" ], "default": "null" },
或
{ "name": "newColumn5", "type": [ "null", "string" ]},
null在类型属性中的位置,可以是第一位,也可以是第二位,默认为属性。
我正在使用flume + kafka 将日志数据下沉到hdfs。我的接收器数据类型是 Avro。在 avro 模式 (.avsc) 中,有 80 个字段作为列。
所以我创建了一个外部 table 这样的
CREATE external TABLE pgar.tiz_biaws_fraud
PARTITIONED BY(partition_date INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/data/datapool/flume/biaws/fraud'
TBLPROPERTIES ('avro.schema.url'='hdfs://xxxx-ns/data/datapool/flume/biaws/fraud.avsc')
现在,我需要向 avro 模式添加 25 个以上的列。在那种情况下,
如果我使用具有 105 列的新模式创建一个新的 table,我将有两个 table 用于一个项目。如果我在未来几天添加或删除一些列,我必须为此创建一个新的 table。我害怕有很多 table 对同一个项目使用不同的模式。
如果我将当前 table 中的旧模式与新模式交换,我将只有一个 table 用于一个项目,但由于模式冲突,我无法再读取和获取旧数据.
在这种情况下使用 avro 模式的最佳方法是什么?
这确实很有挑战性。最好的方法是确保您所做的所有模式更改都与旧数据兼容 - 因此只删除具有默认值的列,并确保在您添加的列中提供默认值。这样您就可以安全地交换模式而不会发生冲突并继续读取旧数据。 Avro 在这方面非常聪明,它被称为 "schema evolution"(以防你想要 google 多一点)并允许 reader 和 writer 模式有点不同。
顺便说一句,我想提一下,Kafka 有一个原生的 HDFS 连接器(即没有 Flume),它使用 Confluent 的模式注册表来自动处理这些类型的模式更改——您可以使用注册表来检查如果模式兼容,并且如果它们兼容 - 只需使用新模式写入数据,Hive table 将自动进化以匹配。
我像那样向 avro 架构添加了新列
{"name":"newColumn1", "type": "string", "default": ""},
{"name":"newColumn2", "type": "string", "default": ""},
{"name":"newColumn3", "type": "string", "default": ""},
当我使用 default
属性 时,如果当前数据中不存在该列,则为 returns 默认值,如果当前数据中存在该列,则为 [=24] =] 数据值符合预期。
要将空值设置为默认值,您需要
{ "name": "newColumn4", "type": [ "string", "null" ], "default": "null" },
或
{ "name": "newColumn5", "type": [ "null", "string" ]},
null在类型属性中的位置,可以是第一位,也可以是第二位,默认为属性。