如何将 Kafka (Java) 应用程序从 Windows 连接到 Linux 中的 Confluent
How to connect Kafka (Java) application from Windows to Confluent in Linux
我在 Linux 服务器上使用 Winscp 和 Putty 运行ning Confluent 5.0。我在 Windows.
中有 Kafka (Java/Eclipse) 应用程序
当我 运行 Java 应用程序时,它没有在 Linux 上 运行ning 中识别 Kafka 代理。
我已经测试了我的 Java 应用程序,该应用程序通过 MAC 终端中的 运行ning Confluent 5.0 将数据发送到 MACBook 中的 Kafka 主题。现在我正在尝试在 Windows 中实现相同的 Kafka 应用程序。由于 Windows 不支持 Confluent,我在 Linux 服务器中 运行ning。
我正在使用 Confluent 而不是 Apache Kafka,因为我在我的应用程序中使用了 Schema-registry。
通过使用 netstat -tupln & curl -v http:/localhost:端口号。弄清楚了 Kafka 在 8082 上的 运行ning 和在 8081 details of ports 上的模式注册表。
下面是我在 Java 应用程序中的 Kafka 属性。
public static Properties producerProperties() {
// normal producer
properties.setProperty("bootstrap.servers", "127.0.0.1:8082");
properties.setProperty("acks", "all");
properties.setProperty("retries", "10");
// avro part
properties.setProperty("key.serializer", StringSerializer .class.getName());
properties.setProperty("value.serializer", KafkaAvroSerializer .class.getName());
properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
return properties;
}
public static Properties consumerProperties() {
// Properties properties = new Properties();
// normal consumer
properties.setProperty("bootstrap.servers", "127.0.0.1:8082");
//different for consumer
properties.setProperty("group.id", "Avro-consumer");
properties.setProperty("enable.auto.commit", "false");
properties.setProperty("auto.offset.reset", "earliest");
// avro part
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
properties.setProperty("specific.avro.reader", "true");
return properties;
}
public static Properties streamsProperties() {
// normal consumer
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "com.github.ptn006");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:8082");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return properties;
}
预计:
写入 Kafka 主题的数据。
实际:
WARN 无法建立与节点 -1 的连接。经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient:589)
您需要通过 Windows 机器确保 advertised.listeners
of the server.properties
file in Kafka is resolvable。还要确保防火墙允许访问 (netstat -tupln | grep LIST
),并查找在 0.0.0.0
上侦听的 Kafka 端口,例如。
我在 Linux 服务器上使用 Winscp 和 Putty 运行ning Confluent 5.0。我在 Windows.
中有 Kafka (Java/Eclipse) 应用程序当我 运行 Java 应用程序时,它没有在 Linux 上 运行ning 中识别 Kafka 代理。
我已经测试了我的 Java 应用程序,该应用程序通过 MAC 终端中的 运行ning Confluent 5.0 将数据发送到 MACBook 中的 Kafka 主题。现在我正在尝试在 Windows 中实现相同的 Kafka 应用程序。由于 Windows 不支持 Confluent,我在 Linux 服务器中 运行ning。
我正在使用 Confluent 而不是 Apache Kafka,因为我在我的应用程序中使用了 Schema-registry。
通过使用 netstat -tupln & curl -v http:/localhost:端口号。弄清楚了 Kafka 在 8082 上的 运行ning 和在 8081 details of ports 上的模式注册表。 下面是我在 Java 应用程序中的 Kafka 属性。
public static Properties producerProperties() {
// normal producer
properties.setProperty("bootstrap.servers", "127.0.0.1:8082");
properties.setProperty("acks", "all");
properties.setProperty("retries", "10");
// avro part
properties.setProperty("key.serializer", StringSerializer .class.getName());
properties.setProperty("value.serializer", KafkaAvroSerializer .class.getName());
properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
return properties;
}
public static Properties consumerProperties() {
// Properties properties = new Properties();
// normal consumer
properties.setProperty("bootstrap.servers", "127.0.0.1:8082");
//different for consumer
properties.setProperty("group.id", "Avro-consumer");
properties.setProperty("enable.auto.commit", "false");
properties.setProperty("auto.offset.reset", "earliest");
// avro part
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
properties.setProperty("specific.avro.reader", "true");
return properties;
}
public static Properties streamsProperties() {
// normal consumer
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "com.github.ptn006");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:8082");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return properties;
}
预计: 写入 Kafka 主题的数据。
实际: WARN 无法建立与节点 -1 的连接。经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient:589)
您需要通过 Windows 机器确保 advertised.listeners
of the server.properties
file in Kafka is resolvable。还要确保防火墙允许访问 (netstat -tupln | grep LIST
),并查找在 0.0.0.0
上侦听的 Kafka 端口,例如。