Flink ElasticsearchSinkFunction not serializable in non-static method, but serializable in static method

I have a piece of code that only works inside a static method. If I put the code in a static method and then call it from a non-static method, it works. I have never heard of anything like this and could not find information about it online.

This works:

    public void start(StreamExecutionEnvironment streamExecutionEnvironment) {
        startStatic(streamExecutionEnvironment);
    }

    private static void startStatic(StreamExecutionEnvironment streamExecutionEnvironment) {
        DataStream<String> input = Consumer.createKafkaConsumer(streamExecutionEnvironment, BookIndex.SINK_TOPIC_NAME, new SimpleStringSchema(), "book_index_es_group_v1", true, false);

        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));

        // use an ElasticsearchSink.Builder to create an ElasticsearchSink
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    public IndexRequest createIndexRequest(String element) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);


                        int endIndexExclusive = element.indexOf('\"', 8);
                        String id = element.substring(7, endIndexExclusive);
                        IndexRequest indexRequest = Requests.indexRequest()
                                .index("myindexzzz")
                                .id(id)
                                .source(element, XContentType.JSON);

                        return indexRequest;
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );

        // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
        esSinkBuilder.setBulkFlushMaxActions(1);

        // finally, build and add the sink to the job's pipeline
        input.addSink(esSinkBuilder.build());
    }

This doesn't work:

    public void start(StreamExecutionEnvironment streamExecutionEnvironment) {
        DataStream<String> input = Consumer.createKafkaConsumer(streamExecutionEnvironment, BookIndex.SINK_TOPIC_NAME, new SimpleStringSchema(), "book_index_es_group_v1", true, false);

        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));

        // use an ElasticsearchSink.Builder to create an ElasticsearchSink
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    public IndexRequest createIndexRequest(String element) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);


                        int endIndexExclusive = element.indexOf('\"', 8);
                        String id = element.substring(7, endIndexExclusive);
                        IndexRequest indexRequest = Requests.indexRequest()
                                .index("myindexzzz")
                                .id(id)
                                .source(element, XContentType.JSON);

                        return indexRequest;
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );

        // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
        esSinkBuilder.setBulkFlushMaxActions(1);

        // finally, build and add the sink to the job's pipeline
        input.addSink(esSinkBuilder.build());
    }

(Full) stack trace:

The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.

A possible reason is that when an anonymous class is created in a non-static context (at `new ElasticsearchSinkFunction<String>() { ... }`), it retains a reference to the enclosing instance (and you can access its fields). So the problem may be that when Flink tries to serialize that anonymous class instance, it reaches the enclosing instance and fails to serialize it. This does not happen in a static context because there the anonymous class has no enclosing instance. However, I tried creating a separate class that extends `ElasticsearchSinkFunction<String>` and using that instead, and it still failed to serialize, giving the same error but also saying that the enclosing instance is not serializable, which means it was still trying to serialize the enclosing instance.
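This capture behavior can be demonstrated without Flink at all. The sketch below is an assumption-laden stand-in: `SinkFunction` is a hypothetical interface playing the role of `ElasticsearchSinkFunction`, and plain Java serialization plays the role of Flink's closure serializer. The anonymous class built in an instance method carries a hidden `this$0` field pointing at the (non-serializable) enclosing object, so serializing it fails; the one built in a static method has no such field and serializes fine.

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Hypothetical stand-in for ElasticsearchSinkFunction (which also extends Serializable)
    interface SinkFunction extends Serializable {
        void process(String element);
    }

    // Anonymous class created in an instance method: it captures a hidden
    // reference (this$0) to the enclosing CaptureDemo, which is NOT Serializable.
    SinkFunction fromInstanceMethod() {
        return new SinkFunction() {
            @Override
            public void process(String element) { /* no-op */ }
        };
    }

    // Anonymous class created in a static method: there is no enclosing
    // instance, so nothing non-serializable is dragged along.
    static SinkFunction fromStaticMethod() {
        return new SinkFunction() {
            @Override
            public void process(String element) { /* no-op */ }
        };
    }

    // Try to serialize the object with plain Java serialization.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (Exception e) {
            // NotSerializableException: CaptureDemo, via the synthetic this$0 field
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new CaptureDemo().fromInstanceMethod())); // false
        System.out.println(isSerializable(fromStaticMethod()));                     // true
    }
}
```

The same mechanism explains why a named inner (non-static) class fails too: it has the same implicit reference to the enclosing instance as an anonymous class does.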

Note: thanks to "Techno Chess, Atomic variation#1325" on the Programming Discussions Discord server for suggesting this possible cause.

The enclosing class was indeed being serialized. To make it work, implement Serializable on that class and add a serialVersionUID field. Example below:

    public abstract class Pipeline implements Serializable {

        private static final long serialVersionUID = 1L;

        ...
    }

This makes the classes that extend Pipeline serializable and everything works properly. Obviously, you can also implement the Serializable interface and add the field in a non-abstract class, and that works too. The classes that have to be serializable are the ones that provide the Flink functions, e.g. ElasticsearchSinkFunction.
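To see why this fix is sufficient, here is a minimal, Flink-free sketch under the same assumptions as before (`SinkFunction` is a hypothetical stand-in for `ElasticsearchSinkFunction`, and plain Java serialization stands in for Flink's check). Once the enclosing `Pipeline` class implements Serializable with a serialVersionUID, the anonymous sink function it creates serializes successfully, because its captured enclosing instance is now itself serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class PipelineDemo {

    // Hypothetical stand-in for ElasticsearchSinkFunction
    interface SinkFunction extends Serializable {
        void process(String element);
    }

    // The fix from the answer: the enclosing class implements Serializable
    // and declares a serialVersionUID.
    static class Pipeline implements Serializable {
        private static final long serialVersionUID = 1L;

        SinkFunction createSink() {
            // This anonymous class still captures the Pipeline instance,
            // but that instance is now serializable, so serialization succeeds.
            return new SinkFunction() {
                @Override
                public void process(String element) { /* no-op */ }
            };
        }
    }

    static boolean isSerializable(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new Pipeline().createSink())); // true
    }
}
```

An alternative with the same effect is to avoid the capture entirely, e.g. by moving the `ElasticsearchSinkFunction` into a static nested or top-level class; then the enclosing instance never enters the serialized graph and the enclosing class does not need to be Serializable.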