如何在 KSQLDB 查询的行中使用 protobuf 反序列化?
How do I use protobuf deserialization in rows from a KSQLDB query?
我正在尝试在 Java 中标准化我的 KSQLDB 应用程序中的反序列化,但我正在努力了解如何处理 KSQLDB Client
类型返回的 Row
类型。例如(删除try/catches):
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.BatchedQueryResult;
Client ksqldbClient = kafkaService.getKSQLDBClient();
String queryString = String.format("SELECT * FROM %s WHERE %s = '%s';", tableName, primaryKeyName, id);
BatchedQueryResult query = ksqldbClient.executeQuery(queryString);
List<Row> rows = query.get();
我的 KSQLDB 表配置为使用 protobuf 序列化,但 Row
类型似乎是 JSON?我只能设法通过以下方式获取其数据:
for (Row row : rows) {
String json = row.asObject().toJsonString();
// Deserialize json string
...
}
KSQLDB 客户端是否仅自行处理 protobuf 反序列化?有没有办法只获取 protobuf 字节,这样我就可以将它传递到我已经定义的 Protobuf 解串器中,这样我就不需要再编写 JSON 解串器了?
row.asObject()
returns 一个已经反序列化的 KsqlObject,其操作类似于 JDBC ResultSet,因为您可以针对行中的类型调用各种 get 方法。
如果你想映射到你从 Protobuf 生成的特定域对象,似乎没有直接的方法,你可能最好直接使用 Kafka Streams 而不是 KSQL 如果你需要那个功能
我正在尝试在 Java 中标准化我的 KSQLDB 应用程序中的反序列化,但我正在努力了解如何处理 KSQLDB Client
类型返回的 Row
类型。例如(删除try/catches):
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.BatchedQueryResult;
Client ksqldbClient = kafkaService.getKSQLDBClient();
String queryString = String.format("SELECT * FROM %s WHERE %s = '%s';", tableName, primaryKeyName, id);
BatchedQueryResult query = ksqldbClient.executeQuery(queryString);
List<Row> rows = query.get();
我的 KSQLDB 表配置为使用 protobuf 序列化,但 Row
类型似乎是 JSON?我只能设法通过以下方式获取其数据:
for (Row row : rows) {
String json = row.asObject().toJsonString();
// Deserialize json string
...
}
KSQLDB 客户端是否仅自行处理 protobuf 反序列化?有没有办法只获取 protobuf 字节,这样我就可以将它传递到我已经定义的 Protobuf 解串器中,这样我就不需要再编写 JSON 解串器了?
row.asObject()
returns 一个已经反序列化的 KsqlObject,其操作类似于 JDBC ResultSet,因为您可以针对行中的类型调用各种 get 方法。
如果你想映射到你从 Protobuf 生成的特定域对象,似乎没有直接的方法,你可能最好直接使用 Kafka Streams 而不是 KSQL 如果你需要那个功能