Apache beam get kafka data execute SQL error:Cannot call getSchema when there is no schema

Apache beam get kafka data execute SQL error:Cannot call getSchema when there is no schema

我将多个表的数据输入到kafka,beam拿到数据后会执行SQL,但是现在出现如下错误:

Exception in thread "main" java.lang.IllegalStateException: Cannot call getSchema when there is no schema
    at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:328)
    at org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable.<init>(BeamPCollectionTable.java:34)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.toTableMap(SqlTransform.java:141)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:102)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:82)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:473)
    at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:248)
    at BeamSqlTest.main(BeamSqlTest.java:65)

有没有可行的方案?请帮助我!

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.beam.repackaged.sql.com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

import java.util.ArrayList;
import java.util.List;

class BeamSqlTest {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
        options.setRunner(DirectRunner.class);
        Pipeline p = Pipeline.create(options);

        PCollection<KafkaRecord<String, String>> lines = p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("192.168.8.16")
                .withTopic("tmp_table.reuslt")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(ImmutableMap.of("group.id", "beam_app"))
                .withReadCommitted()
                .commitOffsetsInFinalize());

        PCollection<Row> apply = lines.apply(ParDo.of(new DoFn<KafkaRecord<String, String>,Row>(){
            @ProcessElement
            public void processElement(ProcessContext c) {
                String jsonData = c.element().getKV().getValue(); //data: {id:0001@int,name:test01@string,age:29@int,score:99@int}
                if(!"data_increment_heartbeat".equals(jsonData)){ //Filter out heartbeat information
                    JSONObject jsonObject = JSON.parseObject(jsonData);
                    Schema.Builder builder = Schema.builder();
                    //A data pipeline may have data from multiple tables so the Schema is obtained dynamically
                    //This assumes data from a single table
                    List<Object> list = new ArrayList<Object>();
                    for(String s : jsonObject.keySet()) {
                        String[] dataType = jsonObject.get(s).toString().split("@");   //data@field type
                        if(dataType[1].equals("int")){
                            builder.addInt32Field(s);
                        }else if(dataType[1].equals("string")){
                            builder.addStringField(s);
                        }
                        list.add(dataType[0]);
                    }
                    Schema schema = builder.build();
                    Row row = Row.withSchema(schema).addValues(list).build();
                    System.out.println(row);
                    c.output(row);
                }
            }
        }));

        PCollection<Row> result = PCollectionTuple.of(new TupleTag<>("USER_TABLE"), apply)
                .apply(SqlTransform.query("SELECT COUNT(id) total_count, SUM(score) total_score FROM USER_TABLE GROUP BY id"));

        result.apply( "log_result", MapElements.via( new SimpleFunction<Row, Row>() {
            @Override
            public Row apply(Row input) {
                System.out.println("USER_TABLE result: " + input.getValues());
                return input;
            }
        }));`enter code here`

    }
}

我认为您需要使用 setRowSchema() 或 setSchema() 为输入集合 PCollection<Row> apply 设置 schema。问题在于您的 schema 是动态的,是在运行时才定义的(不确定 Beam 是否支持这种用法)。您能否改用静态 schema,并在开始处理输入数据之前就把它定义好?

此外,由于您的输入源是无界的(unbounded),您需要先定义窗口(windowing),然后再应用 SqlTransform。