Spring Cloud Kafka:当两个处理器处于活动状态时无法序列化输出流的数据
Spring Cloud Kafka: Can't serialize data for output stream when two processors are active
我有一个 Spring Cloud Kafka Streams 的工作设置,具有函数式编程风格。
有两个用例,通过 application.properties
配置。
它们都单独工作,但是当我同时激活它们时,第二个用例的输出流出现序列化错误:
Exception in thread "ActivitiesAppId-05296224-5ea1-412a-aee4-1165870b5c75-StreamThread-1" org.apache.kafka.streams.errors.StreamsException:
Error encountered sending record to topic outputActivities for task 0_0 due to:
...
Caused by: org.apache.kafka.common.errors.SerializationException:
Can't serialize data [com.example.connector.model.Activity@497b37ff] for topic [outputActivities]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException:
Incompatible types: declared root type ([simple type, class com.example.connector.model.Material]) vs com.example.connector.model.Activity
这里的最后一行很重要,因为“声明的根类型”来自 Material
class,而不是 Activity
class,这可能是来源错误。
同样,当我在启动应用程序之前只激活第二个用例时,一切正常。所以我假设“Material”处理器以某种方式干扰了“Activities”处理器(或其序列化程序),但我不知道何时何地。
设置
1.) 用例:“Materials”
- 一个输入流 -> 转换 -> 一个输出流
@Bean
public Function<KStream<String, MaterialRaw>, KStream<String, Material>> processMaterials() {...}
application.properties
spring.cloud.stream.kafka.streams.binder.functions.processMaterials.applicationId=MaterialsAppId
spring.cloud.stream.bindings.processMaterials-in-0.destination=inputMaterialsRaw
spring.cloud.stream.bindings.processMaterials-out-0.destination=outputMaterials
2.) 用例:“活动”
- 两个输入流 -> 加入 -> 一个输出流
@Bean
public BiFunction<KStream<String, ActivityRaw>, KStream<String, Assignee>, KStream<String, Activity>> processActivities() {...}
application.properties
spring.cloud.stream.kafka.streams.binder.functions.processActivities.applicationId=ActivitiesAppId
spring.cloud.stream.bindings.processActivities-in-0.destination=inputActivitiesRaw
spring.cloud.stream.bindings.processActivities-in-1.destination=inputAssignees
spring.cloud.stream.bindings.processActivities-out-0.destination=outputActivities
这两个处理器在application.properties
中也定义为流函数:spring.cloud.stream.function.definition=processActivities;processMaterials
谢谢!
更新 - 这是我在代码中使用处理器的方式:
实施
// Material model
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class MaterialRaw {
private String id;
private String name;
}
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Material {
private String id;
private String name;
}
// Material processor
@Bean
public Function<KStream<String, MaterialRaw>, KStream<String, Material>> processMaterials() {
return materialsRawStream -> materialsRawStream .map((recordKey, materialRaw) -> {
// some transformation
final var newId = materialRaw.getId() + "---foo";
final var newName = materialRaw.getName() + "---bar";
final var material = new Material(newId, newName);
// output
return new KeyValue<>(recordKey, material);
};
}
// Activity model
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ActivityRaw {
private String id;
private String name;
}
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Assignee {
private String id;
private String assignedAt;
}
/**
* Combination of `ActivityRaw` and `Assignee`
*/
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Activity {
private String id;
private Integer number;
private String assignedAt;
}
// Activity processor
@Bean
public BiFunction<KStream<String, ActivityRaw>, KStream<String, Assignee>, KStream<String, Activity>> processActivities() {
return (activitiesRawStream, assigneesStream) -> {
final var joinWindow = JoinWindows.of(Duration.ofDays(30));
final var streamJoined = StreamJoined.with(
Serdes.String(),
new JsonSerde<>(ActivityRaw.class),
new JsonSerde<>(Assignee.class)
);
final var joinedStream = activitiesRawStream.leftJoin(
assigneesStream,
new ActivityJoiner(),
joinWindow,
streamJoined
);
final var mappedStream = joinedStream.map((recordKey, activity) -> {
return new KeyValue<>(recordKey, activity);
});
return mappedStream;
};
}
您需要为每个函数指定使用哪个活页夹s.c.s.bindings.xxx.binder=...
。
但是,如果没有它,我会预料到会出现诸如“找到多个活页夹但未指定默认值”之类的错误,这就是消息通道活页夹所发生的情况。
事实证明,当存在多个具有不同出站目标类型的函数时,活页夹推断 Serde
类型的方式存在问题,一个具有 Activity
,另一个具有 Material
在你的情况下。我们将不得不在活页夹中解决这个问题。我创建了一个问题 here。
在此期间,您可以遵循此解决方法。
创建自定义 Serde
class,如下所示。
public class ActivitySerde extends JsonSerde<Activity> {}
然后,使用配置将此 Serde
明确用于 processActivities
函数的出站。
例如,
spring.cloud.stream.kafka.streams.bindings.processActivities-out-0.producer.valueSerde=com.example.so65003575.ActivitySerde
如果您正在尝试此解决方法,请将软件包更改为合适的软件包。
这是另一种推荐的方法。如果您使用目标类型定义类型为 Serde
的 bean,则该 bean 优先,因为绑定器将与 KStream
类型进行匹配。因此,您也可以在上述解决方法中不定义额外的 class。
@Bean
public Serde<Activity> activitySerde() {
return new JsonSerde(Activity.class);
}
这里是docs,其中解释了所有这些细节。
我有一个 Spring Cloud Kafka Streams 的工作设置,具有函数式编程风格。
有两个用例,通过 application.properties
配置。
它们都单独工作,但是当我同时激活它们时,第二个用例的输出流出现序列化错误:
Exception in thread "ActivitiesAppId-05296224-5ea1-412a-aee4-1165870b5c75-StreamThread-1" org.apache.kafka.streams.errors.StreamsException:
Error encountered sending record to topic outputActivities for task 0_0 due to:
...
Caused by: org.apache.kafka.common.errors.SerializationException:
Can't serialize data [com.example.connector.model.Activity@497b37ff] for topic [outputActivities]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException:
Incompatible types: declared root type ([simple type, class com.example.connector.model.Material]) vs com.example.connector.model.Activity
这里的最后一行很重要,因为“声明的根类型”来自 Material
class,而不是 Activity
class,这可能是来源错误。
同样,当我在启动应用程序之前只激活第二个用例时,一切正常。所以我假设“Material”处理器以某种方式干扰了“Activities”处理器(或其序列化程序),但我不知道何时何地。
设置
1.) 用例:“Materials”
- 一个输入流 -> 转换 -> 一个输出流
@Bean
public Function<KStream<String, MaterialRaw>, KStream<String, Material>> processMaterials() {...}
application.properties
spring.cloud.stream.kafka.streams.binder.functions.processMaterials.applicationId=MaterialsAppId
spring.cloud.stream.bindings.processMaterials-in-0.destination=inputMaterialsRaw
spring.cloud.stream.bindings.processMaterials-out-0.destination=outputMaterials
2.) 用例:“活动”
- 两个输入流 -> 加入 -> 一个输出流
@Bean
public BiFunction<KStream<String, ActivityRaw>, KStream<String, Assignee>, KStream<String, Activity>> processActivities() {...}
application.properties
spring.cloud.stream.kafka.streams.binder.functions.processActivities.applicationId=ActivitiesAppId
spring.cloud.stream.bindings.processActivities-in-0.destination=inputActivitiesRaw
spring.cloud.stream.bindings.processActivities-in-1.destination=inputAssignees
spring.cloud.stream.bindings.processActivities-out-0.destination=outputActivities
这两个处理器在application.properties
中也定义为流函数:spring.cloud.stream.function.definition=processActivities;processMaterials
谢谢!
更新 - 这是我在代码中使用处理器的方式:
实施
// Material model
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class MaterialRaw {
private String id;
private String name;
}
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Material {
private String id;
private String name;
}
// Material processor
@Bean
public Function<KStream<String, MaterialRaw>, KStream<String, Material>> processMaterials() {
return materialsRawStream -> materialsRawStream .map((recordKey, materialRaw) -> {
// some transformation
final var newId = materialRaw.getId() + "---foo";
final var newName = materialRaw.getName() + "---bar";
final var material = new Material(newId, newName);
// output
return new KeyValue<>(recordKey, material);
};
}
// Activity model
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ActivityRaw {
private String id;
private String name;
}
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Assignee {
private String id;
private String assignedAt;
}
/**
* Combination of `ActivityRaw` and `Assignee`
*/
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Activity {
private String id;
private Integer number;
private String assignedAt;
}
// Activity processor
@Bean
public BiFunction<KStream<String, ActivityRaw>, KStream<String, Assignee>, KStream<String, Activity>> processActivities() {
return (activitiesRawStream, assigneesStream) -> {
final var joinWindow = JoinWindows.of(Duration.ofDays(30));
final var streamJoined = StreamJoined.with(
Serdes.String(),
new JsonSerde<>(ActivityRaw.class),
new JsonSerde<>(Assignee.class)
);
final var joinedStream = activitiesRawStream.leftJoin(
assigneesStream,
new ActivityJoiner(),
joinWindow,
streamJoined
);
final var mappedStream = joinedStream.map((recordKey, activity) -> {
return new KeyValue<>(recordKey, activity);
});
return mappedStream;
};
}
您需要为每个函数指定使用哪个活页夹s.c.s.bindings.xxx.binder=...
。
但是,如果没有它,我会预料到会出现诸如“找到多个活页夹但未指定默认值”之类的错误,这就是消息通道活页夹所发生的情况。
事实证明,当存在多个具有不同出站目标类型的函数时,活页夹推断 Serde
类型的方式存在问题,一个具有 Activity
,另一个具有 Material
在你的情况下。我们将不得不在活页夹中解决这个问题。我创建了一个问题 here。
在此期间,您可以遵循此解决方法。
创建自定义 Serde
class,如下所示。
public class ActivitySerde extends JsonSerde<Activity> {}
然后,使用配置将此 Serde
明确用于 processActivities
函数的出站。
例如,
spring.cloud.stream.kafka.streams.bindings.processActivities-out-0.producer.valueSerde=com.example.so65003575.ActivitySerde
如果您正在尝试此解决方法,请将软件包更改为合适的软件包。
这是另一种推荐的方法。如果您使用目标类型定义类型为 Serde
的 bean,则该 bean 优先,因为绑定器将与 KStream
类型进行匹配。因此,您也可以在上述解决方法中不定义额外的 class。
@Bean
public Serde<Activity> activitySerde() {
return new JsonSerde(Activity.class);
}
这里是docs,其中解释了所有这些细节。