为什么使用 Java 客户端传输的数据在 Apache Pulsar 上显示为已编码?
Why does the data streamed using the Java client appear as encoded on Apache Pulsar?
我正在尝试将一些数据从 Java 应用流式传输到 Apache-Pulsar 集群中。我面临的问题是数据似乎已编码。 。例如。 "\u0000\u0000\u0000\u0004\u0018l@\u0000\u0000\u0000�@�\u000fV�\u0001\u0000\u00006B\u0000\u0000�@\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000�\t���AUU�A\u0002(2021-10-04T14:00:00Z\u0002H88c8dc24-233c-45f5-b366-85382d7d52c6\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001\u0000\u0000\u0000\u0000"
只是我作为字符串发送的参数显然是正确的。
我的生产者代码是这样构建的:
PulsarClient client = PulsarClient.builder()
.serviceUrl(service_url)
.tlsTrustCertsFilePath("/etc/ssl/certs/ca-certificates.crt")
.authentication(
AuthenticationFactory.token(token)
)
.build();
Producer<DavisMessage> producer = client.newProducer(Schema.AVRO(DavisMessage.class))
.topic(topic)
.create();
Timestamp timestamp = new Timestamp(rec.getTimestamp().getTime());
final String formattedtimestamp = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
.format(timestamp);
float lon = Float.parseFloat((prop.getProperty("sensor.longitude")));
float lat = Float.parseFloat((prop.getProperty("sensor.latitude")));
float alt = Float.parseFloat((prop.getProperty("sensor.altitude")));
log.fine("Sending message to Pulsar.");
producer.newMessage().value(DavisMessage.builder()
.uuid(prop.getProperty("sensor.uuid"))
.latitude(lat)
.longitude(lon)
.altitude(alt)
.ts(formattedtimestamp)
.temp_out((float) rec.getOutsideTemperature())
.temp_in((float) rec.getInsideTemperature())
.hum_out((short) rec.getOutsideHumidity())
.hum_in((short) rec.getInsideHumidity())
.barometer(rec.getBarometer())
.rain((float) rec.getRainFall())
.rain_rate((float) rec.getRainRateHigh())
.wind_avg((float) rec.getWindSpeedAvg())
.wind_dir((short) rec.getWindDirection())
.wind_high((float) rec.getWindSpeedHigh())
.solar((short) rec.getSolarRadiation())
.uv((float) rec.getUvIndex())
.build()).send();
它按要求用 Avro 序列化编码:
Schema.AVRO(DavisMessage.class)
您可以在消费者上指定相同的模式,让它自动为您反序列化。
如果您想使用人类可读的负载,您可以使用 Schema.JSON(DavisMessage.class)
。
我正在尝试将一些数据从 Java 应用流式传输到 Apache-Pulsar 集群中。我面临的问题是数据似乎已编码。 。例如。 "\u0000\u0000\u0000\u0004\u0018l@\u0000\u0000\u0000�@�\u000fV�\u0001\u0000\u00006B\u0000\u0000�@\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000�\t���AUU�A\u0002(2021-10-04T14:00:00Z\u0002H88c8dc24-233c-45f5-b366-85382d7d52c6\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001\u0000\u0000\u0000\u0000"
只是我作为字符串发送的参数显然是正确的。
我的生产者代码是这样构建的:
PulsarClient client = PulsarClient.builder()
.serviceUrl(service_url)
.tlsTrustCertsFilePath("/etc/ssl/certs/ca-certificates.crt")
.authentication(
AuthenticationFactory.token(token)
)
.build();
Producer<DavisMessage> producer = client.newProducer(Schema.AVRO(DavisMessage.class))
.topic(topic)
.create();
Timestamp timestamp = new Timestamp(rec.getTimestamp().getTime());
final String formattedtimestamp = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
.format(timestamp);
float lon = Float.parseFloat((prop.getProperty("sensor.longitude")));
float lat = Float.parseFloat((prop.getProperty("sensor.latitude")));
float alt = Float.parseFloat((prop.getProperty("sensor.altitude")));
log.fine("Sending message to Pulsar.");
producer.newMessage().value(DavisMessage.builder()
.uuid(prop.getProperty("sensor.uuid"))
.latitude(lat)
.longitude(lon)
.altitude(alt)
.ts(formattedtimestamp)
.temp_out((float) rec.getOutsideTemperature())
.temp_in((float) rec.getInsideTemperature())
.hum_out((short) rec.getOutsideHumidity())
.hum_in((short) rec.getInsideHumidity())
.barometer(rec.getBarometer())
.rain((float) rec.getRainFall())
.rain_rate((float) rec.getRainRateHigh())
.wind_avg((float) rec.getWindSpeedAvg())
.wind_dir((short) rec.getWindDirection())
.wind_high((float) rec.getWindSpeedHigh())
.solar((short) rec.getSolarRadiation())
.uv((float) rec.getUvIndex())
.build()).send();
它按要求用 Avro 序列化编码:
Schema.AVRO(DavisMessage.class)
您可以在消费者上指定相同的模式,让它自动为您反序列化。
如果您想使用人类可读的负载,您可以使用 Schema.JSON(DavisMessage.class)
。