Upgrading from Flink 1.3.2 to 1.4.0 hadoop FileSystem and Path issues
I recently tried upgrading from Flink 1.3.2 to 1.4.0 and I am running into problems because I can no longer import org.apache.hadoop.fs.{FileSystem, Path}. The problem occurs in two places:
ParquetWriter:
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

class AvroWriter[T <: GenericRecord]() extends Writer[T] {

  @transient private var writer: ParquetWriter[T] = _
  @transient private var schema: Schema = _

  override def write(element: T): Unit = {
    schema = element.getSchema
    writer.write(element)
  }

  override def duplicate(): AvroWriter[T] = new AvroWriter[T]()

  override def close(): Unit = writer.close()

  override def getPos: Long = writer.getDataSize

  override def flush(): Long = writer.getDataSize

  override def open(fs: FileSystem, path: Path): Unit = {
    writer = AvroParquetWriter.builder[T](path)
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
  }
}
Custom Bucketer:
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer
import org.apache.flink.streaming.connectors.fs.Clock
import org.apache.hadoop.fs.{FileSystem, Path}
import java.io.ObjectInputStream
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.avro.generic.GenericRecord
import scala.reflect.ClassTag

class RecordFieldBucketer[T <: GenericRecord: ClassTag](dateField: String = null, dateFieldFormat: String = null, bucketOrder: Seq[String]) extends Bucketer[T] {

  @transient var dateFormatter: SimpleDateFormat = _

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    if (dateField != null && dateFieldFormat != null) {
      dateFormatter = new SimpleDateFormat(dateFieldFormat)
    }
  }

  override def getBucketPath(clock: Clock, basePath: Path, element: T): Path = {
    val partitions = bucketOrder.map(field => {
      if (field == dateField) {
        field + "=" + dateFormatter.format(new Date(element.get(field).asInstanceOf[Long]))
      } else {
        field + "=" + element.get(field)
      }
    }).mkString("/")
    new Path(basePath + "/" + partitions)
  }
}
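(For context, this is roughly how the two classes above are wired into a BucketingSink; the S3 path and record field names in this sketch are placeholders, not my actual job code.)

// Sketch only: placeholder S3 bucket and record field names.
import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink

def attachSink(stream: DataStream[GenericRecord]): Unit = {
  val sink = new BucketingSink[GenericRecord]("s3://some-bucket/parquet")
  sink.setBucketer(new RecordFieldBucketer[GenericRecord](
    dateField = "eventTime",
    dateFieldFormat = "yyyy-MM-dd",
    bucketOrder = Seq("eventTime", "eventType")))
  sink.setWriter(new AvroWriter[GenericRecord]())
  stream.addSink(sink)
}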
I noticed that Flink now has:
import org.apache.flink.core.fs.{FileSystem, Path}
but the new Path doesn't seem to work with the AvroParquetWriter or the getBucketPath method. I know Flink's FileSystem and Hadoop dependencies have changed; I'm just not sure what I need to import to get my code compiling again.
Do I even need the Hadoop dependencies any more, or is there now a different way to write Parquet files to S3 and store them there?
build.sbt:
val flinkVersion = "1.4.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
  "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
  "org.apache.flink" % "flink-metrics-core" % flinkVersion,
  "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
  "org.apache.kafka" %% "kafka" % "0.10.0.1",
  "org.apache.avro" % "avro" % "1.7.7",
  "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
  "org.apache.parquet" % "parquet-avro" % "1.8.1",
  "io.confluent" % "kafka-avro-serializer" % "3.2.2",
  "com.fasterxml.jackson.core" % "jackson-core" % "2.9.2"
)
构建 "Hadoop-Free-Flink" 是 1.4 版本的一项主要功能。
您所要做的就是将 hadoop 依赖项包含到您的类路径中或引用 changelogs:
... This also means that in cases where you used connectors to HDFS, such as the BucketingSink or RollingSink, you now have to ensure that you either use a Flink distribution with bundled Hadoop dependencies or make sure to include Hadoop dependencies when building a jar file for your application.
The required org.apache.hadoop.fs.{FileSystem, Path} classes can be found in the hadoop-common project.
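For example, a minimal build.sbt addition along these lines should make the imports resolve again (the Hadoop version below is an assumption; pick whatever matches your cluster or Flink distribution):

// Sketch only: hadoop-common provides org.apache.hadoop.fs.{FileSystem, Path}.
// 2.7.5 is an assumed version; match it to your environment.
// Keep Provided if you run on a Flink distribution with bundled Hadoop,
// drop it if the classes need to end up in your application jar.
val hadoopVersion = "2.7.5"
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % hadoopVersion % Provided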