How to filter an Apache Flink stream on the basis of another stream?

I have two streams: one of Ints and one of JSON. The JSON schema has a key whose value is an int. I need to filter the JSON stream by comparing that key against the integer stream. Is this possible in Flink?

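Yes, you can do this kind of stream processing with Flink. The basic building blocks you need are connected streams and stateful functions. Here is an example using a RichCoFlatMapFunction: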

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class Connect {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> control = env.fromElements(
                new Event(17),
                new Event(42))
                .keyBy("key");

        DataStream<Event> data = env.fromElements(
                new Event(2),
                new Event(42),
                new Event(6),
                new Event(17),
                new Event(8),
                new Event(42)
                )
                .keyBy("key");

        DataStream<Event> result = control
                .connect(data)
                .flatMap(new MyConnectedStreams());

        result.print();

        env.execute();
    }

    static final class MyConnectedStreams
            extends RichCoFlatMapFunction<Event, Event, Event> {

        private ValueState<Boolean> seen = null;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
                    // state name
                    "have-seen-key",
                    // type information of state
                    TypeInformation.of(new TypeHint<Boolean>() {
                    }));
            seen = getRuntimeContext().getState(descriptor);
        }

        // Control stream: remember that the current key has been seen.
        @Override
        public void flatMap1(Event control, Collector<Event> out) throws Exception {
            seen.update(Boolean.TRUE);
        }

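        // Data stream: forward the event only if its key was seen on the control stream.
        @Override
        public void flatMap2(Event data, Collector<Event> out) throws Exception {
            // value() is null until the control stream has set the state for this key
            if (Boolean.TRUE.equals(seen.value())) {
                out.collect(data);
            }
        }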
    }


    public static final class Event {
        public Event() {
        }

        public Event(int key) {
            this.key = key;
        }

        public int key;

        @Override
        public String toString() {
            return String.valueOf(key);
        }
    }
}

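In this example, only data-stream events whose keys have been seen on the control stream are passed through; all other events are filtered out. I'm taking advantage of Flink's managed keyed state and connected streams.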
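To make the per-key logic easier to follow, here is a minimal plain-Java sketch that simulates it without any Flink dependency. The class name `KeyedFilterSketch` and its methods are hypothetical, a `HashMap` stands in for Flink's keyed `ValueState`, and the sketch assumes every control event arrives before any data event.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the keyed-state filter; not Flink code.
final class KeyedFilterSketch {
    // Stands in for the per-key ValueState<Boolean>: one flag per key.
    private final Map<Integer, Boolean> seen = new HashMap<>();
    private final List<Integer> output = new ArrayList<>();

    // Mirrors flatMap1: remember that this key appeared on the control stream.
    void onControl(int key) {
        seen.put(key, Boolean.TRUE);
    }

    // Mirrors flatMap2: forward the data event only if its key was seen.
    void onData(int key) {
        if (Boolean.TRUE.equals(seen.get(key))) {
            output.add(key);
        }
    }

    List<Integer> output() {
        return output;
    }

    // Assumption: all control events are processed before any data event.
    static List<Integer> run(List<Integer> control, List<Integer> data) {
        KeyedFilterSketch filter = new KeyedFilterSketch();
        control.forEach(filter::onControl);
        data.forEach(filter::onData);
        return filter.output();
    }

    public static void main(String[] args) {
        // Same inputs as the Flink example above.
        System.out.println(run(List.of(17, 42), List.of(2, 42, 6, 17, 8, 42)));
        // prints [42, 17, 42]
    }
}
```

In a real Flink job the state is scoped to the current key automatically, and the relative order of output across different keys may differ from this single-threaded simulation.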

For simplicity I've ignored your requirement that the data stream carries JSON, but you can find examples elsewhere of how to work with JSON in Flink.
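As a rough illustration of the missing step, the JSON records would first be mapped to something that exposes the integer key, so the stream can be keyed on it. The sketch below is only a stand-in: it assumes a flat JSON object with an integer field named `key` (a hypothetical field name), and it uses a regular expression rather than a real JSON parser to stay dependency-free. A real job would more likely use a JSON library.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Stand-in for JSON parsing; a regex is NOT a general JSON parser.
final class JsonKeySketch {
    // Assumption: the record is a flat JSON object with an integer field "key".
    private static final Pattern KEY_FIELD =
            Pattern.compile("\"key\"\\s*:\\s*(-?\\d+)");

    // Returns the integer key, or empty if the field is missing or not an integer.
    static OptionalInt extractKey(String json) {
        Matcher m = KEY_FIELD.matcher(json);
        return m.find()
                ? OptionalInt.of(Integer.parseInt(m.group(1)))
                : OptionalInt.empty();
    }

    public static void main(String[] args) {
        String record = "{\"key\": 42, \"payload\": \"x\"}";
        System.out.println(extractKey(record).getAsInt());
        // prints 42
    }
}
```

The extracted value would then play the role of `Event.key` in the example above.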

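Note that your results will be non-deterministic, since you have no control over the timing of the two streams relative to one another. You could manage this by adding event-time timestamps to the streams and then using a RichCoProcessFunction instead (in later Flink releases, CoProcessFunction or KeyedCoProcessFunction).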
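The effect of arrival order can be seen in a small plain-Java simulation. This is a sketch, not Flink code: the `Arrival` type and the helper names are hypothetical, and each list represents one possible interleaving of the two inputs as seen by the operator.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Shows that the filter's output depends on how the two inputs interleave.
final class ArrivalOrderSketch {
    // One event as it reaches the operator, tagged with the input it came from.
    static final class Arrival {
        final boolean fromControl;
        final int key;

        Arrival(boolean fromControl, int key) {
            this.fromControl = fromControl;
            this.key = key;
        }
    }

    static Arrival control(int key) {
        return new Arrival(true, key);
    }

    static Arrival data(int key) {
        return new Arrival(false, key);
    }

    // Same rule as the connected-streams example: pass data only for seen keys.
    static List<Integer> filter(List<Arrival> arrivals) {
        Set<Integer> seen = new HashSet<>();
        List<Integer> out = new ArrayList<>();
        for (Arrival a : arrivals) {
            if (a.fromControl) {
                seen.add(a.key);
            } else if (seen.contains(a.key)) {
                out.add(a.key);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Control event arrives first: both data events pass.
        System.out.println(filter(List.of(control(42), data(42), data(42))));
        // prints [42, 42]

        // One data event overtakes the control event: it is dropped.
        System.out.println(filter(List.of(data(42), control(42), data(42))));
        // prints [42]
    }
}
```

Both runs contain the same events, yet the outputs differ, which is why some form of event-time coordination or buffering is needed if the result must be reproducible.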