使用 Apache Flink 处理 XML 并将其写入数据库

Apache Flink Process xml and write them to database

我有以下用例。

XML 文件会被写入一个 Kafka 主题,我想通过 Flink 消费并处理这些文件。XML 属性必须重命名,以匹配数据库表的列名。这些重命名规则必须是灵活的,并且可以在 Flink 作业之外进行维护。最后,必须将这些属性写入数据库。每个 XML 文档代表一条数据库记录。

作为第二步,必须对最近 x 分钟内所有 XML 文档的某些属性进行聚合。

据我所知,到目前为止,flink 能够执行所有提到的步骤,但我不知道如何正确地实施它。

目前我已经实现了 Kafka 数据源(source),用它读取 XML 文档,并通过自定义的 MapFunction 进行解析。在该函数中我创建了一个 POJO,并将每个属性的名称和值存储在一个 HashMap 中。

/**
 * POJO for one parsed XML document: maps each attribute path
 * (e.g. {@code path.to.attribute.one}) to that attribute's value.
 */
public class Data{
    // The original initializer lacked the "new" keyword and did not compile.
    private Map<String,String> attributes = new HashMap<>();
}

HashMap 包含:

Key: path.to.attribute.one Value: Value of attribute one

现在我想用Broadcasting State把原来的属性名改成数据库列名。 在这个阶段,我卡住了,因为我的 POJO 数据带有 HashMap 中的属性,但我不知道如何通过广播将它与映射连接起来。

另一种方法是通过 flatMap 将 XML 文档的属性拆分成一条条单独的记录。这给我留下了两个问题:

对于第二阶段,我知道 Window 函数,虽然还没有完全理解它的每个细节,但我想它能满足我的需求。这个阶段的问题是:我能否在同一个作业(job)中使用多个 sink,一个用于原始数据流,另一个用于聚合后的数据流。

有人可以帮忙提示吗?

干杯

更新:这是我到目前为止写出的代码。代码做了简化,其中 XmlData POJO 代表我解析出的 XML 文档。

/**
 * Demo job that flattens parsed XML documents into (attribute name, value) pairs and
 * renames the attribute names to database column names. The renaming rules are
 * delivered through a broadcast stream and kept in broadcast state, so they can be
 * maintained outside of the job.
 */
public class StreamingJob {
    static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class);

    /** Broadcast-state key under which the most recently received {@link Mapping} is kept. */
    private static final String CURRENT_MAPPING_KEY = "currentMapping";

    /**
     * Descriptor of the broadcast state holding the attribute-to-column mapping. It is a
     * class-level constant because both {@code main} (to create the broadcast stream)
     * and {@link CustomBroadcastingFunction} (to access the state) need it.
     */
    private static final MapStateDescriptor<String, Mapping> MAPPING_STATE_DESCRIPTOR =
            new MapStateDescriptor<String, Mapping>(
                    "MappingBroadcastState",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    TypeInformation.of(new TypeHint<Mapping>() {}));

    /**
     * Builds and runs the pipeline on three sample documents and one sample mapping.
     *
     * @param args command line arguments (unused)
     * @throws Exception if the job cannot be executed
     */
    public static void main(String[] args) throws Exception {

        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        XmlData xmlData1 = new XmlData();
        xmlData1.addAttribute("path.to.attribute.eventName","Start");
        xmlData1.addAttribute("second.path.to.attribute.eventTimestamp","2020-11-18T18:00:00.000");
        xmlData1.addAttribute("third.path.to.attribute.eventSource","Source1");
        xmlData1.addAttribute("path.to.attribute.additionalAttribute","Lorem");

        XmlData xmlData2 = new XmlData();
        xmlData2.addAttribute("path.to.attribute.eventName","Start");
        xmlData2.addAttribute("second.path.to.attribute.eventTimestamp","2020-11-18T18:00:01.000");
        xmlData2.addAttribute("third.path.to.attribute.eventSource","Source2");
        xmlData2.addAttribute("path.to.attribute.additionalAttribute","First");

        XmlData xmlData3 = new XmlData();
        xmlData3.addAttribute("path.to.attribute.eventName","Start");
        xmlData3.addAttribute("second.path.to.attribute.eventTimestamp","2020-11-18T18:00:01.000");
        xmlData3.addAttribute("third.path.to.attribute.eventSource","Source1");
        xmlData3.addAttribute("path.to.attribute.additionalAttribute","Day");

        Mapping mapping1 = new Mapping();
        mapping1.addMapping("path.to.attribute.eventName","EVENT_NAME");
        mapping1.addMapping("second.path.to.attribute.eventTimestamp","EVENT_TIMESTAMP");

        DataStream<Mapping> mappingDataStream = env.fromElements(mapping1);

        BroadcastStream<Mapping> mappingBroadcastStream = mappingDataStream.broadcast(MAPPING_STATE_DESCRIPTOR);

        DataStream<XmlData> dataDataStream = env.fromElements(xmlData1, xmlData2, xmlData3);

        //Convert the xml with all attributes to a stream of attribute names and values
        DataStream<Tuple2<String, String>> recordDataStream = dataDataStream
                .flatMap(new CustomFlatMapFunction());

        //Map the attributes with the mapping information
        //(the original called process() without a function, which does not compile)
        DataStream<Tuple2<String,String>> outputDataStream = recordDataStream
                .connect(mappingBroadcastStream)
                .process(new CustomBroadcastingFunction());

        env.execute("Process xml data and write it to database");
    }

    /** One parsed XML document: attribute path/name mapped to attribute value. */
    static class XmlData{
        private Map<String,String> attributes = new HashMap<>();

        public XmlData(){
        }

        /** @return the string form of the underlying attribute map */
        public String toString(){
            return this.attributes.toString();
        }

        /** @return the live (not copied) attribute map of this document */
        public Map<String,String> getColumns(){
            return this.attributes;
        }

        /** Adds an attribute, replacing any previous value stored under the same key. */
        public void addAttribute(String key, String value){
            this.attributes.put(key,value);
        }

        /** @return the value stored for {@code attributeName}, or {@code null} if absent */
        public String getAttributeValue(String attributeName){
            return attributes.get(attributeName);
        }
    }

    /** Renaming rules from XML attribute path/name to database column name. */
    static class Mapping{
        //First string is the attribute path and name
        //Second string is the database column name
        Map<String,String> mappingTuple = new HashMap<>();

        public Mapping(){}

        /** Registers (or replaces) the column name to use for the given attribute. */
        public void addMapping(String attributeNameWithPath, String databaseColumnName){
            this.mappingTuple.put(attributeNameWithPath,databaseColumnName);
        }

        public Map<String, String> getMappingTuple() {
            return mappingTuple;
        }

        public void setMappingTuple(Map<String, String> mappingTuple) {
            this.mappingTuple = mappingTuple;
        }
    }

    /** Emits one (attribute name, value) tuple per attribute of an {@link XmlData} document. */
    static class CustomFlatMapFunction implements FlatMapFunction<XmlData, Tuple2<String,String>> {

        @Override
        public void flatMap(XmlData xmlData, Collector<Tuple2< String,String>> collector) throws Exception {
            for(Map.Entry<String,String> entrySet : xmlData.getColumns().entrySet()){
                collector.collect(new Tuple2<>(entrySet.getKey(), entrySet.getValue()));
            }
        }
    }

    /**
     * Renames attribute names to database column names using the {@link Mapping} held in
     * broadcast state.
     *
     * <p>Attributes without a known column name are forwarded unchanged. This includes
     * attributes that are processed before any mapping has been received, because the
     * relative arrival order of the two inputs is not controlled here.
     */
    static class CustomBroadcastingFunction
            extends BroadcastProcessFunction<Tuple2<String, String>, Mapping, Tuple2<String, String>> {

        /**
         * Emits the attribute under its mapped column name, or unchanged when no mapping
         * entry exists for it.
         */
        @Override
        public void processElement(
                Tuple2<String, String> attribute,
                ReadOnlyContext readOnlyContext,
                Collector<Tuple2<String, String>> collector) throws Exception {
            Mapping mapping = readOnlyContext
                    .getBroadcastState(MAPPING_STATE_DESCRIPTOR)
                    .get(CURRENT_MAPPING_KEY);

            String columnName = null;
            if (mapping != null) {
                columnName = mapping.getMappingTuple().get(attribute.f0);
            }

            if (columnName == null) {
                collector.collect(attribute);
            } else {
                collector.collect(new Tuple2<>(columnName, attribute.f1));
            }
        }

        /** Stores the received mapping, replacing the previously stored one. */
        @Override
        public void processBroadcastElement(
                Mapping mapping,
                Context context,
                Collector<Tuple2<String, String>> collector) throws Exception {
            context.getBroadcastState(MAPPING_STATE_DESCRIPTOR).put(CURRENT_MAPPING_KEY, mapping);
        }
    }
}

下面是一些示例代码,说明如何使用 BroadcastStream 实现这一点。这里有一个微妙的问题:属性重映射数据可能会在某条记录之后才到达。通常,您会使用状态加定时器来暂存那些尚无对应重映射数据的记录,但在您的场景中,无法确定缺失的重映射究竟是“需要再等一会儿”还是“根本不存在映射”。无论如何,这应该可以帮助您入门……

    // Descriptor of the broadcast state that maps an original attribute name to its
    // replacement name. Declared final because it is a shared constant used both when
    // creating the broadcast stream and inside MyRemappingFunction.
    private static final MapStateDescriptor<String, String> REMAPPING_STATE = new MapStateDescriptor<>("remappings", String.class, String.class);

    
    /**
     * Connects a stream of single-entry attribute maps with a broadcast stream of
     * (attribute name, replacement name) pairs, prints the processed records and runs
     * the job in a local environment.
     */
    @Test
    public void testUnkeyedStreamWithBroadcastStream() throws Exception {
        final StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(2);

        // Rename rules as {attribute name, replacement name}.
        final String[][] renameRules = {
            {"one", "1"},
            {"two", "2"},
            {"three", "3"},
            {"four", "4"},
            {"five", "5"},
            {"six", "6"},
        };
        final List<Tuple2<String, String>> remappingPairs = new ArrayList<>();
        for (String[] rule : renameRules) {
            remappingPairs.add(new Tuple2<>(rule[0], rule[1]));
        }

        final BroadcastStream<Tuple2<String, String>> remappingBroadcast =
                localEnv.fromCollection(remappingPairs).broadcast(REMAPPING_STATE);

        // Input records: one single-entry map per attribute, with values 10, 20, ..., 50.
        final String[] recordKeys = {"one", "two", "three", "four", "five"};
        final List<Map<String, Integer>> inputRecords = new ArrayList<>();
        for (int i = 0; i < recordKeys.length; i++) {
            inputRecords.add(makePOJO(recordKeys[i], 10 * (i + 1)));
        }

        final DataStream<Map<String, Integer>> recordStream = localEnv.fromCollection(inputRecords);

        recordStream
            .connect(remappingBroadcast)
            .process(new MyRemappingFunction())
            .print();

        localEnv.execute();
    }

    /**
     * Builds a new mutable map containing exactly one entry.
     *
     * @param key   the entry's key
     * @param value the entry's value
     * @return a fresh {@code HashMap} holding only {@code key -> value}
     */
    private Map<String, Integer> makePOJO(String key, int value) {
        final Map<String, Integer> singleEntry = new HashMap<>();
        singleEntry.put(key, Integer.valueOf(value));
        return singleEntry;
    }
    
    /**
     * Renames the keys of each incoming record using the name pairs stored in the
     * {@code REMAPPING_STATE} broadcast state. Keys without a stored replacement are
     * kept as they are.
     */
    @SuppressWarnings("serial")
    private static class MyRemappingFunction extends BroadcastProcessFunction<Map<String, Integer>, Tuple2<String, String>, Map<String, Integer>> {

        /** Records one (attribute name, replacement name) pair in the broadcast state. */
        @Override
        public void processBroadcastElement(Tuple2<String, String> in, Context ctx, Collector<Map<String, Integer>> out) throws Exception {
            final String originalName = in.f0;
            final String replacementName = in.f1;
            ctx.getBroadcastState(REMAPPING_STATE).put(originalName, replacementName);
        }

        /** Emits a copy of the record whose keys are replaced wherever the state has an entry. */
        @Override
        public void processElement(Map<String, Integer> in, ReadOnlyContext ctx, Collector<Map<String, Integer>> out) throws Exception {
            final ReadOnlyBroadcastState<String, String> remappings = ctx.getBroadcastState(REMAPPING_STATE);
            final Map<String, Integer> renamed = new HashMap<>();

            for (Map.Entry<String, Integer> attribute : in.entrySet()) {
                final String name = attribute.getKey();
                // Fall back to the original name when no replacement has been broadcast.
                final String targetName = remappings.contains(name) ? remappings.get(name) : name;
                renamed.put(targetName, attribute.getValue());
            }

            out.collect(renamed);
        }

    }