即使 REST 程序具有 INT,Kafka 也会将消息密钥生成为 STRING?
Kafka producing message key as STRING even though the REST program has INT?
我正在使用以下程序在 kafka 中生成记录:
import java.io.IOException;
import java.security.SecureRandom;
public class SensorStatusProducer {
private final static String TOPIC = "SENSOR_STATUS_DETAILS";
private final static String PRODUCER_URI = "http://xxx.xxx.xxx.xxx:8082/topics/" + TOPIC;
private final static SecureRandom randomNumber = new SecureRandom();
private final static SensorDetails sensorDetails = new SensorDetails();
public static void main(String[] args) {
int[] sensorid = sensorDetails.getSensorid(); //this will return [1001,1002,1003,1004,1005]
try {
HttpRestProxyUtil rest = new HttpRestProxyUtil(); //this is defined in another class
for (int sid : sensorid) {
rest.produceRecords(PRODUCER_URI, String.format("{\"records\":[{\"key\": %d," +
"\"value\":{" +
"\"sensorid\":%d," +
"\"status\":%s," +
"\"lastconnectedtime\":%s}}]}", sid, sid, "\"CONNECTED\"", String.format("\"%s\"", sensorDetails.currentTimestamp()))); //currentTimestamp() function in defined in another class
}
} catch (InterruptedException | IOException me) {
me.printStackTrace();
}
}
}
键的格式说明符为 %d 但生成的记录具有 STRING 类型的键。
这可以通过以下内容得到证明:
尝试制作时 table:
CREATE TABLE STATUS_IB_TABLE (ROWKEY INT KEY,
sensorid INTEGER,
status VARCHAR,
lastconnectedtime STRING)
WITH (TIMESTAMP='lastconnectedtime', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss', KAFKA_TOPIC='SENSOR_STATUS_DETAILS', VALUE_FORMAT='JSON', KEY='sensorid');
指出的那样,KEY被序列化为STRING
我不知道怎么可能。
谁能帮我澄清一下,我做错了什么?
PS:
=> 这是我之前问题的后续问题
=> 融合平台版本:5.5
=> 这是程序的主要 class。
REST Proxy 支持各种content types,但不包括写入序列化32 位整数的原始类型。
因此,您的代码使用字符串键为主题生成数据。有关如何生成 INT 的示例,请参见使用 kafkacat
的示例 here。
由于您使用的是 Java,因此您可以使用本机 Java Producer API 来准确控制向 Kafka 生成数据的方式(这也更加高效和灵活比 REST API)。
我正在使用以下程序在 kafka 中生成记录:
import java.io.IOException;
import java.security.SecureRandom;
public class SensorStatusProducer {
private final static String TOPIC = "SENSOR_STATUS_DETAILS";
private final static String PRODUCER_URI = "http://xxx.xxx.xxx.xxx:8082/topics/" + TOPIC;
private final static SecureRandom randomNumber = new SecureRandom();
private final static SensorDetails sensorDetails = new SensorDetails();
public static void main(String[] args) {
int[] sensorid = sensorDetails.getSensorid(); //this will return [1001,1002,1003,1004,1005]
try {
HttpRestProxyUtil rest = new HttpRestProxyUtil(); //this is defined in another class
for (int sid : sensorid) {
rest.produceRecords(PRODUCER_URI, String.format("{\"records\":[{\"key\": %d," +
"\"value\":{" +
"\"sensorid\":%d," +
"\"status\":%s," +
"\"lastconnectedtime\":%s}}]}", sid, sid, "\"CONNECTED\"", String.format("\"%s\"", sensorDetails.currentTimestamp()))); //currentTimestamp() function in defined in another class
}
} catch (InterruptedException | IOException me) {
me.printStackTrace();
}
}
}
键的格式说明符为 %d 但生成的记录具有 STRING 类型的键。
这可以通过以下内容得到证明:
尝试制作时 table:
CREATE TABLE STATUS_IB_TABLE (ROWKEY INT KEY,
sensorid INTEGER,
status VARCHAR,
lastconnectedtime STRING)
WITH (TIMESTAMP='lastconnectedtime', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss', KAFKA_TOPIC='SENSOR_STATUS_DETAILS', VALUE_FORMAT='JSON', KEY='sensorid');
我不知道怎么可能。
谁能帮我澄清一下,我做错了什么?
PS:
=> 这是我之前问题的后续问题
=> 融合平台版本:5.5
=> 这是程序的主要 class。
REST Proxy 支持各种content types,但不包括写入序列化32 位整数的原始类型。
因此,您的代码使用字符串键为主题生成数据。有关如何生成 INT 的示例,请参见使用 kafkacat
的示例 here。
由于您使用的是 Java,因此您可以使用本机 Java Producer API 来准确控制向 Kafka 生成数据的方式(这也更加高效和灵活比 REST API)。