Why is apache-hudi creating a COPY_ON_WRITE table even though I specified MERGE_ON_READ?
I am trying to create a simple Hudi table with the MERGE_ON_READ table type.
After executing the code below, I still see hoodie.table.type=COPY_ON_WRITE in the hoodie.properties file.
Am I missing something?
Jupyter notebook with this code: https://github.com/sannidhiteredesai/spark/blob/master/hudi_acct.ipynb
hudi_options = {
"hoodie.table.name": "hudi_acct",
"hoodie.table.type": "MERGE_ON_READ",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.recordkey.field": "acctid",
"hoodie.datasource.write.precombine.field": "ts",
"hoodie.datasource.write.partitionpath.field": "date",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.upsert.shuffle.parallelism": 8,
"hoodie.insert.shuffle.parallelism": 8,
}
input_df = spark.createDataFrame(
[
(100, "2015-01-01", "2015-01-01T13:51:39.340396Z", 10),
(101, "2015-01-01", "2015-01-01T12:14:58.597216Z", 10),
(102, "2015-01-01", "2015-01-01T13:51:40.417052Z", 10),
(103, "2015-01-01", "2015-01-01T13:51:40.519832Z", 10),
(104, "2015-01-02", "2015-01-01T12:15:00.512679Z", 10),
(104, "2015-01-02", "2015-01-01T12:15:00.512679Z", 10),
(104, "2015-01-02", "2015-01-02T12:15:00.512679Z", 20),
(105, "2015-01-02", "2015-01-01T13:51:42.248818Z", 10),
],
("acctid", "date", "ts", "deposit"),
)
# INSERT
(
input_df.write.format("org.apache.hudi")
.options(**hudi_options)
.mode("append")
.save(hudi_dataset)
)
update_df = spark.createDataFrame(
[(100, "2015-01-01", "2015-01-01T13:51:39.340396Z", 20)],
("acctid", "date", "ts", "deposit"))
# UPDATE
(
update_df.write.format("org.apache.hudi")
.options(**hudi_options)
.mode("append")
.save(hudi_dataset)
)
Edit: After executing the above code, I see 2 parquet files created in the date=2015-01-01 partition. When reading the second parquet file I expected to get only the 1 updated record, but I can also see all the other records of that partition.
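For context on that observation: with COPY_ON_WRITE, each upsert rewrites the affected file group into a new parquet file that contains every record of that file group, not just the changed one, which is why the second file holds the whole partition. Reading through the Hudi datasource instead of the raw parquet files returns only the latest version of each record key. A minimal sketch, assuming `hudi_dataset` is the same base path used above and a reasonably recent Hudi version that supports `hoodie.datasource.query.type`:

# Read via the Hudi datasource rather than the raw parquet files;
# a snapshot query resolves each record key to its latest version.
snapshot_df = (
    spark.read.format("org.apache.hudi")
    .option("hoodie.datasource.query.type", "snapshot")
    .load(hudi_dataset)
)
snapshot_df.filter("date = '2015-01-01'").show()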
When loading data into Hudi with an insert, please first try mode("overwrite") and see if that works.
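If it helps, this is roughly what that suggestion would look like for the very first write; a sketch only, reusing `hudi_options`, `input_df`, and `hudi_dataset` from the question:

# Initial load: overwrite so Hudi creates the table from scratch and picks up the
# table-type setting, instead of appending to an already-created COW table.
(
    input_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .option("hoodie.datasource.write.operation", "insert")
    .mode("overwrite")
    .save(hudi_dataset)
)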
The problem is the config "hoodie.table.type": "MERGE_ON_READ". You have to use hoodie.datasource.write.table.type instead. If you update the configuration as shown below, it will work. I have tested it.
hudi_options = {
"hoodie.table.name": "hudi_acct",
"hoodie.datasource.write.table.type": "MERGE_ON_WRITE",
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.recordkey.field": "acctid",
"hoodie.datasource.write.precombine.field": "ts",
"hoodie.datasource.write.partitionpath.field": "date",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.upsert.shuffle.parallelism": 8,
"hoodie.insert.shuffle.parallelism": 8,
"hoodie.compact.inline": "true",
"hoodie.compact.inline.max.delta.commits": 10
}
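To confirm the fix took effect, you can check the table type that Hudi recorded under the base path after the first write. A sketch, assuming `hudi_dataset` is a local filesystem path (for HDFS/S3 you would inspect the file with the corresponding filesystem tooling):

# After the first write, this file should contain hoodie.table.type=MERGE_ON_READ
with open(f"{hudi_dataset}/.hoodie/hoodie.properties") as f:
    print(f.read())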