使用 KStream groupBy 操作进行序列化

Serialization with KStream groupBy operation

我正在尝试对 KStream 执行计数操作,运行 在理解序列化在这里的工作方式时遇到了一些困难。我有一个正在推送人们信息的流,例如姓名年龄。使用此流后,我正在尝试创建一个包含人们年龄的 KTable。

输入: {"name":"abc","age":“15”}

输出: 30, 10 20, 4 10, 8 35, 22 ...

属性

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "person_processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

处理器

KStream<Object, Person> people = builder.stream("people");
people.print(Printed.<Object, Person>toSysOut().withLabel("consumer-1"));

输出 [消费者 1]:空,[B@7e37bab6

问题一 我了解主题中的数据以字节为单位。我没有为 Key 或 Value 设置任何 Serdes。 KStream 是否将输入从 bytes 转换为 Person 并在此处打印 Person 的地址?

问题2 当我添加以下值 Serdes 时,我得到了更有意义的输出。这里的字节信息是先转成String再转Person吗?为什么现在正确打印了值?

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

[consumer-1]: null, {"name" : "abc","age" : "15"}

问题3 现在,在对年龄进行计数时,我在将 String 转换为 Person 时遇到运行时错误。如果 groupBy 将年龄设置为键并将计数设置为长,为什么会发生字符串到人的转换?

KTable<Integer, Long> integerLongKTable = people.groupBy((key, value) -> value.getAge())
    .count();

Exception in thread "person_processor-9ff96b38-4beb-4594-b2fe-ae191bf6b9ff-StreamThread-1" java.lang.ClassCastException: java.lang.String cannot be cast to com.example.kafkastreams.KafkaStreamsApplication$Person
at org.apache.kafka.streams.kstream.internals.KStreamImpl.apply(KStreamImpl.java:152)
at org.apache.kafka.streams.kstream.internals.KStreamImpl.apply(KStreamImpl.java:149)

编辑-1

阅读@Matthias J. Sax 的回复后,我使用此位置的序列化器和反序列化器创建了一个 PersonSerde,我得到了这个 SerializationException...

https://github.com/apache/kafka/tree/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview

static class Person {

    String name;
    String age;

    public Person(String name, String age) {

      this.name = name;
      this.age = age;
    }

    void setName(String name) {

      this.name = name;
    }

    String getName() {

      return name;
    }

    void setAge(String age) {

      this.age = age;
    }

    String getAge() {

      return age;
    }

    @Override
    public String toString() {

      return "Person {name:" + this.getName() + ",age:" + this.getAge() + "}";
    }
  }

public class PersonSerde implements Serde {

  @Override
  public void configure(Map map, boolean b) {

  }

  @Override
  public void close() {

  }

  @Override
  public Serializer serializer() {

    Map<String, Object> serdeProps = new HashMap<>();

    final Serializer<Person> personSerializer = new JsonPOJOSerializer<>();
    serdeProps.put("JsonPOJOClass", Person.class);
    personSerializer.configure(serdeProps, false);

    return personSerializer;
  }

  @Override
  public Deserializer deserializer() {

    Map<String, Object> serdeProps = new HashMap<>();

    final Deserializer<Person> personDeserializer = new JsonPOJODeserializer<>();
    serdeProps.put("JsonPOJOClass", Person.class);
    personDeserializer.configure(serdeProps, false);

    return personDeserializer;
  }
}

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, personSerde.getClass());

KTable<String, Long> count = people.selectKey((key, value) -> value.getAge()).groupByKey(Serialized.with(Serdes.String(), personSerde))
      .count();

错误

Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing JSON message
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class com.example.kafkastreams.KafkaStreamsApplication$Person and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1191)
at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:313)

编辑 5

看来,当我将 mapValues 映射到字符串时,count 可以正常工作。但是当我在自定义对象上使用它时,它失败了

KStream<String, Person> people = builder.stream("person-topic", Consumed.with(Serdes.String(), personSerde));
people.print(Printed.<String, Person>toSysOut().withLabel("person-source"));

KStream<String, Person> agePersonKStream = people.selectKey((key, value) -> value.getAge());
agePersonKStream.print(Printed.<String, Person>toSysOut().withLabel("age-person"));

KStream<String, String> stringStringKStream = agePersonKStream.mapValues((person -> person.name));
stringStringKStream.print(Printed.<String, String>toSysOut().withLabel("age-name"));

KTable<String, Long> stringLongKTable = stringStringKStream.groupByKey(Serialized.with(Serdes.String(), Serdes.String())).count();
stringLongKTable.toStream().print(Printed.<String, Long>toSysOut().withLabel("age-count"));

没有第 3 步将值映射到名称,第 4 步失败。

Question-1 I understand that data in the topic is in bytes. I am not setting any Serdes for Key or Value to start with. Is KStream converting the input from bytes to Person and printing the address of Person here?

如果您没有在 StreamsConfigbuilder.stream(..., Consumers.with(/*serdes*/)) 中指定任何 Serde,字节将不会被转换为 Person 对象,但对象将是类型 byte[]。因此,print() 将调用 byte[].toString(),这会导致您看到的神秘输出 ([B@7e37bab6)。

Question-2 When I add the below value Serdes, I get a more meaningful output. Is the byte information here getting converted to String and then to Person? Why is the value now printed correctly?

当您在 StreamsConfig 中指定 Serde.String() 时,字节将转换为 String 类型。 StringSerde 似乎能够以一种有意义的方式反序列化字节——但这似乎只是一个巧合,它能正常工作。看来您的数据实际上是在 JSON 中序列化的,这可以解释为什么 StringSerde() 可以将字节转换为 String

Question-3 Now, when performing the count on the age, I get a runtime error on converting a String to Person. If groupBy is setting the age as the Key and the count as Long, why is the String to Person conversion happening?

这是意料之中的。因为字节被转换为 String 对象(如您指定的 Serdes.String()),所以无法执行转换。

最后备注:

如果您只使用 print(),您不会得到 class 转换异常,因为在这种情况下,不会执行任何转换操作。 Java 仅在需要时插入强制转换操作。

对于 groupBy() 你使用 value.getAge() 因此 Java 在这里插入一个转换(它知道期望的类型是 Person,因为它是通过 KStream<Object, Person> people = ...。对于 print(),仅调用在 Object 上定义的 toString(),因此不需要强制转换。

Java 中的泛型为编译器提供类型提示,并替换为 Object(或在编译期间根据需要强制转换)。因此,对于 print(),一个 Object 变量可以毫无问题地指向一个 byte[],并且 toString() 被成功调用。对于 groupBy() 的情况,编译器将 Object 转换为 Person 以便能够调用 getAge() —— 但是,这失败了,因为实际类型是 String.

为了让您的代码正常工作,您需要创建一个 PersonSerde extend Serde<Person> class 并将其指定为值 serde。