Flink Streaming RichSource 提前停止

Flink Streaming RichSource early stops

它 运行s 处理时间和使用广播状态。


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);


    BroadcastStream<List<TableOperations>> broadcastOperationsState = env
                .addSource(new LoadCassandraOperations(10000L, cassandraHost, cassandraPort)).broadcast(descriptor);

        SingleOutputStreamOperator<InternalVariableValue> stream = 
                env.addSource(new SourceMillisInternalVariableValue(5000L));

        
        SingleOutputStreamOperator<InternalVariableOperation> streamProcessed = 
                stream
                .keyBy(InternalVariableValue::getUuid)
                .connect(broadcastOperationsState)
                .process(new AddOperationInfo())
                ;

        

streamProcessed.print();


SourceMillisIntervalVariableValues 每 5 秒创建一个事件。事件存储在静态集合中。 运行 方法看起来像:


    public class SourceMillisInternalVariableValue extends RichSourceFunction<InternalVariableValue>{
    
    private boolean running;
    
    long millis;

    public SourceMillisInternalVariableValue(long millis) {
        super();
        this.millis = millis;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        running = true;
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void run(SourceContext<InternalVariableValue> ctx) throws Exception {        
        //Espera inicial
        Thread.sleep(1500);
        PojoVariableValues[] pojoData =
                 new PojoVariableValues[]{
                  new PojoVariableValues("id1", "1"),
                  new PojoVariableValues("id2", "2"),
                 ....
                 ....
                  new PojoVariableValues("id21", "21")
                         };
        
         int cont = 0;
         while (cont<pojoData.length) {
            System.out.println("Iteration "+cont+" "+pojoData.length);
            ctx.collect(generateVar(pojoData[0+cont].getUUID(), pojoData[0+cont].getValue()));      
            ctx.collect(generateVar(pojoData[1+cont].getUUID(), pojoData[1+cont].getValue()));      
            ctx.collect(generateVar(pojoData[2+cont].getUUID(), pojoData[2+cont].getValue()));      
            cont = cont +3;
            Thread.sleep(millis);       
        }
        
    }
    
    private InternalVariableValue generateVar(String uuid, String value)
    {
        return InternalVariableValueMessage.InternalVariableValue.newBuilder()
                .setUuid(uuid)
                .setTimestamp(new Date().getTime()).setValue(value).setKeyspace("nest").build();
    }
    
    class PojoVariableValues {
        private String UUID;
        private String Value;
        
        public PojoVariableValues(String uUID, String value) {
            super();
            UUID = uUID;
            Value = value;
        }
        
        public String getUUID() {
            return UUID;
        }
        public void setUUID(String uUID) {
            UUID = uUID;
        }
        public String getValue() {
            return Value;
        }
        public void setValue(String value) {
            Value = value;
        }
        
       }
}

LoadCassandraOperations 每 10 秒发出一次事件。它工作正常。

当我 运行 此代码时,SourceMillisIntervalVariableValues 在第一次迭代中停止,仅发出三个事件。如果我评论 process 函数,两个来源都正常工作,但如果我 运行 process ,来源被取消...

我认为源发出了所有事件(恰好 21 个),并且所有事件都在聚合函数中处理。如果我 运行 这段代码,源代码中的 while 循环只完成一次迭代。

有什么想法吗?

谢谢你。干杯

编辑:

重要。此代码用于探索处理时间和广播功能。我知道我没有使用资源中的最佳实践。谢谢

编辑 2: 当我尝试 运行 处理函数时,问题就开始了。

已解决!!

问题是我尝试使用 TestContainer 运行 它,但我看不到任何日志。

我运行它有一个简单的主要方法,我可以看到一些代码错误(就像评论中的评论一样。Tnks !!!)。