Flink streaming app produces many in-progress empty output files
I have the following streaming application that reads Protobuf messages from a Kafka topic and writes them to a filesystem Parquet sink:
import java.util.concurrent.TimeUnit

import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

// User is the protobuf-generated class for the messages on the topic.
class ProtoDeserializer extends DeserializationSchema[User] {

  override def getProducedType: TypeInformation[User] = TypeInformation.of(classOf[User])

  // Drop the 6-byte header prepended to every record before parsing.
  override def deserialize(message: Array[Byte]): User =
    User.parseFrom(message.slice(6, message.length))

  override def isEndOfStream(nextElement: User): Boolean = false
}

object StreamingKafkaProtoToParquetLocalFs {

  private val brokers = "localhost:9092"
  private val topic = "test-topic-proto"
  private val consumerGroupId = "test-consumer-proto"
  private val targetPath = "file:///my/path"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1))
    env.getCheckpointConfig.setCheckpointStorage(s"$targetPath/checkpoints")
    // ProtobufSerializer is assumed to be chill-protobuf's
    // com.twitter.chill.protobuf.ProtobufSerializer (the import above).
    env.getConfig.registerTypeWithKryoSerializer(classOf[User], classOf[ProtobufSerializer])

    val source = KafkaSource.builder[User]
      .setBootstrapServers(brokers)
      .setTopics(topic)
      .setGroupId(consumerGroupId)
      .setValueOnlyDeserializer(new ProtoDeserializer)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .build

    val input: DataStream[User] = env.fromSource(source, WatermarkStrategy.noWatermarks[User], "KafKaTable")

    val sink: StreamingFileSink[User] = StreamingFileSink
      .forBulkFormat(new Path(s"$targetPath/data"), ParquetProtoWriters.forType(classOf[User]))
      .build()

    input.addSink(sink)
    env.execute()
  }
}
When I execute the program, I see that all the output files written to the target path are empty (0 size) and in-progress, even though I enabled checkpointing.
It is important to mention that the topic is not empty, and when I change the sink to print() the messages are printed correctly.
What am I missing? Why do the print and Parquet sinks behave differently?
It looks like you are using a recent version of Flink, so try the following change:
val sink: FileSink[User] = FileSink
  .forBulkFormat(new Path(s"$targetPath/data"), ParquetProtoWriters.forType(classOf[User]))
  .build()

input.sinkTo(sink)
StreamingFileSink is deprecated and is being replaced by FileSink.
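For reference, a minimal sketch of the import this change needs, assuming the Flink 1.14+ package layout (FileSink ships in the flink-connector-files module):

// FileSink lives in a different package than the deprecated StreamingFileSink:
import org.apache.flink.connector.file.sink.FileSink

Also note that for bulk formats such as Parquet, part files can only roll on checkpoints, so finished files appear only when a checkpoint completes; the one-minute checkpoint interval in your job is what moves them out of the in-progress state.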
It seems that explicitly adding the Apache Parquet Protobuf dependency solves the problem.
For Maven users, add the following dependency to pom.xml:
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-protobuf</artifactId>
    <version>1.11.1</version>
</dependency>
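For sbt users, a sketch of the equivalent declaration (same coordinates as the Maven snippet above) would be:

// org.apache.parquet:parquet-protobuf:1.11.1, as in the pom.xml snippet
libraryDependencies += "org.apache.parquet" % "parquet-protobuf" % "1.11.1"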