我想使用 Flink 的 Streaming File Sink 写入 ORC 文件,但它无法正确写入文件
I want to write ORC file using Flink's Streaming File Sink but it doesn’t write files correctly
我正在从 Kafka 读取数据,并尝试将其以 ORC 格式写入 HDFS 文件系统。我参考了 Flink 官方网站上的以下链接。但是我发现 Flink 为所有数据写入了完全相同的内容,并且生成了非常多的文件,所有文件的大小都约为 103KB。
请在下面找到我的代码。
/**
 * Flink streaming job that consumes JSON strings from Kafka, converts each one
 * to a [[DataParse]] record, and writes the records as ORC files to HDFS
 * through a bulk-format `StreamingFileSink`.
 */
object BeaconBatchIngest extends StreamingBase {

  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  /**
   * Builds a lookup from each config's "sourceTopic" to its "destinationTopic".
   *
   * @param configs configuration entries that each carry both topic keys
   * @return map of source topic name to destination topic name
   */
  def getTopicConfig(configs: List[Config]): Map[String, String] = {
    val topicPairs =
      for (config: Config <- configs)
        yield (config.getString("sourceTopic"), config.getString("destinationTopic"))
    topicPairs.toMap
  }

  /**
   * Wires the pipeline onto `env`: Kafka source -> DataParse mapping -> ORC file sink.
   * Nothing runs until `env.execute` is called.
   */
  def setKafkaConfig(): Unit = {
    // Kafka consumer settings; the broker and ZooKeeper addresses are blank in this listing.
    val consumerProps = new Properties()
    consumerProps.setProperty("bootstrap.servers", "")
    consumerProps.setProperty("zookeeper.connect", "")
    consumerProps.setProperty("group.id", DEFAULT_KAFKA_GROUP_ID)
    consumerProps.setProperty("auto.offset.reset", "latest")

    val source: FlinkKafkaConsumer[String] =
      new FlinkKafkaConsumer[String]("sourceTopics", new SimpleStringSchema(), consumerProps)
    source.setStartFromLatest()

    // Each raw Kafka message becomes one parsed record.
    val stream: DataStream[DataParse] = env.addSource(source).map(new temp)

    // ORC layout: one string column, one bigint column, then three string columns.
    val schema: String = "struct<_col0:string,_col1:bigint,_col2:string,_col3:string,_col4:string>"
    val orcProps = new Properties()
    orcProps.setProperty("orc.compress", "ZLIB")
    val writerFactory = new OrcBulkWriterFactory(
      new PersonVectorizer(schema),
      orcProps,
      new org.apache.hadoop.conf.Configuration
    )

    val outputDir = new Path("hdfs://warehousestore/hive/warehouse/metrics_test.db/upp_raw_prod/hour=1/")
    val sink: StreamingFileSink[DataParse] =
      StreamingFileSink.forBulkFormat(outputDir, writerFactory).build()

    stream.addSink(sink)
  }

  /** Entry point: builds the pipeline, enables checkpointing, and submits the job. */
  def main(args: Array[String]): Unit = {
    setKafkaConfig()
    // Checkpoint every 5000 ms. A bulk-format StreamingFileSink rolls its part files
    // on every checkpoint, so this interval also determines how many files are produced.
    env.enableCheckpointing(5000)
    env.execute("Kafka_Flink_HIVE")
  }
}
/** Map function that wraps each raw JSON string in a [[DataParse]] record. */
class temp extends MapFunction[String, DataParse] {

  /**
   * @param record raw JSON text of one message
   * @return the record parsed into a DataParse instance
   */
  override def map(record: String): DataParse = new DataParse(record)
}
/**
 * One parsed event. The JSON text is parsed once at construction and the
 * fields "timestamp", "event", "source_id" and "app" are pulled out of it.
 *
 * @param data raw JSON text of the event
 */
class DataParse(data: String) {

  val parsedJason = parse(data)

  /** Renders the named top-level field compactly and removes every double-quote character. */
  private def unquoted(field: String): String =
    compact(render(parsedJason \ field)).replaceAll("\"", "")

  // The rendered value must be numeric text once quotes are removed; toLong throws otherwise.
  val timestamp = unquoted("timestamp").toLong
  val event = unquoted("event")
  val source_id = unquoted("source_id")
  val app = unquoted("app")

  // The original, unmodified JSON text.
  val json = data
}
/**
 * Copies [[DataParse]] records into an ORC `VectorizedRowBatch`.
 *
 * Column order follows the schema string passed by the caller:
 * 0 = event (string), 1 = timestamp (bigint), 2 = source_id (string),
 * 3 = app (string), 4 = raw json (string).
 *
 * @param schema ORC struct type description handed to the base `Vectorizer`
 */
class PersonVectorizer(schema: String) extends Vectorizer[DataParse](schema) {

  /**
   * Appends `element` as the next row of `batch`.
   *
   * The row is written at index `batch.size`, and `batch.size` is then advanced.
   * The previous code wrote to `batch.size + 1` and never incremented the size,
   * so the batch never recorded any rows and every element overwrote the same slot.
   *
   * @param element record to append
   * @param batch   row batch being filled by the ORC writer
   */
  override def vectorize(element: DataParse, batch: VectorizedRowBatch): Unit = {
    // Claim the next free row and tell the batch it now holds one more row.
    val row = batch.size
    batch.size += 1

    val eventColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
    val timeColVector = batch.cols(1).asInstanceOf[LongColumnVector]
    val sourceIdColVector = batch.cols(2).asInstanceOf[BytesColumnVector]
    val appColVector = batch.cols(3).asInstanceOf[BytesColumnVector]
    val jsonColVector = batch.cols(4).asInstanceOf[BytesColumnVector]

    timeColVector.vector(row) = element.timestamp
    eventColVector.setVal(row, element.event.getBytes(StandardCharsets.UTF_8))
    sourceIdColVector.setVal(row, element.source_id.getBytes(StandardCharsets.UTF_8))
    appColVector.setVal(row, element.app.getBytes(StandardCharsets.UTF_8))
    jsonColVector.setVal(row, element.json.getBytes(StandardCharsets.UTF_8))
  }
}
对于批量格式(例如 ORC),StreamingFileSink
会在每个检查点滚动到新文件。如果增大检查点间隔(目前为 5 秒),也就是降低检查点频率,它就不会写入那么多文件。
我正在从 Kafka 读取数据,并尝试将其以 ORC 格式写入 HDFS 文件系统。我参考了 Flink 官方网站上的以下链接。但是我发现 Flink 为所有数据写入了完全相同的内容,并且生成了非常多的文件,所有文件的大小都约为 103KB。
请在下面找到我的代码。
/**
 * Flink streaming job that consumes JSON strings from Kafka, converts each one
 * to a [[DataParse]] record, and writes the records as ORC files to HDFS
 * through a bulk-format `StreamingFileSink`.
 */
object BeaconBatchIngest extends StreamingBase {

  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  /**
   * Builds a lookup from each config's "sourceTopic" to its "destinationTopic".
   *
   * @param configs configuration entries that each carry both topic keys
   * @return map of source topic name to destination topic name
   */
  def getTopicConfig(configs: List[Config]): Map[String, String] = {
    val topicPairs =
      for (config: Config <- configs)
        yield (config.getString("sourceTopic"), config.getString("destinationTopic"))
    topicPairs.toMap
  }

  /**
   * Wires the pipeline onto `env`: Kafka source -> DataParse mapping -> ORC file sink.
   * Nothing runs until `env.execute` is called.
   */
  def setKafkaConfig(): Unit = {
    // Kafka consumer settings; the broker and ZooKeeper addresses are blank in this listing.
    val consumerProps = new Properties()
    consumerProps.setProperty("bootstrap.servers", "")
    consumerProps.setProperty("zookeeper.connect", "")
    consumerProps.setProperty("group.id", DEFAULT_KAFKA_GROUP_ID)
    consumerProps.setProperty("auto.offset.reset", "latest")

    val source: FlinkKafkaConsumer[String] =
      new FlinkKafkaConsumer[String]("sourceTopics", new SimpleStringSchema(), consumerProps)
    source.setStartFromLatest()

    // Each raw Kafka message becomes one parsed record.
    val stream: DataStream[DataParse] = env.addSource(source).map(new temp)

    // ORC layout: one string column, one bigint column, then three string columns.
    val schema: String = "struct<_col0:string,_col1:bigint,_col2:string,_col3:string,_col4:string>"
    val orcProps = new Properties()
    orcProps.setProperty("orc.compress", "ZLIB")
    val writerFactory = new OrcBulkWriterFactory(
      new PersonVectorizer(schema),
      orcProps,
      new org.apache.hadoop.conf.Configuration
    )

    val outputDir = new Path("hdfs://warehousestore/hive/warehouse/metrics_test.db/upp_raw_prod/hour=1/")
    val sink: StreamingFileSink[DataParse] =
      StreamingFileSink.forBulkFormat(outputDir, writerFactory).build()

    stream.addSink(sink)
  }

  /** Entry point: builds the pipeline, enables checkpointing, and submits the job. */
  def main(args: Array[String]): Unit = {
    setKafkaConfig()
    // Checkpoint every 5000 ms. A bulk-format StreamingFileSink rolls its part files
    // on every checkpoint, so this interval also determines how many files are produced.
    env.enableCheckpointing(5000)
    env.execute("Kafka_Flink_HIVE")
  }
}
/** Map function that wraps each raw JSON string in a [[DataParse]] record. */
class temp extends MapFunction[String, DataParse] {

  /**
   * @param record raw JSON text of one message
   * @return the record parsed into a DataParse instance
   */
  override def map(record: String): DataParse = new DataParse(record)
}
/**
 * One parsed event. The JSON text is parsed once at construction and the
 * fields "timestamp", "event", "source_id" and "app" are pulled out of it.
 *
 * @param data raw JSON text of the event
 */
class DataParse(data: String) {

  val parsedJason = parse(data)

  /** Renders the named top-level field compactly and removes every double-quote character. */
  private def unquoted(field: String): String =
    compact(render(parsedJason \ field)).replaceAll("\"", "")

  // The rendered value must be numeric text once quotes are removed; toLong throws otherwise.
  val timestamp = unquoted("timestamp").toLong
  val event = unquoted("event")
  val source_id = unquoted("source_id")
  val app = unquoted("app")

  // The original, unmodified JSON text.
  val json = data
}
/**
 * Copies [[DataParse]] records into an ORC `VectorizedRowBatch`.
 *
 * Column order follows the schema string passed by the caller:
 * 0 = event (string), 1 = timestamp (bigint), 2 = source_id (string),
 * 3 = app (string), 4 = raw json (string).
 *
 * @param schema ORC struct type description handed to the base `Vectorizer`
 */
class PersonVectorizer(schema: String) extends Vectorizer[DataParse](schema) {

  /**
   * Appends `element` as the next row of `batch`.
   *
   * The row is written at index `batch.size`, and `batch.size` is then advanced.
   * The previous code wrote to `batch.size + 1` and never incremented the size,
   * so the batch never recorded any rows and every element overwrote the same slot.
   *
   * @param element record to append
   * @param batch   row batch being filled by the ORC writer
   */
  override def vectorize(element: DataParse, batch: VectorizedRowBatch): Unit = {
    // Claim the next free row and tell the batch it now holds one more row.
    val row = batch.size
    batch.size += 1

    val eventColVector = batch.cols(0).asInstanceOf[BytesColumnVector]
    val timeColVector = batch.cols(1).asInstanceOf[LongColumnVector]
    val sourceIdColVector = batch.cols(2).asInstanceOf[BytesColumnVector]
    val appColVector = batch.cols(3).asInstanceOf[BytesColumnVector]
    val jsonColVector = batch.cols(4).asInstanceOf[BytesColumnVector]

    timeColVector.vector(row) = element.timestamp
    eventColVector.setVal(row, element.event.getBytes(StandardCharsets.UTF_8))
    sourceIdColVector.setVal(row, element.source_id.getBytes(StandardCharsets.UTF_8))
    appColVector.setVal(row, element.app.getBytes(StandardCharsets.UTF_8))
    jsonColVector.setVal(row, element.json.getBytes(StandardCharsets.UTF_8))
  }
}
对于批量格式(例如 ORC),StreamingFileSink
会在每个检查点滚动到新文件。如果增大检查点间隔(目前为 5 秒),也就是降低检查点频率,它就不会写入那么多文件。