Netty NioEventLoopGroup causes OutOfMemoryError in Vert.x 3.x

I am using Vert.x 3.6.3 with Kafka in my application. I deploy my verticles into a cluster, but the application crashes frequently. I analyzed the heap dump and got the error shown in the attached image.

A large number of Netty NioEventLoopGroup objects are being created. Is this a bug in Vert.x or in my code? Can anyone explain how Vert.x uses Netty and why this error occurs?

Update:

Below are some details of the source code in my project.

public class Application {

    private JsonObject config;

    public Application() {
    }

    public JsonObject getConfig() {
        return config;
    }

    // Returns this so the setter can be chained fluently in loadConfig below
    public Application setConfig(JsonObject config) {
        this.config = config;
        return this;
    }
}


public class BaseVerticle extends AbstractVerticle {

    private static final Logger LOGGER = LogManager.getLogger(BaseVerticle.class);

    /**
     * Load config from a properties file, environment variables and system properties.
     *
     * @return a future completed with the application once the configuration is loaded
     */
    protected Future<Application> loadConfig(Application application) {
        Future<Application> future = Future.future();

        ConfigStoreOptions file = new ConfigStoreOptions()
            .setType("file")
            .setFormat("properties")
            .setConfig(new JsonObject().put("path", "application.properties"));

        ConfigStoreOptions env = new ConfigStoreOptions().setType("env");
        ConfigStoreOptions sys = new ConfigStoreOptions().setType("sys");

        ConfigRetrieverOptions options = new ConfigRetrieverOptions()
            .addStore(file).addStore(env).addStore(sys);

        ConfigRetriever retriever = ConfigRetriever.create(vertx, options);
        retriever.getConfig(json -> {
            if (json.failed()) {
                LOGGER.error("Failed to load configuration. Reason: " + json.cause().getMessage());
                // Failed to retrieve the configuration
                json.cause().printStackTrace();
                future.fail(json.cause());
            } else {
                LOGGER.info("Load configuration success.");
                JsonObject config = json.result();
                future.complete(application.setConfig(config));
            }
        });

        return future;
    }
}

public class MainVerticle extends BaseVerticle {

    private static final Logger LOGGER = LogManager.getLogger(MainVerticle.class);

    @Override
    public void start(Future<Void> startFuture) throws Exception {
        doStart(startFuture);
    }

    private void doStart(Future<Void> startFuture) {
        vertx.exceptionHandler(t -> LOGGER.error("Unhandled exception: {}", t.getMessage(), t));
        LOGGER.info("vertx.isClustered() = {}", vertx.isClustered());
        Application application = new Application();

        loadConfig(application)
            .compose(this::deployProcessingVerticle)
            .setHandler(r -> {
                if (r.succeeded()) {
                    LOGGER.info("Deploy {} success.", getClass().getSimpleName());
                    startFuture.complete();
                } else {
                    LOGGER.error("Deploy {} failed.", getClass().getSimpleName(), r.cause());
                    startFuture.fail(r.cause());
                }
            });
    }

    private Future<Application> deployProcessingVerticle(Application application) {
        Future<Application> future = Future.future();

        JsonObject configuration = application.getConfig();

        int workerPoolSize = configuration.getInteger("http.workerPoolSize");
        DeploymentOptions opts = new DeploymentOptions()
                .setHa(true)
                .setWorker(true)
                .setInstances(1)
                .setWorkerPoolSize(workerPoolSize)
                .setWorkerPoolName("processing")
                .setConfig(configuration);

        vertx.deployVerticle(ProcessingVerticle.class, opts, res -> {
            if (res.failed()) {
                future.fail(res.cause());
                LOGGER.error("Deploy ProcessingVerticle failed. Reason: {}", res.cause().getMessage(), res.cause());
            } else {
                future.complete(application);
                LOGGER.info("Deploy ProcessingVerticle success.");
            }
        });

        return future;
    }

    public static void main(String[] args) {
        Vertx.clusteredVertx(new VertxOptions().setHAEnabled(true), ar -> {
            if (ar.succeeded()) {
                ar.result().deployVerticle(MainVerticle.class.getName(), new DeploymentOptions().setHa(true));
            } else {
                LOGGER.error("Failed to start clustered Vert.x", ar.cause()); // ar.result() is null on failure
            }
        });
    }
}

public class ProcessingVerticle extends AbstractVerticle {

    private static final Logger LOGGER = LogManager.getLogger(ProcessingVerticle.class);

    private KafkaHandler kafkaHandler;

    @Override
    public void start(Future<Void> startFuture) throws Exception {
        // Do not call super.start(startFuture) here: it completes the future
        // immediately. KafkaHandler completes it once Kafka is set up.
        kafkaHandler = new KafkaHandler(vertx, config(), startFuture);
    }

}

public class KafkaHandler {

    private static final Logger logger = LogManager.getLogger(KafkaHandler.class);

    private KafkaWriteStream<String, JsonObject> producer;
    private KafkaReadStream<String, JsonObject> consumer;
    private Vertx vertx;
    private JsonObject config;
    private Function<JsonObject, Void> processMessage1;
    private Function<JsonObject, Void> processMessage2;

    private String topic1;
    private String topic2;

    public KafkaHandler(Vertx vertx, JsonObject config, Future<Void> startFuture){
        this.vertx = vertx;
        this.config = config;
        initTopics(config);
        startKafka(startFuture);
    }

    private void startKafka(Future<Void> startFuture) {
        createProducer();
        createConsumer();
        // Signal that the verticle has started once producer and consumer exist
        startFuture.complete();
    }

    private void createProducer() {
        Properties props = new Properties(); // renamed: a local named "config" shadowed the field
        String server = this.config.getString("kafka.servers", "localhost:9092");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonObjectSerializer.class);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, this.config.getString("kafka.request.timeout", "30000"));
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        producer = KafkaWriteStream.create(vertx, props, String.class, JsonObject.class);
    }

    private void initTopics(JsonObject config) {
        topic1 = config.getString(...);
        topic2 = config.getString(...);
    }

    public void publishMessage(JsonObject message, String topic){
        producer.write(new ProducerRecord<>(topic, message), ar -> {
            if (ar.failed()){
                logger.error(ar.cause());
            }
        });
    }

    private void createConsumer() {
        Properties props = new Properties();
        String server = this.config.getString("kafka.servers", "localhost:9092");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // The stream is typed <String, JsonObject>, so the value deserializer
        // must produce JsonObject, not String
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonObjectDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.config.getString("kafka.offset.reset", "latest"));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString("kafka.group.id"));
        consumer = KafkaReadStream.create(vertx, props, String.class, JsonObject.class);
    }

    private void processRecord(ConsumerRecord<String, JsonObject> record) {
        logger.info("Topic {} - Receive Message: {}", record.topic(), record.value().toString());

        if(record.topic().contains(topic1)){
            processMessage1.apply(record.value());
        }
        if(record.topic().contains(topic2)){
            processMessage2.apply(record.value());
        }
    }

    public void consumerSubscribe(List<Integer> coins) {
        Set<String> topics = new HashSet<>(Arrays.asList(topic1, topic2));
        consumer.subscribe(topics, ar -> {
            if (ar.succeeded()) {
                logger.info("Consumer subscribed");
                // A new poll is scheduled every second, whether or not the
                // previous poll has completed (see the resolution below)
                vertx.setPeriodic(1000, timerId -> {
                    consumer.poll(100, records -> {
                        if (records.succeeded()) {
                            records.result().forEach(this::processRecord);
                        }
                    });
                });
            } else {
                logger.error(ar.cause());
            }
        });
    }

    public void stopKafka() {
        if (producer != null) {
            producer.close();
        }
        if (consumer != null) {
            consumer.close();
        }
    }


    // Getter, Setter
}

Vert.x uses Netty under the hood for network tasks.
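
As an illustration of where those objects come from (a minimal sketch, not taken from the question's code): every Vertx instance owns its own Netty event loop group, so creating Vertx instances repeatedly without closing them multiplies NioEventLoopGroup objects in a heap dump.

import io.vertx.core.Vertx;

public class EventLoopLeakSketch {

    public static void main(String[] args) {
        // Anti-pattern: each Vertx.vertx() call builds a fresh Netty event
        // loop group (sized via VertxOptions#setEventLoopPoolSize). Instances
        // created in a loop and never closed leak those groups and threads:
        for (int i = 0; i < 10; i++) {
            Vertx leaked = Vertx.vertx(); // never leaked.close()
        }

        // Correct usage: create one shared instance and close it on shutdown.
        Vertx vertx = Vertx.vertx();
        // ... deploy verticles, run the application ...
        vertx.close();
    }
}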

I very much doubt this is a bug in Vert.x or Netty, since both are widely used. It could be a bug in the Vert.x Kafka Client, but I doubt that too, because that client is well maintained and I have not seen any recent reports of this kind of problem.

Most likely it is a bug in your code, but that is hard to say without an example.

The problem above has been solved: it turned out we had misused Vertx.setTimer. The way we used it kept spawning threads and consuming memory, so we always ended up with an "Out Of Memory" error.
Reference: https://groups.google.com/forum/#!topic/vertx/K74PcXUauJM
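
For illustration, here is a minimal sketch of that kind of timer misuse and a safer alternative (the class and method names are hypothetical, not from the original code):

import io.vertx.core.Vertx;

public class TimerSketch {

    private final Vertx vertx;
    private long timerId = -1;

    public TimerSketch(Vertx vertx) {
        this.vertx = vertx;
    }

    // Anti-pattern: if this is called on every message or reconnect, a new
    // periodic timer is registered each time and none is ever cancelled, so
    // handlers and the state they capture accumulate until memory runs out.
    public void leakyStart() {
        vertx.setPeriodic(1000, id -> {
            // poll / process work here
        });
    }

    // Safer: register the timer once, keep its id, cancel it on shutdown.
    public void start() {
        if (timerId == -1) {
            timerId = vertx.setPeriodic(1000, id -> {
                // poll / process work here
            });
        }
    }

    public void stop() {
        if (timerId != -1) {
            vertx.cancelTimer(timerId);
            timerId = -1;
        }
    }
}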