Flink ElasticsearchSinkFunction not serializable in non-static method, but serializable in static method
I have a piece of code that only works when run inside a static method. If I put the code in a static method and call it from a non-static method, it works. I have never heard of anything like this and could not find information about it online.

This works:
public void start(StreamExecutionEnvironment streamExecutionEnvironment) {
    startStatic(streamExecutionEnvironment);
}

private static void startStatic(StreamExecutionEnvironment streamExecutionEnvironment) {
    DataStream<String> input = Consumer.createKafkaConsumer(streamExecutionEnvironment, BookIndex.SINK_TOPIC_NAME, new SimpleStringSchema(), "book_index_es_group_v1", true, false);

    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));

    // use an ElasticsearchSink.Builder to create an ElasticsearchSink
    ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            new ElasticsearchSinkFunction<String>() {
                public IndexRequest createIndexRequest(String element) {
                    // extract the document id from the raw JSON string
                    int endIndexExclusive = element.indexOf('\"', 8);
                    String id = element.substring(7, endIndexExclusive);

                    return Requests.indexRequest()
                            .index("myindexzzz")
                            .id(id)
                            .source(element, XContentType.JSON);
                }

                @Override
                public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            }
    );

    // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
    esSinkBuilder.setBulkFlushMaxActions(1);

    // finally, build and add the sink to the job's pipeline
    input.addSink(esSinkBuilder.build());
}
This does not work:
public void start(StreamExecutionEnvironment streamExecutionEnvironment) {
    DataStream<String> input = Consumer.createKafkaConsumer(streamExecutionEnvironment, BookIndex.SINK_TOPIC_NAME, new SimpleStringSchema(), "book_index_es_group_v1", true, false);

    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("xxx.xxx.xxx.xxx", 9200, "http"));

    // use an ElasticsearchSink.Builder to create an ElasticsearchSink
    ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
            httpHosts,
            new ElasticsearchSinkFunction<String>() {
                public IndexRequest createIndexRequest(String element) {
                    // extract the document id from the raw JSON string
                    int endIndexExclusive = element.indexOf('\"', 8);
                    String id = element.substring(7, endIndexExclusive);

                    return Requests.indexRequest()
                            .index("myindexzzz")
                            .id(id)
                            .source(element, XContentType.JSON);
                }

                @Override
                public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                    indexer.add(createIndexRequest(element));
                }
            }
    );

    // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
    esSinkBuilder.setBulkFlushMaxActions(1);

    // finally, build and add the sink to the job's pipeline
    input.addSink(esSinkBuilder.build());
}
(Full) stack trace:
The implementation of the provided ElasticsearchSinkFunction is not serializable. The object probably contains or references non-serializable fields.
A possible cause is that when an anonymous class is created in a non-static context (at new ElasticsearchSinkFunction<String>() { ... }), it keeps a reference to the enclosing instance (which is why you can access its fields). So the problem may be that when Flink tries to serialize that anonymous class instance, it reaches the enclosing instance and fails to serialize it. This does not happen in a static context, because there the anonymous class has no enclosing instance. However, I tried creating a separate class that extends ElasticsearchSinkFunction<String> and using that instead, and it still failed to serialize, giving the same error but also saying that the enclosing instance is not serializable, which means it was still trying to serialize the enclosing instance.
Note: thanks to "Techno Chess, Atomic variation#1325" on the Programming Discussions Discord server for suggesting this possible cause.
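The enclosing-instance behavior can be reproduced with a plain-JDK sketch, independent of Flink (the class and interface names here are hypothetical, not part of the code above): an anonymous class created in an instance method carries a hidden reference to the enclosing object, so serialization fails when that object is not Serializable, while the same anonymous class created in a static method serializes fine.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {
    // Serializable functional interface, standing in for ElasticsearchSinkFunction
    interface Task extends Serializable {
        void run();
    }

    // Outer deliberately does NOT implement Serializable
    static class Outer {
        private final String index = "myindexzzz";

        // non-static context: the anonymous class keeps a hidden
        // reference to Outer.this (used here to read `index`)
        Task fromInstanceMethod() {
            return new Task() {
                @Override
                public void run() {
                    System.out.println(index);
                }
            };
        }

        // static context: no enclosing instance exists
        static Task fromStaticMethod() {
            return new Task() {
                @Override
                public void run() {
                    System.out.println("no enclosing instance");
                }
            };
        }
    }

    // true if Java serialization of `o` succeeds
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException for the enclosing Outer instance
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new Outer().fromInstanceMethod())); // false
        System.out.println(serializes(Outer.fromStaticMethod()));         // true
    }
}
```

This is the same shape as the Flink failure: the sink function itself is serializable, but the synthetic reference to the enclosing class drags a non-serializable object into the object graph.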
The enclosing class was indeed being serialized. To make this work, implement Serializable on that class and add a serialVersionUID field. Example below:
public abstract class Pipeline implements Serializable {
    private static final long serialVersionUID = 1L;
    ...
}
This makes classes that extend Pipeline serializable and work properly. Obviously, you can also implement the Serializable interface and add the field in a non-abstract class and that works too. The classes that must be serializable are the ones that provide Flink functions, such as ElasticsearchSinkFunction.
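The fix can likewise be checked with a small plain-JDK sketch (names here are hypothetical stand-ins for the classes above): once the enclosing class implements Serializable and declares a serialVersionUID, an anonymous class created inside one of its instance methods serializes without error, because the hidden enclosing-instance reference can now be written out too.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class PipelineDemo {
    interface Task extends Serializable {
        void run();
    }

    // Stand-in for the Pipeline class above: Serializable + serialVersionUID
    static class Pipeline implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String index = "myindexzzz";

        // the anonymous class still references Pipeline.this,
        // but that instance is now serializable as well
        Task makeSinkFunction() {
            return new Task() {
                @Override
                public void run() {
                    System.out.println(index);
                }
            };
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // succeeds: no NotSerializableException is thrown
        byte[] data = serialize(new Pipeline().makeSinkFunction());
        System.out.println(data.length > 0); // true
    }
}
```

Note that this only works if every non-transient field of the enclosing class is itself serializable; a String field is fine, but a field holding, say, an open connection would reintroduce the same error.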