How to write data from flink pipeline to redis efficiently
I am building a pipeline in Apache Flink using the SQL API.
The pipeline performs a simple projection query. However, I also need to write each tuple (more precisely, some fields of each tuple) to Redis once before the query and once after it.
It turns out that the code I use to write to Redis severely degrades performance: Flink applies backpressure even at very low data rates.
What is wrong with my code, and how can I improve it? Any suggestions are welcome.
When I remove the writes to Redis before and after the query, performance is excellent.
Here is my pipeline code:
public class QueryExample {
    public static Long throughputCounterAfter = new Long("0");

    public static void main(String[] args) {
        int k_partitions = 10;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(5 * 32);
        // table environment (this declaration was missing from the original snippet)
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zookeeper-node-01:2181");
        props.setProperty("bootstrap.servers", "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092");
        // not to be shared with another job consuming the same topic
        props.setProperty("group.id", "flink-group");
        props.setProperty("enable.auto.commit", "false");

        FlinkKafkaConsumer011<String> purchasesConsumer = new FlinkKafkaConsumer011<>("purchases",
                new SimpleStringSchema(),
                props);

        DataStream<String> purchasesStream = env
                .addSource(purchasesConsumer)
                .setParallelism(Math.min(5 * 32, k_partitions));

        DataStream<Tuple4<Integer, Integer, Integer, Long>> purchaseWithTimestampsAndWatermarks =
                purchasesStream
                        .flatMap(new PurchasesParser())
                        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<Integer, Integer, Integer, Long>>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(Tuple4<Integer, Integer, Integer, Long> element) {
                                return element.getField(3);
                            }
                        });

        Table purchasesTable = tEnv.fromDataStream(purchaseWithTimestampsAndWatermarks, "userID, gemPackID, price, rowtime.rowtime");
        tEnv.registerTable("purchasesTable", purchasesTable);

        // first write to Redis, before the query
        purchaseWithTimestampsAndWatermarks.flatMap(new WriteToRedis());

        Table result = tEnv.sqlQuery("SELECT userID, gemPackID, rowtime FROM purchasesTable");
        DataStream<Tuple2<Boolean, Row>> queryResultAsDataStream = tEnv.toRetractStream(result, Row.class);
        // second write to Redis, after the query (note: as posted, this needs a
        // WriteToRedis variant that accepts Tuple2<Boolean, Row>)
        queryResultAsDataStream.flatMap(new WriteToRedis());

        try {
            env.execute("flink SQL");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
/**
* write to redis
*/
    public static class WriteToRedis extends RichFlatMapFunction<Tuple4<Integer, Integer, Integer, Long>, String> {
        // logger declaration was missing from the original snippet
        private static final Logger LOG = LoggerFactory.getLogger(WriteToRedis.class);
        RedisReadAndWrite redisReadAndWrite;

        @Override
        public void open(Configuration parameters) {
            LOG.info("Opening connection with Jedis to {}", "redis");
            this.redisReadAndWrite = new RedisReadAndWrite("redis", 6379);
        }

        @Override
        public void flatMap(Tuple4<Integer, Integer, Integer, Long> input, Collector<String> out) throws Exception {
            // one synchronous HSET per element
            this.redisReadAndWrite.write(input.f0 + ":" + input.f3, "time_seen",
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) + "");
        }
    }
}
public class RedisReadAndWrite {
    private Jedis flush_jedis;

    public RedisReadAndWrite(String redisServerName, int port) {
        flush_jedis = new Jedis(redisServerName, port);
    }

    public void write(String key, String field, String value) {
        flush_jedis.hset(key, field, value);
    }
}
Additional details:
I tried a second implementation: a process function that batches the writes to Redis using a Jedis pipeline. However, I get the following error: org.apache.flink.runtime.client.JobExecutionException: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: Socket is not connected. I tried reducing the number of messages per batch, but I still get the error after a while.
Here is the process function implementation:
/**
 * Write to Redis using a process function
 */
public static class WriteToRedisAfterQueryProcessFn extends ProcessFunction<Tuple2<Boolean, Row>, String> {
    // these field declarations were missing from the original snippet
    Jedis flush_jedis;
    Pipeline p;
    long throughputAccumulationCount = 0L;
    long initialTime = System.currentTimeMillis();
    Long timetoFlush;

    @Override
    public void open(Configuration parameters) {
        // note: the third Jedis constructor argument is a timeout in milliseconds
        flush_jedis = new Jedis("redis", 6379, 1800);
        p = flush_jedis.pipelined();
        this.timetoFlush = System.currentTimeMillis() - initialTime;
    }

    @Override
    public void processElement(Tuple2<Boolean, Row> input, Context context, Collector<String> collector) throws Exception {
        p.hset(input.f1.getField(0) + ":" + new Instant(input.f1.getField(2)).getMillis(),
                "time_updated", TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) + "");
        throughputAccumulationCount++;
        System.out.println(throughputAccumulationCount);
        if (throughputAccumulationCount == 50000) {
            throughputAccumulationCount = 0L;
            p.sync(); // flush the pipeline every 50,000 writes
        }
    }
}
Whenever writing to an external service becomes the bottleneck of a Flink workflow, the easiest way to improve performance is to multi-thread that part of the workflow via an AsyncFunction. See this documentation for more details.
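Below is a minimal sketch of that approach, assuming the Lettuce client as the async-capable Redis client; the class name AsyncRedisWriter, the host redis:6379, and the timeout/capacity settings are illustrative assumptions, not taken from the original post:

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class AsyncRedisWriter extends RichAsyncFunction<Tuple4<Integer, Integer, Integer, Long>, String> {

    private transient RedisClient client;
    private transient StatefulRedisConnection<String, String> connection;
    private transient RedisAsyncCommands<String, String> commands;

    @Override
    public void open(Configuration parameters) {
        client = RedisClient.create("redis://redis:6379");
        connection = client.connect();
        commands = connection.async(); // non-blocking command interface
    }

    @Override
    public void asyncInvoke(Tuple4<Integer, Integer, Integer, Long> input,
                            ResultFuture<String> resultFuture) {
        String key = input.f0 + ":" + input.f3;
        // HSET returns a future; complete the Flink result once Redis acknowledges
        commands.hset(key, "time_seen",
                String.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())))
                .whenComplete((ok, err) -> {
                    if (err != null) {
                        resultFuture.completeExceptionally(err);
                    } else {
                        resultFuture.complete(Collections.singleton(key));
                    }
                });
    }

    @Override
    public void close() {
        connection.close();
        client.shutdown();
    }
}

It would replace the blocking flatMap along these lines (AsyncDataStream caps the number of in-flight requests, here 100, with a 5-second timeout per request):

AsyncDataStream.unorderedWait(purchaseWithTimestampsAndWatermarks,
        new AsyncRedisWriter(), 5, TimeUnit.SECONDS, 100);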
-- Ken
The poor performance you are experiencing is undoubtedly because you are making a synchronous request to Redis for every write. @kkrugler has already mentioned async i/o, which is the usual remedy for this situation. That would require switching to one of the Redis clients that supports asynchronous operation.
Another solution commonly used when working with external services is to batch groups of writes together. With Jedis, you can use pipelining. For example, you could replace the WriteToRedis RichFlatMapFunction with a ProcessFunction that does pipelined writes to Redis in batches of some size, relying on a timeout to flush its buffer as needed. You could use Flink's ListState for the buffer. A sketch of this approach follows.
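A minimal sketch of that batching approach, assuming the stream is keyed first (Flink timers and keyed ListState require a keyed stream); the batch size, flush interval, key selector, and host are illustrative assumptions:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

import java.util.concurrent.TimeUnit;

public class BufferedRedisWriter
        extends KeyedProcessFunction<Integer, Tuple4<Integer, Integer, Integer, Long>, String> {

    private static final int BATCH_SIZE = 1000;        // assumption: tune for your setup
    private static final long FLUSH_INTERVAL_MS = 100; // assumption: tune for your setup

    private transient Jedis jedis;
    private transient ListState<String> buffer;   // pending Redis keys
    private transient ValueState<Integer> count;  // number of buffered writes

    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis("redis", 6379);
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("pending-keys", String.class));
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pending-count", Integer.class));
    }

    @Override
    public void processElement(Tuple4<Integer, Integer, Integer, Long> input,
                               Context ctx, Collector<String> out) throws Exception {
        buffer.add(input.f0 + ":" + input.f3);
        int n = (count.value() == null ? 0 : count.value()) + 1;
        count.update(n);
        if (n >= BATCH_SIZE) {
            flush();
        } else if (n == 1) {
            // first element of a fresh batch: schedule a timeout-based flush
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + FLUSH_INTERVAL_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        flush(); // flush whatever is buffered when the timeout fires
    }

    private void flush() throws Exception {
        Iterable<String> keys = buffer.get();
        if (keys == null) {
            return;
        }
        Pipeline p = jedis.pipelined();
        String now = String.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
        for (String key : keys) {
            p.hset(key, "time_seen", now);
        }
        p.sync(); // one network round trip for the whole batch
        buffer.clear();
        count.update(0);
    }
}

Applied as, for example:

purchaseWithTimestampsAndWatermarks
        .keyBy(t -> t.f0)
        .process(new BufferedRedisWriter());

Note that keyed state makes the buffer (and therefore each batch) per key, so a coarse key keeps the number of buffers small. Keeping each batch far smaller than the 50,000 writes used in the question also keeps every p.sync() round trip short, which should reduce the chance of hitting the connection error reported above.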