Sink a Flink DataStream to MySQL with overwrite (upsert) using the JDBC connector
My use case is:
- Get data from an AWS Kinesis data stream and filter/map it with the Flink DataStream API
- Group and aggregate the data with the StreamTableEnvironment
- Write the result to MySQL via the JDBC connector, using Flink SQL

I am able to write my DataStream results to a MySQL table, but because the job is streaming, every new result row is appended, whereas I want existing rows to be overwritten.
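As far as I understand from the JDBC connector docs, the sink only switches from append mode to upsert mode when the sink table declares a PRIMARY KEY; with the MySQL dialect it then writes INSERT ... ON DUPLICATE KEY UPDATE, which is exactly the "overwrite" behaviour I am after. A minimal sketch of such a sink definition, assuming Flink 1.11+ and the newer 'connector' = 'jdbc' option keys (table and column names here are placeholders, not my real ones):

tEnv.executeSql(
    "CREATE TABLE mysqlUpsertSink (\n" +
    "  id STRING,\n" +
    "  cnt BIGINT,\n" +
    "  PRIMARY KEY (id) NOT ENFORCED\n" +       // with a PK the JDBC sink upserts instead of appending
    ") WITH (\n" +
    "  'connector' = 'jdbc',\n" +
    "  'url' = 'jdbc:mysql://localhost:3306/events',\n" +
    "  'table-name' = 'someUpsertTable',\n" +    // placeholder MySQL table name
    "  'username' = 'root',\n" +
    "  'password' = '123456'\n" +
    ")");

My actual job, which still uses the legacy connector.* option keys, looks like this: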
consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-central-1");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
// Parse Message
DataStream<Event> events = env.addSource(
        new FlinkKinesisConsumer<>(
            Config.INPUT_STREAM,
            new KinesisEventDeserializationSchema(),
            consumerConfig
        )
    )
    .uid("kinesisEventSource");
....
....
....
SingleOutputStreamOperator<ArticleView> filteredDetailsViewEvents = articleViews
    .filter(new FilterFunction<ArticleView>() {
        @Override
        public boolean filter(ArticleView event) throws Exception {
            return StringUtils.isNotBlank(event.getArticleNumber());
        }
    })
    .uid("filteredDetailsViewFilter");
Table t = tEnv.fromDataStream(filteredDetailsViewEvents);
tEnv.executeSql("CREATE TABLE eventsSlider1 (\n" +
" articleNumber String,\n" +
" mandant String,\n" +
" category STRING,\n" +
" cnt BIGINT NOT NULL,\n" +
" CONSTRAINT pk_event PRIMARY KEY (articleNumber,mandant,category) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://localhost:3306/events',\n" +
" 'connector.table' = 'categorySliderItems',\n" +
" 'connector.username' = 'root',\n" +
" 'connector.password' = '123456'\n"
")");
tEnv.executeSql("INSERT INTO eventsSlider1 (SELECT articleNumber,mandant,category,cnt "+
"FROM ("+
" SELECT articleNumber,mandant,category,count(articleNumber) as cnt,"+
" ROW_NUMBER() OVER (PARTITION BY mandant,category ORDER BY count(articleNumber) DESC) as row_num"+
" FROM "+t+" group by articleNumber,category, mandant)"+
" WHERE row_num <= 3)");
The problem turned out to be that I had not set the correct primary key on the table: the primary key is the only thing Flink can use to recognize an upsert and decide between an UPDATE and an INSERT.
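One more thing to keep in mind on the MySQL side: INSERT ... ON DUPLICATE KEY UPDATE only overwrites if the physical table has a matching PRIMARY KEY or UNIQUE index, otherwise every write ends up as a plain insert again. A minimal sketch of how the target table could be created over plain JDBC (column types and lengths are placeholders, and the key just mirrors the sketch above; whatever key the Flink sink declares, the MySQL table needs the same one):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateSinkTable {
    public static void main(String[] args) throws Exception {
        // Sketch only: give the MySQL target table the same key as the Flink
        // sink table, so INSERT ... ON DUPLICATE KEY UPDATE overwrites rows.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/events", "root", "123456");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate(
                "CREATE TABLE IF NOT EXISTS categorySliderItems (\n" +
                "  articleNumber VARCHAR(64) NOT NULL,\n" +
                "  mandant VARCHAR(64) NOT NULL,\n" +
                "  category VARCHAR(64) NOT NULL,\n" +
                "  cnt BIGINT NOT NULL,\n" +
                "  row_num BIGINT NOT NULL,\n" +
                "  PRIMARY KEY (mandant, category, row_num)\n" +
                ")");
        }
    }
}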