Flink Task Manager timeout

My program becomes very slow as more and more records are processed. I initially thought it was due to excessive memory consumption, since my program is String-intensive (I am using Java 11, so compact strings should be used where possible), so I increased the JVM heap:

-Xms2048m
-Xmx6144m

I also increased the task manager's memory as well as the timeout in flink-conf.yaml:

jobmanager.heap.size: 6144m
heartbeat.timeout: 5000000

However, none of this helped. The program still becomes very slow at roughly the same point, after processing about 3.5 million records, with only about 0.5 million left. As the program approaches the 3.5 million mark it gets slower and slower until it finally times out, with a total execution time of about 11 minutes.

I checked memory consumption in VisualVM, but it never exceeded about 700 MB. My Flink pipeline looks like this:

final StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment(1);
environment.setParallelism(1);
DataStream<Tuple> stream = environment.addSource(new TPCHQuery3Source(filePaths, relations));
stream.process(new TPCHQuery3Process(relations)).addSink(new FDSSink());
environment.execute("FlinkDataService");

Most of the work is done in the process function, where I implement database join algorithms and store the columns as Strings. Specifically, I am implementing query 3 of the TPC-H benchmark; see https://examples.citusdata.com/tpch_queries.html if interested.

The timeout error is this:

java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id <id> timed out.

At one point I also got this error:

Exception in thread "pool-1-thread-1" java.lang.OutOfMemoryError: Java heap space

Also, here is my VisualVM monitoring; the screenshot was captured at the point where things get very slow:

This is the run loop of my source function:

while (run) {
    readers.forEach(reader -> {
        try {
            String line = reader.readLine();
            if (line != null) {
                Tuple tuple = lineToTuple(line, counter.get() % filePaths.size());
                if (tuple != null && isValidTuple(tuple)) {
                    sourceContext.collect(tuple);
                }
            } else {
                closedReaders.add(reader);
                if (closedReaders.size() == filePaths.size()) {
                    System.out.println("ALL FILES HAVE BEEN STREAMED");
                    cancel();
                }
            }
            counter.getAndIncrement();
        } catch (IOException e) {
            e.printStackTrace();
        }
    });
}

I basically read one line from each of the 3 files I need and, based on the order of the files, construct a tuple object (Tuple is my custom class representing a table row) and emit that tuple if it is valid, i.e. if it fulfils certain conditions on the date.
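For illustration, a minimal sketch of what lineToTuple might look like under these assumptions: the custom Tuple class wraps a relation index and a column array, and the input uses '|' as the column separator as TPC-H .tbl files do (LineParser, relationIndex and columns are hypothetical names, not the actual code):

```java
import java.util.Arrays;

public class LineParser {

    // Simplified stand-in for the custom Tuple class described above.
    public static class Tuple {
        public final int relationIndex; // which of the 3 input files the line came from
        public final String[] columns;  // raw column values

        public Tuple(int relationIndex, String[] columns) {
            this.relationIndex = relationIndex;
            this.columns = columns;
        }
    }

    // Hypothetical lineToTuple: TPC-H .tbl files separate columns with '|'.
    public static Tuple lineToTuple(String line, int relationIndex) {
        if (line == null || line.isEmpty()) {
            return null;
        }
        return new Tuple(relationIndex, line.split("\\|"));
    }

    public static void main(String[] args) {
        Tuple t = lineToTuple("1|36901|O|173665.47|1996-01-02", 0);
        System.out.println(t.relationIndex + " " + Arrays.toString(t.columns));
    }
}
```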

I also suggest that the JVM do garbage collection at the 1 millionth, 1.5 millionth, 2 millionth and 2.5 millionth record, like this:

System.gc()

Any ideas on how I can optimize this?

String.intern() saved me. I interned every string before storing it in my maps, and that worked like magic.
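As a minimal sketch of why this helps: String.intern() returns the canonical instance from the JVM's string pool, so repeated column values (dates, flags, market segments) stop piling up as distinct heap objects (the InternDemo and canonical names here are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class InternDemo {

    // Intern each column value before storing it, so repeated values
    // share a single pooled String instance instead of fresh heap copies.
    public static String canonical(String s) {
        return s == null ? null : s.intern();
    }

    public static void main(String[] args) {
        Map<Long, String> segmentByCustomer = new HashMap<>();
        String a = canonical(new String("AUTOMOBILE"));
        String b = canonical(new String("AUTOMOBILE"));
        System.out.println(a == b); // prints "true": both refer to the pooled instance
        segmentByCustomer.put(1L, a);
        segmentByCustomer.put(2L, b); // no second copy of "AUTOMOBILE" on the heap
    }
}
```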

These are the properties that I changed on my Flink standalone cluster to compute TPC-H query 03.

jobmanager.memory.process.size: 1600m
heartbeat.timeout: 100000
taskmanager.memory.process.size: 8g # default: 1728m

I implemented this query to stream only the Orders table and keep the other tables as state. Also, I am computing it as a windowless query, which I think makes more sense and is faster.

public class TPCHQuery03 {

    private final String topic = "topic-tpch-query-03";

    public TPCHQuery03() {
        this(PARAMETER_OUTPUT_LOG, "127.0.0.1", false, false, -1);
    }

    public TPCHQuery03(String output, String ipAddressSink, boolean disableOperatorChaining, boolean pinningPolicy, long maxCount) {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

            if (disableOperatorChaining) {
                env.disableOperatorChaining();
            }

            DataStream<Order> orders = env
                    .addSource(new OrdersSource(maxCount)).name(OrdersSource.class.getSimpleName()).uid(OrdersSource.class.getSimpleName());

            // Filter market segment "AUTOMOBILE"
            // customers = customers.filter(new CustomerFilter());

            // Filter all Orders with o_orderdate < 12.03.1995
            DataStream<Order> ordersFiltered = orders
                    .filter(new OrderDateFilter("1995-03-12")).name(OrderDateFilter.class.getSimpleName()).uid(OrderDateFilter.class.getSimpleName());

            // Join customers with orders and package them into a ShippingPriorityItem
            DataStream<ShippingPriorityItem> customerWithOrders = ordersFiltered
                    .keyBy(new OrderKeySelector())
                    .process(new OrderKeyedByCustomerProcessFunction(pinningPolicy)).name(OrderKeyedByCustomerProcessFunction.class.getSimpleName()).uid(OrderKeyedByCustomerProcessFunction.class.getSimpleName());

            // Join the last join result with Lineitems
            DataStream<ShippingPriorityItem> result = customerWithOrders
                    .keyBy(new ShippingPriorityOrderKeySelector())
                    .process(new ShippingPriorityKeyedProcessFunction(pinningPolicy)).name(ShippingPriorityKeyedProcessFunction.class.getSimpleName()).uid(ShippingPriorityKeyedProcessFunction.class.getSimpleName());

            // Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
            DataStream<ShippingPriorityItem> resultSum = result
                    .keyBy(new ShippingPriority3KeySelector())
                    .reduce(new SumShippingPriorityItem(pinningPolicy)).name(SumShippingPriorityItem.class.getSimpleName()).uid(SumShippingPriorityItem.class.getSimpleName());

            // emit result
            if (output.equalsIgnoreCase(PARAMETER_OUTPUT_MQTT)) {
                resultSum
                        .map(new ShippingPriorityItemMap(pinningPolicy)).name(ShippingPriorityItemMap.class.getSimpleName()).uid(ShippingPriorityItemMap.class.getSimpleName())
                        .addSink(new MqttStringPublisher(ipAddressSink, topic, pinningPolicy)).name(OPERATOR_SINK).uid(OPERATOR_SINK);
            } else if (output.equalsIgnoreCase(PARAMETER_OUTPUT_LOG)) {
                resultSum.print().name(OPERATOR_SINK).uid(OPERATOR_SINK);
            } else if (output.equalsIgnoreCase(PARAMETER_OUTPUT_FILE)) {
                StreamingFileSink<String> sink = StreamingFileSink
                        .forRowFormat(new Path(PATH_OUTPUT_FILE), new SimpleStringEncoder<String>("UTF-8"))
                        .withRollingPolicy(
                                DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                        .withMaxPartSize(1024 * 1024 * 1024).build())
                        .build();

                resultSum
                        .map(new ShippingPriorityItemMap(pinningPolicy)).name(ShippingPriorityItemMap.class.getSimpleName()).uid(ShippingPriorityItemMap.class.getSimpleName())
                        .addSink(sink).name(OPERATOR_SINK).uid(OPERATOR_SINK);
            } else {
                System.out.println("discarding output");
            }

            System.out.println("Stream job: " + TPCHQuery03.class.getSimpleName());
            System.out.println("Execution plan >>>\n" + env.getExecutionPlan());
            env.execute(TPCHQuery03.class.getSimpleName());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        new TPCHQuery03();
    }
}

The UDFs are here: OrdersSource, OrderKeyedByCustomerProcessFunction, ShippingPriorityKeyedProcessFunction, and SumShippingPriorityItem. I am using com.google.common.collect.ImmutableList since the state is never updated in place. Also, I keep only the necessary columns in state, such as ImmutableList<Tuple2<Long, Double>> lineItemList.
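As a rough illustration of this replace-on-update state pattern (not the actual UDF code: it uses plain java.util.List.copyOf instead of Guava's ImmutableList, a HashMap instead of Flink's keyed ValueState, and hypothetical names throughout):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedStateSketch {

    // Per-order-key immutable lists of (l_extendedprice, l_discount) pairs,
    // standing in for the ImmutableList<Tuple2<Long, Double>> kept in Flink state.
    private final Map<Long, List<double[]>> lineItemState = new HashMap<>();

    // Appending builds a fresh immutable copy and swaps it in, the same way
    // a keyed process function would call state.update(newList).
    public void add(long orderKey, double extendedPrice, double discount) {
        List<double[]> grown = new ArrayList<>(lineItemState.getOrDefault(orderKey, List.of()));
        grown.add(new double[]{extendedPrice, discount});
        lineItemState.put(orderKey, List.copyOf(grown));
    }

    // Revenue term of TPC-H Q3: sum(l_extendedprice * (1 - l_discount)).
    public double revenue(long orderKey) {
        return lineItemState.getOrDefault(orderKey, List.of()).stream()
                .mapToDouble(v -> v[0] * (1 - v[1]))
                .sum();
    }
}
```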