One-to-many KStream-KTable join
I have a KStream of universities, where University is:
University(universityId: String, name: String, studentIds: Seq[String])
val universityKStream = builder.stream[String, University](...)
and a KTable of students, where Student is:
Student(studentId: String, name: String)
val studentsKtable = builder.table[String, Student](...)
I want to join the two and produce a topic of ResolvedUniversity objects:
ResolvedUniversity(universityId: String, name: String, students: Seq[Student])
I can't groupBy universityId and aggregate the students, because Student has no universityId field.
Using only the DSL, I think the simplest thing you can do is this (Java):
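import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
class Student {
    String studentId;
    String name;
}
class University {
    String universityId;
    String name;
    List<String> studentIds;
}
class ResolvedUniversity {
    String universityId;
    String name;
    List<Student> students = new ArrayList<>(); // must be initialized: the aggregator appends to it
}
// Pair is not part of Kafka Streams; this is a minimal version of the helper used below
class Pair<L, R> {
    private final L left;
    private final R right;
    private Pair(L left, R right) { this.left = left; this.right = right; }
    static <L, R> Pair<L, R> pair(L left, R right) { return new Pair<>(left, right); }
    L left() { return left; }
    R right() { return right; }
}
class ResolvedUniversityTopology {
    static KTable<String, ResolvedUniversity> build(StreamsBuilder topology) {
        // Placeholders: replace the nulls with real serdes (e.g. JSON or Avro) for your types
        Serde<String> stringSerde = Serdes.String();
        Serde<Student> studentSerde = null;
        Serde<University> universitySerde = null;
        Serde<Pair<University, Student>> pairSerde = null;
        Serde<ResolvedUniversity> resolvedUniversitySerde = null;
        KStream<String, University> universities = topology
            .stream("universities", Consumed.with(stringSerde, universitySerde));
        KTable<String, Student> students = topology
            .table("students", Consumed.with(stringSerde, studentSerde));
        return universities
            // re-key: one (studentId, university) record per student, so it can be joined with the table
            .flatMap((k, v) -> {
                return v.studentIds.stream()
                    .map(id -> new KeyValue<>(id, v))
                    .collect(Collectors.toList());
            })
            // the key changed, so Kafka Streams repartitions (using these serdes) before the join
            .join(students, Pair::pair, Joined.with(stringSerde, universitySerde, studentSerde))
            // re-key by universityId; the repartition needs serdes for the joined Pair value
            .groupBy((k, v) -> v.left().universityId, Grouped.with(stringSerde, pairSerde))
            .aggregate(ResolvedUniversity::new,
                (k, v, a) -> {
                    a.universityId = v.left().universityId;
                    a.name = v.left().name;
                    a.students.add(v.right()); // a re-sent university record appends its students again
                    return a;
                },
                Materialized.with(stringSerde, resolvedUniversitySerde));
    }
}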
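To make the data flow of the topology above easier to follow, here is a minimal in-memory sketch of the same four steps (flatMap, stream-table join, groupBy, aggregate). It uses plain Java collections, not the Kafka Streams API: a `Map` stands in for the students KTable and a `List` for the universities stream. The class `OneToManyJoinSketch` and its `resolve` method are hypothetical names, and the records assume Java 16 or later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of flatMap -> join -> groupBy -> aggregate.
// Plain Java collections stand in for Kafka Streams; this is not the Kafka API.
class OneToManyJoinSketch {
    record Student(String studentId, String name) {}
    record University(String universityId, String name, List<String> studentIds) {}
    record ResolvedUniversity(String universityId, String name, List<Student> students) {}

    // universities: stream records in arrival order
    // studentTable: current state of the students "KTable", keyed by studentId
    static Map<String, ResolvedUniversity> resolve(List<University> universities,
                                                   Map<String, Student> studentTable) {
        Map<String, ResolvedUniversity> aggregates = new LinkedHashMap<>();
        for (University u : universities) {
            // flatMap: one (studentId, university) pair per listed student
            for (String studentId : u.studentIds()) {
                // stream-table inner join: look up the student by the new key;
                // if there is no table row, the pair is dropped
                Student s = studentTable.get(studentId);
                if (s == null) {
                    continue;
                }
                // groupBy universityId + aggregate: append the student to the running result
                ResolvedUniversity previous = aggregates.get(u.universityId());
                List<Student> students = previous == null ? new ArrayList<>() : previous.students();
                students.add(s);
                aggregates.put(u.universityId(),
                        new ResolvedUniversity(u.universityId(), u.name(), students));
            }
        }
        return aggregates;
    }

    public static void main(String[] args) {
        Map<String, Student> studentTable = new HashMap<>();
        studentTable.put("s1", new Student("s1", "Alice"));
        studentTable.put("s2", new Student("s2", "Bob"));
        List<University> universities =
                List.of(new University("u1", "Example University", List.of("s1", "s2", "s3")));
        // "s3" has no row in the table, so only Alice and Bob end up in u1
        System.out.println(resolve(universities, studentTable));
    }
}
```

The sketch also shows the inner-join behaviour: a student id with no table entry never reaches the aggregate, and a university whose students are all missing produces no result at all.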
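With this kind of join, for historical processing your students KTable must already be "ready", i.e. populated with its data, before the university KStream records are joined against it.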
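The timing issue can be illustrated with a small, hypothetical simulation in plain Java (not the Kafka Streams API). Events are processed strictly in log order, a table update only affects stream records that arrive after it, and a stream record whose student is not yet in the table is dropped by the inner join. The names `JoinTimingSketch`, `Event` and `process` are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event log mixing table updates and stream records, processed in order,
// to show why the students table must be populated before universities are joined.
class JoinTimingSketch {
    // One event: either a student upsert (table side) or a university record (stream side)
    record Event(boolean isStudent, String key, String name, List<String> studentIds) {
        static Event student(String id, String name) { return new Event(true, id, name, List.of()); }
        static Event university(String id, List<String> studentIds) { return new Event(false, id, null, studentIds); }
    }

    // Returns, per university, the student names that the inner join matched
    static Map<String, List<String>> process(List<Event> log) {
        Map<String, String> studentTable = new HashMap<>(); // table state: studentId -> name
        Map<String, List<String>> joined = new HashMap<>();
        for (Event e : log) {
            if (e.isStudent()) {
                // table update: visible to later stream records only, earlier ones are not re-joined
                studentTable.put(e.key(), e.name());
            } else {
                List<String> names = joined.computeIfAbsent(e.key(), k -> new ArrayList<>());
                for (String id : e.studentIds()) {
                    String name = studentTable.get(id);
                    if (name != null) {
                        names.add(name); // matched at the moment the stream record was processed
                    }
                    // no table row yet: the inner join drops this student
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Event> ready = List.of(Event.student("s1", "Alice"), Event.university("u1", List.of("s1")));
        List<Event> late = List.of(Event.university("u1", List.of("s1")), Event.student("s1", "Alice"));
        System.out.println(process(ready)); // table populated first
        System.out.println(process(late));  // stream record arrives first
    }
}
```

Processing the student before the university yields `[Alice]` for `u1`; the reverse order yields an empty list, because the later table update does not retroactively re-join the university record.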