Apache Kafka producer does not store data
I am trying to access Kafka deployed on an AWS server through its public IP, but when I connect and send some data I get no response and the server closes the connection. Here is my producer code:
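    import java.util.Properties;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Constructor and run() of the SensorDevice class; the producer and topic
    // fields are declared elsewhere in the class.
    public SensorDevice() {
        Properties props = new Properties();
        // Legacy (old Scala producer) keys; the new KafkaProducer does not recognise them.
        props.put("metadata.broker.list", "myip-xyz:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        // Keys read by the new Java producer.
        props.put("bootstrap.servers", "myip-xyz:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // props.put("partitioner.class", "example.producer.SimplePartitioner");
        producer = new KafkaProducer<String, String>(props);
    }

    public void run() {
        Object objectData = new Object();
        // Record key is "mytopic"; the value is the object's default toString().
        ProducerRecord<String, String> data = new ProducerRecord<String, String>(
                topic, "mytopic", objectData.toString());
        System.out.println(data);
        Future<RecordMetadata> rs = producer.send(data,
                new org.apache.kafka.clients.producer.Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata,
                            Exception exception) {
                        // recordMetadata is null when the send failed, so check the exception first.
                        if (exception != null) {
                            System.out.println("Send failed: " + exception);
                            return;
                        }
                        System.out.println("Received ack for partition="
                                + recordMetadata.partition() + " offset = "
                                + recordMetadata.offset());
                    }
                });
        try {
            // Block until the broker acknowledges the record or the send fails.
            RecordMetadata rm = rs.get();
            String msg = " partition = " + rm.partition() + " offset ="
                    + rm.offset();
            System.out.println(msg);
        } catch (Exception e) {
            System.out.println(e);
        }
        producer.close();
    }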
I also tried adding advertise.host.name to the server.properties configuration file.
Kafka shows the following output:
> [2015-04-24 09:06:35,329] INFO Created log for partition [mytopic,0] in /tmp/kafka-logs with properties {segment.index.bytes ->
> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 1073741824,
> flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000,
> index.interval.bytes -> 4096, retention.bytes -> -1,
> min.insync.replicas -> 1, cleanup.policy -> delete,
> unclean.leader.election.enable -> true, segment.ms -> 604800000,
> max.message.bytes -> 1000012, flush.messages -> 9223372036854775807,
> min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000,
> segment.jitter.ms -> 0}. (kafka.log.LogManager)
> [2015-04-24 09:06:35,330] WARN Partition [mytopic,0] on broker 0: No checkpointed highwatermark is found for partition [mytopic,0]
> (kafka.cluster.Partition)
> [2015-04-24 09:07:34,788] INFO Closing socket connection to /50.156.87.157. (kafka.network.Processor)
Please help me solve this problem!
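By default an EC2 instance sees only its internal IP address, so that is the address the broker hands back to clients in its metadata, and a client outside AWS cannot reach it. This commonly causes problems with EC2 servers running Kafka and ZooKeeper. Try setting the advertised.host.name and advertised.port variables in the server.properties file.
advertised.host.name should be the IP address of the EC2 server that clients can reach (the public IP when connecting from outside AWS).
advertised.port should be the Kafka port. The default is 9092.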
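As a rough sketch of what that looks like in server.properties on the broker: the address below is a placeholder, not a real value, and should be replaced with whatever public IP or DNS name your clients actually use. Note the spelling advertised.host.name; the question mentions advertise.host.name, and if that is literally what was put in the file, the broker would not pick it up. These two keys apply to the Kafka release in use at the time of the question; newer releases configure the same thing through advertised.listeners instead.

```properties
# server.properties (broker side)
# Placeholder value: substitute the address external clients use to reach the EC2 instance.
advertised.host.name=<ec2-public-ip-or-dns>
# Port that clients should connect to; 9092 is the Kafka default.
advertised.port=9092
```

The broker needs a restart after the change, and the EC2 security group has to allow inbound traffic on that port from the client's address.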