How to route custom objects in an embedded Flink Statefun module?

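I have an embedded module in Apache Flink Statefun 3.0 (a customized Greeter example) that consumes JSON-serialized events. When route() forwards a message that was deserialized at the ingress, I get an exception saying that my custom type cannot be cast to a Protobuf message (and true, it isn't one), but should it be? I went through the 3.x docs and found nothing about restrictions on the types that can be routed.

Any hints or pointers on this?

Thanks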

// The custom type (Bean-style and all)
public final class Message {
  @JsonProperty private String name;
  @JsonProperty private String id;
  @JsonProperty private int visits;
  public Message() {}
  public String getName() { return name; }
  public void setName(String s) { name = s; }
  public String getId() { return id; }
  public void setId(String s) { id = s; }
  public int getVisits() { return visits; }
  public void setVisits(int i) { visits = i; }
}

// The function
public class GreeterFn implements StatefulFunction {
    public static final FunctionType TYPE = new FunctionType("example", "greeter");
    @Override
    public void invoke(Context ctx, Object msg) {
        // I never get here
    }
}

// The module
public class EmbeddedModule implements StatefulFunctionModule {
    static final IngressIdentifier<Message> INGRESS = new IngressIdentifier<>(Message.class, "example", "names");

    private static final class MsgDeser implements KafkaIngressDeserializer<Message> {
        private final ObjectMapper mapper = new ObjectMapper();
        @Override
        public Message deserialize(ConsumerRecord<byte[], byte[]> input) {
            try { return mapper.readValue(new String(input.value(), StandardCharsets.UTF_8), Message.class); }
            catch (java.io.IOException e) { e.printStackTrace(); }
            return null; 
        }
    }

    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        binder.bindIngress(KafkaIngressBuilder.forIdentifier(INGRESS)
            .withKafkaAddress("kafka:9092")
            .withTopic("names")
            .withDeserializer(MsgDeser.class)
            .withConsumerGroupId("my-group-id")
            .build());
        binder.bindIngressRouter(INGRESS, new Router<Message>() {
            @Override
            public void route(Message m, Downstream<Message> ds) {
                ds.forward(GreeterFn.TYPE, m.getName(), m); // <-- I get here OK but then the exception
            }
        });
        binder.bindFunctionProvider(GreeterFn.TYPE, x -> new GreeterFn());
    }
}

// And the logs (trimmed)
statefun-worker_1   | 2021-07-12 11:29:33,366 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: example-names-ingress -> router (names) (1/1)#0 (2b43e45ce4bcc61340ff131d147f3afe) switched from RUNNING to FAILED.         
statefun-worker_1   | java.lang.RuntimeException: class com.my.flink.Message cannot be cast to class com.google.protobuf.Message (com.my.flink.Message is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @2aab3c1e; com.google.protobuf.Message is in unnamed module of loader 'app')                                                                                                                                                                                                  
statefun-worker_1   |   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103) ~[flink-dist_2.12-1.12.1.jar:1.12.1]                                                                                      
statefun-worker_1   |   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87) ~[flink-dist_2.12-1.12.1.jar:1.12.1]                                                                                                  
statefun-worker_1   |   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43) ~[flink-dist_2.12-1.12.1.jar:1.12.1]                                                                                                  
statefun-worker_1   |   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.12-1.12.1.jar:1.12.1]                                                                                                       
statefun-worker_1   |   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.12-1.12.1.jar:1.12.1]                                                                                                       
statefun-worker_1   |   at org.apache.flink.statefun.flink.core.translation.IngressRouterOperator$DownstreamCollector.forward(IngressRouterOperator.java:127) ~[statefun-flink-core.jar:3.0.0]                                                                
statefun-worker_1   |   at org.apache.flink.statefun.sdk.io.Router$Downstream.forward(Router.java:67) ~[statefun-flink-distribution.jar:3.0.0]                                                                                                                
statefun-worker_1   |   at com.my.flink.EmbeddedModule.route(EmbeddedModule.java:47) ~[myflink-1.jar:?]                                                                                                                                                 
statefun-worker_1   |   at com.my.flink.EmbeddedModule.route(EmbeddedModule.java:43) ~[myflink-1.jar:?]                                                                                                                                                 
statefun-worker_1   |   at org.apache.flink.statefun.flink.core.translation.IngressRouterOperator.processElement(IngressRouterOperator.java:81) ~[statefun-flink-core.jar:3.0.0]                                                                              
...
statefun-worker_1   | Caused by: java.lang.ClassCastException: class com.my.flink.Message cannot be cast to class com.google.protobuf.Message (com.my.flink.Message is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @2aab3c1e; com.google.protobuf.Message is in unnamed module of loader 'app')                                                                                                                                                                                     
statefun-worker_1   |   at org.apache.flink.statefun.flink.core.message.MessagePayloadSerializerPb.serialize(MessagePayloadSerializerPb.java:50) ~[statefun-flink-core.jar:3.0.0]                                                                             
...

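By default, messages sent to and received by embedded functions are assumed to be Protobuf. You can use Kryo (or even a custom serializer) instead by setting the following key in flink-conf.yaml: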

statefun.message.serializer: WITH_KRYO_PAYLOADS

This is not really recommended, though, because it makes it hard to evolve your application over time.

You can still stick with Protobuf by deferring the String -> Message deserialization and carrying the raw JSON in the built-in Protobuf type StringValue.

I've adapted the code you pasted to use StringValue:

// StringValue here is com.google.protobuf.StringValue
public class EmbeddedModule implements StatefulFunctionModule {
    static final IngressIdentifier<StringValue> INGRESS = new IngressIdentifier<>(StringValue.class, "example", "names");

    // Wraps the raw JSON in a Protobuf StringValue; the actual parsing is deferred.
    private static final class MsgDeser implements KafkaIngressDeserializer<StringValue> {
        @Override
        public StringValue deserialize(ConsumerRecord<byte[], byte[]> input) {
            String json = new String(input.value(), StandardCharsets.UTF_8);
            return StringValue.of(json);
        }
    }

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        binder.bindIngress(KafkaIngressBuilder.forIdentifier(INGRESS)
            .withKafkaAddress("kafka:9092")
            .withTopic("names")
            .withDeserializer(MsgDeser.class)
            .withConsumerGroupId("my-group-id")
            .build());
        binder.bindIngressRouter(INGRESS, new Router<StringValue>() {
            @Override
            public void route(StringValue m, Downstream<StringValue> ds) {
                String json = m.getValue(); // getValue() is an instance method on the routed message
                String name = ... ; // extract the name from this JSON
                ds.forward(GreeterFn.TYPE, name, m);
            }
        });
        binder.bindFunctionProvider(GreeterFn.TYPE, x -> new GreeterFn());
    }
}
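With this change the function receives a StringValue rather than the original Message bean, so invoke() has to unwrap it before use. Here is a minimal sketch of the function side, assuming Jackson is on the classpath (the question already uses ObjectMapper). The helper name extractName is made up for illustration, and the same helper is one possible way to do the name extraction that the router above leaves open.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.StringValue;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;

class GreeterFn implements StatefulFunction {
    static final FunctionType TYPE = new FunctionType("example", "greeter");
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Hypothetical helper (not part of the Statefun API): reads the "name"
    // field from the raw JSON; yields an empty string if the field is absent.
    static String extractName(String json) {
        try {
            return MAPPER.readTree(json).path("name").asText();
        } catch (IOException e) {
            throw new UncheckedIOException("Malformed JSON payload", e);
        }
    }

    @Override
    public void invoke(Context ctx, Object msg) {
        // The router forwarded a Protobuf StringValue, so that is what arrives here.
        if (!(msg instanceof StringValue)) {
            throw new IllegalArgumentException("Unexpected message: " + msg);
        }
        String json = ((StringValue) msg).getValue();
        System.out.println("Hello, " + extractName(json));
    }
}
```

Note that the JSON is parsed twice this way, once in the router to find the key and once in the function.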

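Define your message as a Protobuf message

To avoid deserializing twice (once in the router and once in the function), you can define the following Protobuf message: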

message MyMessage {
 string name = 1;
 string id = 2;
 int32 visits = 3;
}

Then convert the JSON string into an instance of MyMessage:

MyMessage.Builder builder = MyMessage.newBuilder();
JsonFormat.parser().merge(jsonString, builder);
MyMessage myMessage = builder.build();
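JsonFormat's merge() throws the checked InvalidProtocolBufferException, which is awkward inside an ingress deserializer. Here is a minimal sketch of a wrapper, assuming protobuf-java-util is on the classpath. The class and method names (ProtoJson, mergeJson) are made up for illustration. Because MyMessage only exists after running protoc, the demo in main() uses the built-in Struct type as a stand-in.

```java
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.Struct;
import com.google.protobuf.util.JsonFormat;

final class ProtoJson {
    private ProtoJson() {}

    // Hypothetical helper: merges a JSON document into the given builder and
    // returns that same builder, turning the checked exception into an unchecked one.
    static <B extends Message.Builder> B mergeJson(String json, B builder) {
        try {
            // ignoringUnknownFields() tolerates JSON keys the message does not declare.
            JsonFormat.parser().ignoringUnknownFields().merge(json, builder);
            return builder;
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException("Payload is not valid JSON for this message type", e);
        }
    }

    public static void main(String[] args) {
        // Struct stands in for the generated MyMessage class in this demo.
        Struct parsed = mergeJson("{\"name\":\"bob\",\"id\":\"1\",\"visits\":3}", Struct.newBuilder()).build();
        System.out.println(parsed.getFieldsOrThrow("name").getStringValue());
    }
}
```

With the generated class available, the call would presumably read `MyMessage msg = ProtoJson.mergeJson(json, MyMessage.newBuilder()).build();`. The ingress could then be typed as MyMessage, so the router can call msg.getName() directly.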