如何在将数据发送到下游之前对数据进行 modify/update

How to modify/update to the data before sending it to downstream

我有一个主题,其数据格式为

{
 before: {...},
 after: {...},
 source: {...},
 op: 'u'
}

数据由 Debezium 制作。我想将数据发送到 SQL 服务器数据库 table,所以我 selected JDBC 接收器连接器。我需要在将数据发送到下游之前对其进行处理。

需要应用的逻辑:

  1. if op = 'u' or op = 'c' or op = 'r' // update or insert or snapshot

    select 'after' 中存在的所有字段并向下游执行更新插入。

  2. if op = 'd' // 删除

    select 'before' 中存在的所有字段 + 添加一个字段 IsActive=false 并向下游执行更新插入。

我怎样才能做到这一点?

如果您不强制接收到 kafka 主题的复杂 debezium 消息,请检查 Debezium's New Record State Extraction SMT。您需要在 Debezium 的连接器配置中配置它,如果您将它与 delete.handling.mode:rewrite 一起使用,您将在消息中得到一个字段 __deleted,该字段将用于字段 IsActive 你在问题中已经指出了。

您将接收到 kafka 的消息的简化格式将匹配 jbdc sink connector 期望的消息格式,尽管您可能只需要将 Single Message Transforms for Confluent Platform 中的一些应用到 jdbc sink connector的配置,以便过滤一些字段,替换一些字段等

附带的好处是,您还可以从 kafka 获得更少的数据。

我能够使用接收器 jdbc 连接器中的自定义转换来实现此目的。 我提取了 after 字段和 op 字段并应用了逻辑。没有直接的方法来更新记录,即没有 setSchema 和 setValue 的方法。所以我使用反射来更新架构和值。

以下代码片段:

private final ExtractField<R> afterDelegate = new ExtractField.Value<R>();
    private final ExtractField<R> beforeDelegate = new ExtractField.Value<R>();
    private final ExtractField<R> operationDelegate = new ExtractField.Value<R>(); 

public R apply(R record) {
        R operationRecord = operationDelegate.apply(record);
        String op = String.valueOf(operationRecord.value());
        Boolean isDeletedRecord = op.equalsIgnoreCase(Operation.DELETE.getValue())? true: false;
       ...
       finalRecord = afterDelegate.apply(record);
       if(isDeletedRecord){
            addDeletedFlag(finalRecord);
        }
} 
private void addDeletedFlag(R finalRecord){
        final SchemaBuilder builder = SchemaBuilder.struct();
        builder.name(finalRecord.valueSchema().name());
        for(Field f: finalRecord.valueSchema().fields()){
            builder.field(f.name(),f.schema());
        }
        builder.field(deleteFlagName,Schema.BOOLEAN_SCHEMA).optional();
        Schema newValueSchema = builder.build();
        try{
            java.lang.reflect.Field s = finalRecord.getClass().getSuperclass().getDeclaredField("valueSchema");
            s.setAccessible(true);
            s.set(finalRecord,newValueSchema);
        }catch (Exception e){
            e.printStackTrace();
        }

        Struct s = (Struct) finalRecord.value();
        updateValueSchema(s,finalRecord.valueSchema());
        updateValue(finalRecord.value(),true);
    }
private void updateValueSchema(Object o,Schema newSchema) {
        if(!(o instanceof Struct)){
            return;
        }
        Struct value = (Struct) o;
        try{
            java.lang.reflect.Field s = value.getClass().getDeclaredField("schema");
            s.setAccessible(true);
            s.set(value,newSchema);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    private void updateValue(Object o, Object newValue){
        if(!(o instanceof Struct)){
            return;
        }
        Struct value = (Struct) o;

        try{
            java.lang.reflect.Field s = value.getClass().getDeclaredField("values");
            s.setAccessible(true);
            Object[] newValueArray = ((Object[]) s.get(value)).clone();
            List<Object> newValueList = new ArrayList<>(Arrays.asList(newValueArray));
            newValueList.add(newValue);
            s.set(value, newValueList.toArray());
        }catch (Exception e){
            e.printStackTrace();
        }
    }