Not able to process Kafka JSON messages with the Flink-Siddhi library
I am trying to build a simple application that consumes Kafka messages, applies some CQL transformations, and publishes the result back to Kafka. Here is the code:
Java: 1.8
Flink: 1.13
Scala: 2.11
flink-siddhi: 2.11-0.2.2-SNAPSHOT
I am using this library: https://github.com/haoch/flink-siddhi
Input JSON sent to Kafka:
{
"awsS3":{
"ResourceType":"aws.S3",
"Details":{
"Name":"crossplane-test",
"CreationDate":"2020-08-17T11:28:05+00:00"
},
"AccessBlock":{
"PublicAccessBlockConfiguration":{
"BlockPublicAcls":true,
"IgnorePublicAcls":true,
"BlockPublicPolicy":true,
"RestrictPublicBuckets":true
}
},
"Location":{
"LocationConstraint":"us-west-2"
}
}
}
Main class:
public class S3SidhiApp {
public static void main(String[] args) {
internalStreamSiddhiApp.start();
//kafkaStreamApp.start();
}
}
Application class:
package flinksidhi.app;
import com.google.gson.JsonObject;
import flinksidhi.event.s3.source.S3EventSource;
import io.siddhi.core.SiddhiManager;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.siddhi.SiddhiCEP;
import org.json.JSONObject;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import static flinksidhi.app.connector.Consumers.createInputMessageConsumer;
import static flinksidhi.app.connector.Producer.*;
public class internalStreamSiddhiApp {
private static final String inputTopic = "EVENT_STREAM_INPUT";
private static final String outputTopic = "EVENT_STREAM_OUTPUT";
private static final String consumerGroup = "EVENT_STREAM1";
private static final String kafkaAddress = "localhost:9092";
private static final String zkAddress = "localhost:2181";
private static final String S3_CQL1 = "from inputStream select * insert into temp";
private static final String S3_CQL = "from inputStream select json:toObject(awsS3) as obj insert into temp;" +
"from temp select json:getString(obj,'$.awsS3.ResourceType') as affected_resource_type," +
"json:getString(obj,'$.awsS3.Details.Name') as affected_resource_name," +
"json:getString(obj,'$.awsS3.Encryption.ServerSideEncryptionConfiguration') as encryption," +
"json:getString(obj,'$.awsS3.Encryption.ServerSideEncryptionConfiguration.Rules[0].ApplyServerSideEncryptionByDefault.SSEAlgorithm') as algorithm insert into temp2; " +
"from temp2 select affected_resource_name,affected_resource_type, " +
"ifThenElse(encryption == ' ','Fail','Pass') as state," +
"ifThenElse(encryption != ' ' and algorithm == 'aws:kms','None','Critical') as severity insert into outputStream";
public static void start(){
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//DataStream<String> inputS = env.addSource(new S3EventSource());
//Flink kafka stream consumer
FlinkKafkaConsumer<String> flinkKafkaConsumer =
createInputMessageConsumer(inputTopic, kafkaAddress,zkAddress, consumerGroup);
//Add Data stream source -- flink consumer
DataStream<String> inputS = env.addSource(flinkKafkaConsumer);
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
cep.registerExtension("json:toObject", io.siddhi.extension.execution.json.function.ToJSONObjectFunctionExtension.class);
cep.registerExtension( "json:getString", io.siddhi.extension.execution.json.function.GetStringJSONFunctionExtension.class);
cep.registerStream("inputStream", inputS, "awsS3");
inputS.print();
System.out.println(cep.getDataStreamSchemas());
//json needs extension jars to present during runtime.
DataStream<Map<String,Object>> output = cep
.from("inputStream")
.cql(S3_CQL1)
.returnAsMap("temp");
//Flink kafka stream Producer
FlinkKafkaProducer<Map<String, Object>> flinkKafkaProducer =
createMapProducer(env,outputTopic, kafkaAddress);
//Add Data stream sink -- flink producer
output.addSink(flinkKafkaProducer);
output.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}
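(For reference, the full S3_CQL defined above is meant to be wired in the same way once the minimal S3_CQL1 query works, reading the result from the final outputStream:)
DataStream<Map<String, Object>> findings = cep
        .from("inputStream")
        .cql(S3_CQL)                   // multi-query string defined above
        .returnAsMap("outputStream");  // the last query inserts into outputStream
findings.print();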
Consumer class:
package flinksidhi.app.connector;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.json.JSONObject;
import java.util.Properties;
public class Consumers {
public static FlinkKafkaConsumer<String> createInputMessageConsumer(String topic, String kafkaAddress, String zookeeprAddr, String kafkaGroup ) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaAddress);
properties.setProperty("zookeeper.connect", zookeeprAddr);
properties.setProperty("group.id",kafkaGroup);
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(
topic,new SimpleStringSchema(),properties);
return consumer;
}
}
Producer class:
package flinksidhi.app.connector;
import flinksidhi.app.util.ConvertJavaMapToJson;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.json.JSONObject;
import java.util.Map;
public class Producer {
public static FlinkKafkaProducer<Tuple2> createStringProducer(StreamExecutionEnvironment env, String topic, String kafkaAddress) {
return new FlinkKafkaProducer<Tuple2>(kafkaAddress, topic, new AverageSerializer());
}
public static FlinkKafkaProducer<Map<String,Object>> createMapProducer(StreamExecutionEnvironment env, String topic, String kafkaAddress) {
return new FlinkKafkaProducer<Map<String,Object>>(kafkaAddress, topic, new SerializationSchema<Map<String, Object>>() {
@Override
public void open(InitializationContext context) throws Exception {
}
@Override
public byte[] serialize(Map<String, Object> stringObjectMap) {
String json = ConvertJavaMapToJson.convert(stringObjectMap);
return json.getBytes();
}
});
}
}
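ConvertJavaMapToJson is a small utility that turns the Siddhi output map into a JSON string, roughly along these lines (simplified sketch, not the exact class):
package flinksidhi.app.util;
import org.json.JSONObject;
import java.util.Map;
public class ConvertJavaMapToJson {
    // Serialize the Siddhi output map to a JSON string for the Kafka sink.
    public static String convert(Map<String, Object> map) {
        return new JSONObject(map).toString();
    }
}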
I have tried many things, but the code that invokes the CQL is never called, and no error is thrown either, so I cannot figure out where it goes wrong.
The same thing works, however, if I create an internal stream source and feed the same input JSON returned as a string.
An initial guess: if you are using event time, are you sure the watermarks you have defined are correct? As described in the docs:
(...) an incoming element is initially put in a buffer where elements are sorted in ascending order based on their timestamp, and when a watermark arrives, all the elements in this buffer with timestamps smaller than that of the watermark are processed (...)
If that does not help, I would suggest decomposing/simplifying the job to a bare minimum, for example just a source operator and some trivial sink printing/logging the elements. If that works, start adding the operators back one by one. You could also start by simplifying the CEP pattern as much as possible.
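For example, a stripped-down job like the following (reusing the Kafka consumer helper from your code, just a rough sketch) would show whether records arrive from Kafka at all before any Siddhi operator is involved:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Only the Kafka source plus a printing sink -- no Siddhi/CEP operators yet.
DataStream<String> raw = env.addSource(
        createInputMessageConsumer(inputTopic, kafkaAddress, zkAddress, consumerGroup));
raw.print();
env.execute("kafka-source-smoke-test"); // throws Exception; handle or declare it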
First of all, a big thank you @Piotr Nowojski. It was your small hint that got me there; no matter how many times I had gone through the code, event time never crossed my mind. So yes, while debugging the two scenarios:
- With the internal data source, where processing succeeds, I saw while debugging the flow that watermarks were processed after the data, but I did not realize that this was what implicitly managed event time for the data.
- With Kafka as the data source, I could see very clearly while debugging that no watermarks were processed in the stream, but it did not occur to me that the faulty processing was happening because of event time and watermarks.
All it took was adding one line of code to my application, which I understood from the following Flink code snippet:
@deprecated In Flink 1.12 the default stream time characteristic has been changed to {@link
* TimeCharacteristic#EventTime}, thus you don't need to call this method for enabling
* event-time support anymore. Explicitly using processing-time windows and timers works in
* event-time mode. If you need to disable watermarks, please use {@link
* ExecutionConfig#setAutoWatermarkInterval(long)}. If you are using {@link
* TimeCharacteristic#IngestionTime}, please manually set an appropriate {@link
* WatermarkStrategy}. If you are using generic "time window" operations (for example {@link
* org.apache.flink.streaming.api.datastream.KeyedStream#timeWindow(org.apache.flink.streaming.api.windowing.time.Time)}
* that change behaviour based on the time characteristic, please use equivalent operations
* that explicitly specify processing time or event time.
*/
I understood that Flink considers event time by default and therefore expects watermarks to be handled properly, which I was not doing. So I added the line below to set the time characteristic of the Flink execution environment:
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
And kaboom... it started working. Although this call is deprecated and some other configuration is recommended instead, your hint was great guidance, helped me a lot, and I was able to resolve the issue.
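Another option, to avoid the deprecated call, would be to keep the default event-time mode and attach an explicit WatermarkStrategy to the Kafka consumer so that the Siddhi operator actually receives watermarks; a rough sketch with ingestion-style timestamps (not something I have verified here):
flinkKafkaConsumer.assignTimestampsAndWatermarks(
        org.apache.flink.api.common.eventtime.WatermarkStrategy
                .<String>forBoundedOutOfOrderness(java.time.Duration.ofSeconds(1))
                // use the read time as the event timestamp (ingestion-time style)
                .withTimestampAssigner((record, ts) -> System.currentTimeMillis()));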
Thanks again @Piotr Nowojski.