Apache Beam: executing SQL on Kafka data fails with "Cannot call getSchema when there is no schema"
I push data from multiple tables into Kafka; Beam reads the data and runs SQL on it, but now I get the following error:
Exception in thread "main" java.lang.IllegalStateException: Cannot call getSchema when there is no schema
    at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:328)
    at org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable.<init>(BeamPCollectionTable.java:34)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.toTableMap(SqlTransform.java:141)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:102)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:82)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:473)
    at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:248)
    at BeamSqlTest.main(BeamSqlTest.java:65)
Is there a workable solution? Please help!
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.beam.repackaged.sql.com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.ArrayList;
import java.util.List;
class BeamSqlTest {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
        options.setRunner(DirectRunner.class);
        Pipeline p = Pipeline.create(options);

        PCollection<KafkaRecord<String, String>> lines = p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers("192.168.8.16")
                .withTopic("tmp_table.reuslt")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(ImmutableMap.of("group.id", "beam_app"))
                .withReadCommitted()
                .commitOffsetsInFinalize());

        PCollection<Row> apply = lines.apply(ParDo.of(new DoFn<KafkaRecord<String, String>, Row>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String jsonData = c.element().getKV().getValue(); // data: {id:0001@int,name:test01@string,age:29@int,score:99@int}
                if (!"data_increment_heartbeat".equals(jsonData)) { // filter out heartbeat messages
                    JSONObject jsonObject = JSON.parseObject(jsonData);
                    Schema.Builder builder = Schema.builder();
                    // A data pipeline may carry data from multiple tables, so the schema is built dynamically.
                    // This assumes data from a single table.
                    List<Object> list = new ArrayList<Object>();
                    for (String s : jsonObject.keySet()) {
                        String[] dataType = jsonObject.get(s).toString().split("@"); // value@field type
                        if (dataType[1].equals("int")) {
                            builder.addInt32Field(s);
                        } else if (dataType[1].equals("string")) {
                            builder.addStringField(s);
                        }
                        list.add(dataType[0]);
                    }
                    Schema schema = builder.build();
                    Row row = Row.withSchema(schema).addValues(list).build();
                    System.out.println(row);
                    c.output(row);
                }
            }
        }));

        PCollection<Row> result = PCollectionTuple.of(new TupleTag<>("USER_TABLE"), apply)
                .apply(SqlTransform.query("SELECT COUNT(id) total_count, SUM(score) total_score FROM USER_TABLE GROUP BY id"));

        result.apply("log_result", MapElements.via(new SimpleFunction<Row, Row>() {
            @Override
            public Row apply(Row input) {
                System.out.println("USER_TABLE result: " + input.getValues());
                return input;
            }
        }));
    }
}
I think you need to set a schema on your input collection (PCollection<Row> apply) with setRowSchema() or setSchema(). The problem is that your schema is dynamic and only known at runtime (I'm not sure Beam supports that). Could you define a static schema up front, before you start processing the input data? A minimal sketch of that idea follows.
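Here is a minimal sketch, assuming every Kafka message belongs to the single USER_TABLE table so the schema can be declared once and attached to the PCollection; the field names and types follow the sample record in your code, and the "value@type" parsing is my guess at your format:

// A minimal sketch, assuming all messages share one static schema.
// Field names/types follow the sample record in the question.
final Schema userSchema = Schema.builder()
        .addInt32Field("id")
        .addStringField("name")
        .addInt32Field("age")
        .addInt32Field("score")
        .build();

PCollection<Row> rows = lines
        .apply(ParDo.of(new DoFn<KafkaRecord<String, String>, Row>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String jsonData = c.element().getKV().getValue();
                if (!"data_increment_heartbeat".equals(jsonData)) {
                    JSONObject json = JSON.parseObject(jsonData);
                    // Strip the "@type" suffix and convert each value to the
                    // type declared in the static schema (an assumption about
                    // your message format).
                    c.output(Row.withSchema(userSchema)
                            .addValues(
                                    Integer.valueOf(json.getString("id").split("@")[0]),
                                    json.getString("name").split("@")[0],
                                    Integer.valueOf(json.getString("age").split("@")[0]),
                                    Integer.valueOf(json.getString("score").split("@")[0]))
                            .build());
                }
            }
        }))
        // This is the call SqlTransform needs: it attaches the schema
        // (and the corresponding RowCoder) to the PCollection.
        .setRowSchema(userSchema);

With the schema attached, SqlTransform.toTableMap() can call getSchema() without throwing.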
Also, since your input source is unbounded, you need to define windowing before applying SqlTransform, so the GROUP BY aggregation can produce results per window.
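For example, a sketch with fixed windows (the one-minute duration is an arbitrary choice, and `rows` is the schema-attached collection from the previous sketch):

// Requires: org.apache.beam.sdk.transforms.windowing.FixedWindows,
//           org.apache.beam.sdk.transforms.windowing.Window,
//           org.joda.time.Duration

// Put the unbounded stream into one-minute fixed windows so the
// GROUP BY aggregation can emit a result per window instead of
// waiting forever on an unbounded input.
PCollection<Row> windowed = rows.apply(
        Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))));

PCollection<Row> result = PCollectionTuple.of(new TupleTag<>("USER_TABLE"), windowed)
        .apply(SqlTransform.query(
                "SELECT COUNT(id) total_count, SUM(score) total_score FROM USER_TABLE GROUP BY id"));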