Iceberg 的 FlinkSink 不会更新流式写入中的元数据文件
Iceberg's FlinkSink doesn't update metadata file in streaming writes
我一直在尝试使用Iceberg的FlinkSink来消费数据并写入sink。
我成功地从 kinesis 中获取了数据,并且我看到数据正在被写入适当的分区。但是,我没有看到 metadata.json
正在更新。没有它我无法查询 table.
感谢任何帮助或指点。
代码如下
package test
import java.util.{Calendar, Properties}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
import org.apache.flink.table.data.{GenericRowData, RowData, StringData}
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.flink.{CatalogLoader, TableLoader}
import org.apache.iceberg.flink.TableLoader.HadoopTableLoader
import org.apache.iceberg.flink.sink.FlinkSink
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.types.Types
import org.apache.iceberg.{PartitionSpec, Schema}
import scala.collection.JavaConverters._
object SampleApp {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val warehouse = "file://<local folder path>"
val catalog = new HadoopCatalog(new Configuration(), warehouse)
val ti = TableIdentifier.of("test_table")
if (!catalog.tableExists(ti)) {
println("table doesnt exist. creating it.")
val schema = new Schema(
Types.NestedField.optional(1, "message", Types.StringType.get()),
Types.NestedField.optional(2, "year", Types.StringType.get()),
Types.NestedField.optional(3, "month", Types.StringType.get()),
Types.NestedField.optional(4, "date", Types.StringType.get()),
Types.NestedField.optional(5, "hour", Types.StringType.get())
)
val props = Map(
"write.metadata.delete-after-commit.enabled" -> "true",
"write.metadata.previous-versions-max" -> "5",
"write.target-file-size-bytes" -> "1048576"
)
val partitionSpec = PartitionSpec.builderFor(schema)
.identity("year")
.identity("month")
.identity("date")
.identity("hour")
.build();
catalog.createTable(ti, schema, partitionSpec, props.asJava)
} else {
println("table exists. not creating it.")
}
val inputProperties = new Properties()
inputProperties.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")
inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
val stream: DataStream[RowData] = env
.addSource(new FlinkKinesisConsumer[String]("test_kinesis_stream", new SimpleStringSchema(), inputProperties))
.map(x => {
val now = Calendar.getInstance()
GenericRowData.of(
StringData.fromString(x),
StringData.fromString(now.get(Calendar.YEAR).toString),
StringData.fromString("%02d".format(now.get(Calendar.MONTH))),
StringData.fromString("%02d".format(now.get(Calendar.DAY_OF_MONTH))),
StringData.fromString("%02d".format(now.get(Calendar.HOUR_OF_DAY)))
)
})
FlinkSink
.forRowData(stream.javaStream)
.tableLoader(TableLoader.fromHadoopTable(s"$warehouse/${ti.name()}", new Configuration()))
.build()
env.execute("test app")
}
}
提前致谢。
你应该设置检查点:
env.enableCheckpointing(1000)
我一直在尝试使用Iceberg的FlinkSink来消费数据并写入sink。
我成功地从 kinesis 中获取了数据,并且我看到数据正在被写入适当的分区。但是,我没有看到 metadata.json
正在更新。没有它我无法查询 table.
感谢任何帮助或指点。
代码如下
package test
import java.util.{Calendar, Properties}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
import org.apache.flink.table.data.{GenericRowData, RowData, StringData}
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.flink.{CatalogLoader, TableLoader}
import org.apache.iceberg.flink.TableLoader.HadoopTableLoader
import org.apache.iceberg.flink.sink.FlinkSink
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.types.Types
import org.apache.iceberg.{PartitionSpec, Schema}
import scala.collection.JavaConverters._
object SampleApp {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val warehouse = "file://<local folder path>"
val catalog = new HadoopCatalog(new Configuration(), warehouse)
val ti = TableIdentifier.of("test_table")
if (!catalog.tableExists(ti)) {
println("table doesnt exist. creating it.")
val schema = new Schema(
Types.NestedField.optional(1, "message", Types.StringType.get()),
Types.NestedField.optional(2, "year", Types.StringType.get()),
Types.NestedField.optional(3, "month", Types.StringType.get()),
Types.NestedField.optional(4, "date", Types.StringType.get()),
Types.NestedField.optional(5, "hour", Types.StringType.get())
)
val props = Map(
"write.metadata.delete-after-commit.enabled" -> "true",
"write.metadata.previous-versions-max" -> "5",
"write.target-file-size-bytes" -> "1048576"
)
val partitionSpec = PartitionSpec.builderFor(schema)
.identity("year")
.identity("month")
.identity("date")
.identity("hour")
.build();
catalog.createTable(ti, schema, partitionSpec, props.asJava)
} else {
println("table exists. not creating it.")
}
val inputProperties = new Properties()
inputProperties.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")
inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
val stream: DataStream[RowData] = env
.addSource(new FlinkKinesisConsumer[String]("test_kinesis_stream", new SimpleStringSchema(), inputProperties))
.map(x => {
val now = Calendar.getInstance()
GenericRowData.of(
StringData.fromString(x),
StringData.fromString(now.get(Calendar.YEAR).toString),
StringData.fromString("%02d".format(now.get(Calendar.MONTH))),
StringData.fromString("%02d".format(now.get(Calendar.DAY_OF_MONTH))),
StringData.fromString("%02d".format(now.get(Calendar.HOUR_OF_DAY)))
)
})
FlinkSink
.forRowData(stream.javaStream)
.tableLoader(TableLoader.fromHadoopTable(s"$warehouse/${ti.name()}", new Configuration()))
.build()
env.execute("test app")
}
}
提前致谢。
你应该设置检查点:
env.enableCheckpointing(1000)