如何在 KSQLDB 查询的行中使用 protobuf 反序列化?

How do I use protobuf deserialization in rows from a KSQLDB query?

我正在尝试在 Java 中标准化我的 KSQLDB 应用程序中的反序列化,但我正在努力了解如何处理 KSQLDB Client 类型返回的 Row 类型。例如(删除try/catches):

    import io.confluent.ksql.api.client.Client;
    import io.confluent.ksql.api.client.BatchedQueryResult;

    Client ksqldbClient = kafkaService.getKSQLDBClient();
    String queryString = String.format("SELECT * FROM %s WHERE %s = '%s';", tableName, primaryKeyName, id);
    BatchedQueryResult query = ksqldbClient.executeQuery(queryString);
    List<Row> rows = query.get();

我的 KSQLDB 表配置为使用 protobuf 序列化,但 Row 类型似乎是 JSON?我只能设法通过以下方式获取其数据:

    for (Row row : rows) {
        String json = row.asObject().toJsonString();
        // Deserialize json string
        ...
    }

KSQLDB 客户端是否仅自行处理 protobuf 反序列化?有没有办法只获取 protobuf 字节,这样我就可以将它传递到我已经定义的 Protobuf 解串器中,这样我就不需要再编写 JSON 解串器了?

row.asObject() returns 一个已经反序列化的 KsqlObject,其操作类似于 JDBC ResultSet,因为您可以针对行中的类型调用各种 get 方法。

如果你想映射到你从 Protobuf 生成的特定域对象,似乎没有直接的方法,你可能最好直接使用 Kafka Streams 而不是 KSQL 如果你需要那个功能