kafka-streams left 加入 produce NPE
kafka-strams leftJoin produce NPE
我在主题中有下一个值
// photos
1, {"id": 1, user_id: 1, "url": "http://example.com"}
2, {"id": 2, user_id: 2, "url": "http://example1.com"}
3, {"id": 3, user_id: 1, "url": "http://example2.com"}
// users
1, {"id": 1, "name": "user1"}
2, {"id": 2, "name": "user2"}
我想获取信息:[photo_id, photo_url, user_id, user_name]
我为它实现结果class
public class Result {
public int photo_id;
public String photo_url;
public int user_id;
public String user_name;
public static Result from(Photo photo, User user) {
Result r = new Result();
r.photo_id = photo.id;
r.photo_url = r.url;
r.user_id = user.id;
r.user_name = user.name;
return r;
}
}
我的流实现:
final KStream<Integer, Photo) photo_by_user = ...;
final KStream<Integer, User) users = ...;
users.leftJoin(photo_by_user, new ValueJoiner<User, Photo, Result> {
public Result apply(User user, Photo photo) {
return Result.from(user, photo);
}
}, JoinWindows.of(1L))
但是当我 运行 这段代码时,我得到:
Exception in thread "example-StreamThread-1" java.lang.NullPointerException
at myapps.util.Result.create(Result.java:15)
at myapps.Example.apply(Example.java:56)
at myapps.Example.apply(Example.java:53)
at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:87)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
因为User have a value
,但是Photo
是null
但我不明白为什么?以及如何避免它。
使用 leftJoin 您可以在 ValueJoiner
apply
方法中使用空照片值。
对于 users
流的每个不满足连接谓词的输入记录,将为 photo_by_user
流调用提供的 ValueJoiner 并使用空值。
在 Result.from()
方法中,您需要检查 Photo 实例是否为非空,然后才获取照片的字段 id 和 url。
您还离开了 JoinWindows.of(1L)
,其中 1 是毫秒。
请注意,两条记录只有在它们的时间戳彼此接近(如指定 JoinWindows
所定义的那样)时才被连接。在您的情况下,可能不完全同时创建两条记录,因此请尝试增加价值,例如10000L 来测试你的加入逻辑。
我在主题中有下一个值
// photos
1, {"id": 1, user_id: 1, "url": "http://example.com"}
2, {"id": 2, user_id: 2, "url": "http://example1.com"}
3, {"id": 3, user_id: 1, "url": "http://example2.com"}
// users
1, {"id": 1, "name": "user1"}
2, {"id": 2, "name": "user2"}
我想获取信息:[photo_id, photo_url, user_id, user_name]
我为它实现结果class
public class Result {
public int photo_id;
public String photo_url;
public int user_id;
public String user_name;
public static Result from(Photo photo, User user) {
Result r = new Result();
r.photo_id = photo.id;
r.photo_url = r.url;
r.user_id = user.id;
r.user_name = user.name;
return r;
}
}
我的流实现:
final KStream<Integer, Photo) photo_by_user = ...;
final KStream<Integer, User) users = ...;
users.leftJoin(photo_by_user, new ValueJoiner<User, Photo, Result> {
public Result apply(User user, Photo photo) {
return Result.from(user, photo);
}
}, JoinWindows.of(1L))
但是当我 运行 这段代码时,我得到:
Exception in thread "example-StreamThread-1" java.lang.NullPointerException
at myapps.util.Result.create(Result.java:15)
at myapps.Example.apply(Example.java:56)
at myapps.Example.apply(Example.java:53)
at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:87)
at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)
因为User have a value
,但是Photo
是null
但我不明白为什么?以及如何避免它。
使用 leftJoin 您可以在 ValueJoiner
apply
方法中使用空照片值。
对于 users
流的每个不满足连接谓词的输入记录,将为 photo_by_user
流调用提供的 ValueJoiner 并使用空值。
在 Result.from()
方法中,您需要检查 Photo 实例是否为非空,然后才获取照片的字段 id 和 url。
您还离开了 JoinWindows.of(1L)
,其中 1 是毫秒。
请注意,两条记录只有在它们的时间戳彼此接近(如指定 JoinWindows
所定义的那样)时才被连接。在您的情况下,可能不完全同时创建两条记录,因此请尝试增加价值,例如10000L 来测试你的加入逻辑。