kafka-streams left 加入 produce NPE

kafka-strams leftJoin produce NPE

我在主题中有下一个值

// photos
1, {"id": 1, user_id: 1, "url": "http://example.com"}
2, {"id": 2, user_id: 2, "url": "http://example1.com"}
3, {"id": 3, user_id: 1, "url": "http://example2.com"}

// users
1, {"id": 1, "name": "user1"}
2, {"id": 2, "name": "user2"}

我想获取信息:[photo_id, photo_url, user_id, user_name]

我为它实现结果class

 public class Result {
  public int photo_id;
  public String photo_url;
  public int user_id;
  public String user_name;

  public static Result from(Photo photo, User user) {
    Result r = new Result();
    r.photo_id = photo.id;
    r.photo_url = r.url;
    r.user_id = user.id;
    r.user_name = user.name;
    return r;
  }
 }

我的流实现:

final KStream<Integer, Photo) photo_by_user = ...;
final KStream<Integer, User) users = ...;

users.leftJoin(photo_by_user, new ValueJoiner<User, Photo, Result> {
  public Result apply(User user, Photo photo) {
    return Result.from(user, photo);
  }
}, JoinWindows.of(1L))

但是当我 运行 这段代码时,我得到:

    Exception in thread "example-StreamThread-1" java.lang.NullPointerException
  at myapps.util.Result.create(Result.java:15)
  at myapps.Example.apply(Example.java:56)
  at myapps.Example.apply(Example.java:53)
  at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:87)
  at org.apache.kafka.streams.processor.internals.ProcessorNode.run(ProcessorNode.java:46)

因为User have a value,但是Photonull

但我不明白为什么?以及如何避免它。

使用 leftJoin 您可以在 ValueJoiner apply 方法中使用空照片值。

对于 users 流的每个不满足连接谓词的输入记录,将为 photo_by_user 流调用提供的 ValueJoiner 并使用空值。

Result.from() 方法中,您需要检查 Photo 实例是否为非空,然后才获取照片的字段 id 和 url。

您还离开了 JoinWindows.of(1L),其中 1 是毫秒。 请注意,两条记录只有在它们的时间戳彼此接近(如指定 JoinWindows 所定义的那样)时才被连接。在您的情况下,可能不完全同时创建两条记录,因此请尝试增加价值,例如10000L 来测试你的加入逻辑。