apache flink avro FileSink is stuck at in-progress state for a long time
I have the following avro schema, User.avsc:
{
  "type": "record",
  "namespace": "com.myorg",
  "name": "User",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}
The User.java class below was generated from the above User.avsc using the avro-maven-plugin:
package com.myorg;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.data.RecordBuilder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.SchemaStore;
import org.apache.avro.specific.AvroGenerated;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.specific.SpecificRecordBuilderBase;
@AvroGenerated
public class User extends SpecificRecordBase implements SpecificRecord {
    private static final long serialVersionUID = 8699049231783654635L;
    public static final Schema SCHEMA$ = (new Parser()).parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.myorg\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
    private static SpecificData MODEL$ = new SpecificData();
    private static final BinaryMessageEncoder<User> ENCODER;
    private static final BinaryMessageDecoder<User> DECODER;
    /** @deprecated */
    @Deprecated
    public long id;
    /** @deprecated */
    @Deprecated
    public String name;
    private static final DatumWriter<User> WRITER$;
    private static final DatumReader<User> READER$;

    public static Schema getClassSchema() {
        return SCHEMA$;
    }

    public static BinaryMessageDecoder<User> getDecoder() {
        return DECODER;
    }

    public static BinaryMessageDecoder<User> createDecoder(SchemaStore resolver) {
        return new BinaryMessageDecoder(MODEL$, SCHEMA$, resolver);
    }

    public ByteBuffer toByteBuffer() throws IOException {
        return ENCODER.encode(this);
    }

    public static User fromByteBuffer(ByteBuffer b) throws IOException {
        return (User)DECODER.decode(b);
    }

    public User() {
    }

    public User(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Schema getSchema() {
        return SCHEMA$;
    }

    public Object get(int field$) {
        switch(field$) {
            case 0:
                return this.id;
            case 1:
                return this.name;
            default:
                throw new AvroRuntimeException("Bad index");
        }
    }

    public void put(int field$, Object value$) {
        switch(field$) {
            case 0:
                this.id = (Long)value$;
                break;
            case 1:
                this.name = (String)value$;
                break;
            default:
                throw new AvroRuntimeException("Bad index");
        }
    }

    public Long getId() {
        return this.id;
    }

    public void setId(Long value) {
        this.id = value;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String value) {
        this.name = value;
    }

    public void writeExternal(ObjectOutput out) throws IOException {
        WRITER$.write(this, SpecificData.getEncoder(out));
    }

    public void readExternal(ObjectInput in) throws IOException {
        READER$.read(this, SpecificData.getDecoder(in));
    }

    static {
        ENCODER = new BinaryMessageEncoder(MODEL$, SCHEMA$);
        DECODER = new BinaryMessageDecoder(MODEL$, SCHEMA$);
        WRITER$ = MODEL$.createDatumWriter(SCHEMA$);
        READER$ = MODEL$.createDatumReader(SCHEMA$);
    }
}
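For a quick sanity check of the generated class, a round trip through its binary encoder/decoder could look like the sketch below (illustrative only; the UserRoundTrip class name is mine, and it just exercises the toByteBuffer/fromByteBuffer methods shown above):

import java.nio.ByteBuffer;
import com.myorg.User;

public class UserRoundTrip {
    public static void main(String[] args) throws Exception {
        User u = new User(1L, "raj");
        // Encode with the generated BinaryMessageEncoder ...
        ByteBuffer buf = u.toByteBuffer();
        // ... and decode back with the generated BinaryMessageDecoder.
        User decoded = User.fromByteBuffer(buf);
        System.out.println(decoded.getId() + " " + decoded.getName()); // prints: 1 raj
    }
}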
I want to write User SpecificRecord instances to files using apache flink's FileSink. Below is the program I wrote:
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.myorg.User;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import java.util.Arrays;
public class AvroFileSinkApp {
    private static final String OUTPUT_PATH = "./il/";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.setParallelism(4);
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("il")
                .withPartSuffix(".avro")
                .build();
        DataStream<User> source = env.fromCollection(Arrays.asList(getUser(), getUser(), getUser(), getUser(), getUser(), getUser()));
        source.sinkTo(
                FileSink.forBulkFormat(new Path(OUTPUT_PATH), AvroWriters.forSpecificRecord(User.class))
                        .withBucketCheckInterval(5000)
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .withOutputFileConfig(config)
                        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy/MM/dd/HH"))
                        .build());
        env.execute("FileSinkProgram");
        Thread.sleep(300000);
    }

    public static User getUser() {
        User u = new User();
        u.setId(1L);
        u.setName("raj");
        return u;
    }
}
I wrote this program using this and this as reference. The project is on github here.
When I run the program, in-progress files are created, but checkpoints never complete and the temporary files are never committed. I added Thread.sleep(300000); hoping the in-progress files would be rolled over into finished avro files, but that never happens. I also kept the main thread waiting for an hour, but no luck.
Any idea what is preventing the in-progress files from moving to the finished state?
This problem occurs mainly because the source is a BOUNDED source: the whole Flink job finishes before any checkpoint has a chance to complete, so the in-progress files are never committed. You can generate the User records with an unbounded source like the example below instead of using fromCollection:
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/** Data-generating source function. */
public static final class Generator
        implements SourceFunction<Tuple2<Integer, Integer>>, CheckpointedFunction {

    private static final long serialVersionUID = -2819385275681175792L;

    private final int numKeys;
    private final int idlenessMs;
    private final int recordsToEmit;

    private volatile int numRecordsEmitted = 0;
    private volatile boolean canceled = false;

    private ListState<Integer> state = null;

    Generator(final int numKeys, final int idlenessMs, final int durationSeconds) {
        this.numKeys = numKeys;
        this.idlenessMs = idlenessMs;
        this.recordsToEmit = ((durationSeconds * 1000) / idlenessMs) * numKeys;
    }

    @Override
    public void run(final SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
        while (numRecordsEmitted < recordsToEmit) {
            synchronized (ctx.getCheckpointLock()) {
                for (int i = 0; i < numKeys; i++) {
                    ctx.collect(Tuple2.of(i, numRecordsEmitted));
                    numRecordsEmitted++;
                }
            }
            Thread.sleep(idlenessMs);
        }
        // Keep the source alive (idle) until canceled so checkpoints can still complete.
        while (!canceled) {
            Thread.sleep(50);
        }
    }

    @Override
    public void cancel() {
        canceled = true;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        state = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<Integer>("state", IntSerializer.INSTANCE));
        for (Integer i : state.get()) {
            numRecordsEmitted += i;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        state.clear();
        state.add(numRecordsEmitted);
    }
}
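To adapt this to the question, a minimal sketch of an unbounded source that emits User records might look like the following (the UserGenerator name and the emission rate are my own choices, not from the original project). Because the source never finishes on its own, checkpoints keep firing and the OnCheckpointRollingPolicy can roll the in-progress files into finished avro files:

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.myorg.User;

/** Hypothetical unbounded source emitting User records until canceled. */
public static final class UserGenerator implements SourceFunction<User> {
    private volatile boolean canceled = false;

    @Override
    public void run(SourceContext<User> ctx) throws Exception {
        long id = 0;
        while (!canceled) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(new User(id++, "raj"));
            }
            Thread.sleep(100); // throttle so checkpoints have time to complete
        }
    }

    @Override
    public void cancel() {
        canceled = true;
    }
}

// In AvroFileSinkApp, replace the bounded fromCollection source with:
// DataStream<User> source = env.addSource(new UserGenerator());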