apache flink avro FileSink 长时间处于进行中状态

Apache Flink Avro FileSink is stuck in the in-progress state for a long time

我有以下 avro 架构 User.avsc

{
  "type": "record",
  "namespace": "com.myorg",
  "name": "User",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}

下面的 Java 类 User.java 是使用 avro-maven-plugin 从上面的 User.avsc 生成的:

package com.myorg;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.data.RecordBuilder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.SchemaStore;
import org.apache.avro.specific.AvroGenerated;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.specific.SpecificRecordBuilderBase;

@AvroGenerated
public class User extends SpecificRecordBase implements SpecificRecord {
    private static final long serialVersionUID = 8699049231783654635L;
    public static final Schema SCHEMA$ = (new Parser()).parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.myorg\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
    private static SpecificData MODEL$ = new SpecificData();
    private static final BinaryMessageEncoder<User> ENCODER;
    private static final BinaryMessageDecoder<User> DECODER;
    /** @deprecated */
    @Deprecated
    public long id;
    /** @deprecated */
    @Deprecated
    public String name;
    private static final DatumWriter<User> WRITER$;
    private static final DatumReader<User> READER$;

    public static Schema getClassSchema() {
        return SCHEMA$;
    }

    public static BinaryMessageDecoder<User> getDecoder() {
        return DECODER;
    }

    public static BinaryMessageDecoder<User> createDecoder(SchemaStore resolver) {
        return new BinaryMessageDecoder(MODEL$, SCHEMA$, resolver);
    }

    public ByteBuffer toByteBuffer() throws IOException {
        return ENCODER.encode(this);
    }

    public static User fromByteBuffer(ByteBuffer b) throws IOException {
        return (User)DECODER.decode(b);
    }

    public User() {
    }

    public User(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Schema getSchema() {
        return SCHEMA$;
    }

    public Object get(int field$) {
        switch(field$) {
        case 0:
            return this.id;
        case 1:
            return this.name;
        default:
            throw new AvroRuntimeException("Bad index");
        }
    }

    public void put(int field$, Object value$) {
        switch(field$) {
        case 0:
            this.id = (Long)value$;
            break;
        case 1:
            this.name = (String)value$;
            break;
        default:
            throw new AvroRuntimeException("Bad index");
        }

    }

    public Long getId() {
        return this.id;
    }

    public void setId(Long value) {
        this.id = value;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String value) {
        this.name = value;
    }

    public void writeExternal(ObjectOutput out) throws IOException {
        WRITER$.write(this, SpecificData.getEncoder(out));
    }

    public void readExternal(ObjectInput in) throws IOException {
        READER$.read(this, SpecificData.getDecoder(in));
    }

    static {
        ENCODER = new BinaryMessageEncoder(MODEL$, SCHEMA$);
        DECODER = new BinaryMessageDecoder(MODEL$, SCHEMA$);
        WRITER$ = MODEL$.createDatumWriter(SCHEMA$);
        READER$ = MODEL$.createDatumReader(SCHEMA$);
    }

}

我想使用 Apache Flink 的 FileSink 将 User(SpecificRecord)实例写入文件。

下面是我写的程序-

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.myorg.User;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import java.util.Arrays;

/**
 * Flink job that feeds six identical {@link User} records from an in-memory collection into
 * an Avro bulk-format {@link FileSink} under {@code ./il/}.
 */
public class AvroFileSinkApp {

    private static final String OUTPUT_PATH = "./il/";

    /**
     * Builds the pipeline, runs it, and then sleeps for five minutes.
     *
     * @param args ignored
     * @throws Exception if job execution fails or the sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Environment: exactly-once checkpoints every 5 s, four parallel subtasks.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.setParallelism(4);

        // Part files are named il...<suffix ".avro">.
        final OutputFileConfig partFileNaming =
                OutputFileConfig.builder().withPartPrefix("il").withPartSuffix(".avro").build();

        // Six separately constructed sample records, served from a collection source.
        final User[] sampleUsers = new User[6];
        for (int i = 0; i < sampleUsers.length; i++) {
            sampleUsers[i] = getUser();
        }
        final DataStream<User> source = env.fromCollection(Arrays.asList(sampleUsers));

        // Avro bulk writer, rolled on every checkpoint, bucketed by hour.
        final FileSink<User> avroSink =
                FileSink.forBulkFormat(new Path(OUTPUT_PATH), AvroWriters.forSpecificRecord(User.class))
                        .withBucketCheckInterval(5000)
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .withOutputFileConfig(partFileNaming)
                        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy/MM/dd/HH"))
                        .build();
        source.sinkTo(avroSink);

        env.execute("FileSinkProgram");
        // Keep the JVM alive for another five minutes once execute() has returned.
        Thread.sleep(300000);
    }

    /** Creates a sample record with id {@code 1} and name {@code "raj"}. */
    public static User getUser() {
        final User sample = new User();
        sample.setId(1L);
        sample.setName("raj");
        return sample;
    }
}

我参考了这个示例和这个示例写了这个程序。该项目在 GitHub 上(见此处)。

当我运行程序时,进行中(in-progress)的文件会被创建,但没有执行检查点,临时文件也没有被提交。我在程序末尾添加了 Thread.sleep(300000);,但仍然看不到进行中的文件变成最终的 avro 文件。

我也试过让主线程等待一个小时,但仍然没有效果。

知道什么阻止正在进行的文件移动到完成状态吗?

这个问题主要是因为Source是一个BOUNDED Source。 Checkpoint还没执行完,整个Flink Job的执行就结束了。

可以参考下面的例子,用一个持续运行的自定义 SourceFunction 来生成 User 记录,而不是使用 fromCollection:

    /**
     * Source function that emits {@code (key, counter)} tuples at a fixed pace and then idles
     * until it is cancelled. The running counter is stored in operator list state so that it
     * can be restored when the function is re-initialised.
     */
    public static final class Generator
            implements SourceFunction<Tuple2<Integer, Integer>>, CheckpointedFunction {

        private static final long serialVersionUID = -2819385275681175792L;

        private final int numKeys;
        private final int idlenessMs;
        private final int recordsToEmit;

        private volatile int numRecordsEmitted = 0;
        private volatile boolean canceled = false;

        private ListState<Integer> state = null;

        /**
         * @param numKeys number of keys emitted per round
         * @param idlenessMs pause between two rounds, in milliseconds
         * @param durationSeconds used together with {@code idlenessMs} to derive the total
         *     number of records to emit
         */
        Generator(final int numKeys, final int idlenessMs, final int durationSeconds) {
            this.numKeys = numKeys;
            this.idlenessMs = idlenessMs;

            final int rounds = (durationSeconds * 1000) / idlenessMs;
            this.recordsToEmit = rounds * numKeys;
        }

        /** Emits one tuple per key each round until the quota is reached, then waits for cancel. */
        @Override
        public void run(final SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
            final Object checkpointLock = ctx.getCheckpointLock();

            while (numRecordsEmitted < recordsToEmit) {
                // Emission and counter update happen together under the checkpoint lock.
                synchronized (checkpointLock) {
                    for (int key = 0; key < numKeys; key++) {
                        ctx.collect(Tuple2.of(key, numRecordsEmitted));
                        numRecordsEmitted++;
                    }
                }
                Thread.sleep(idlenessMs);
            }

            // Quota reached: stay alive (polling every 50 ms) until cancel() is called.
            while (!canceled) {
                Thread.sleep(50);
            }
        }

        @Override
        public void cancel() {
            canceled = true;
        }

        /** Obtains the list state named {@code "state"} and adds its entries to the counter. */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            final ListStateDescriptor<Integer> descriptor =
                    new ListStateDescriptor<Integer>("state", IntSerializer.INSTANCE);
            state = context.getOperatorStateStore().getListState(descriptor);

            for (Integer restoredCount : state.get()) {
                numRecordsEmitted += restoredCount;
            }
        }

        /** Replaces the stored state with the current counter value. */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            state.clear();
            state.add(numRecordsEmitted);
        }
    }
}