如何根据 json 中的特定键将一个数据流的接收器添加到不同的路径?
How to add sink from one data stream to different paths depending on the specific key in json?
我有json喜欢,
{
"name":"someone",
"job":"doctor",
"etc":"etc"
}
在每个 json 中,"job" 都有不同的值,例如医生、飞行员、driver、看守等。
我想根据 "job" 值将每个 json 分开,并将其存储在不同的位置,如 /home/doctor
、/home/pilot
、/home/driver
等
我已经尝试使用 SplitStream 函数来执行此操作,但我必须指定这些值以匹配条件。
public class MyFlinkJob {
private static JsonParser jsonParser = new JsonParser();
private static String key_1 = "doctor";
private static String key_2 = "driver";
private static String key_3 = "pilot";
private static String key_default = "default";
public static void main(String args[]) throws Exception {
Properties prop = new Properties();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafka);
props.setProperty("group.id", "myjob");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
DataStream<String> record = env.addSource(myConsumer).rebalance()
SplitStream<String> split = record.split(new OutputSelector<String>() {
@Override
public Iterable<String> select(String val) {
JsonObject json = (JsonObject)jsonParser.parse(val);
String jsonValue = CommonFields.getFieldValue(json, "job");
List<String> output = new ArrayList<String>();
if (key_1.equalsIgnoreCase(jsonValue)) {
output.add("doctor");
} else if (key_2.equalsIgnoreCase(jsonValue)) {
output.add("driver");
} else if (key_3.equalsIgnoreCase(jsonValue)) {
output.add("pilot");
} else {
output.add("default");
}
return output;
}});
DataStream<String> doctor = split.select("doctor");
DataStream<String> driver = split.select("driver");
DataStream<String> pilot = split.select("pilot");
DataStream<String> default1 = split.select("default");
doctor.addSink(getBucketingSink(batchSize, prop, key_1));
driver.addSink(getBucketingSink(batchSize, prop, key_2));
pilot.addSink(getBucketingSink(batchSize, prop, key_3));
default1.addSink(getBucketingSink(batchSize, prop, key_default));
env.execute("myjob");
} catch (IOException ex) {
ex.printStackTrace();
} finally {
if (input != null) {
try {
input.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static BucketingSink<String> getBucketingSink(Long BatchSize, Properties prop, String key) {
BucketingSink<String> sink = new BucketingSink<String>("hdfs://*/home/"+key);
Configuration conf = new Configuration();
conf.set("hadoop.job.ugi", "hdfs");
sink.setFSConfig(conf);
sink.setBucketer(new DateTimeBucketer<String>(prop.getProperty("DateTimeBucketer")));
return sink;
}
}
假设如果任何其他值出现在 "job" 中,比如工程师或其他东西,而我没有在 class 中指定,那么它会转到默认文件夹有没有办法拆分那些 json 事件自动基于 "job" 的值而不指定它,并创建一个包含值名称的路径,如 /home/enginerr?
您想使用 BucketingSink,它支持根据字段值将记录写入单独的存储桶。我可能有一个映射函数,它接受 JSON 字符串,解析它,并发出一个 Tuple2<String, String>
,其中第一个元素是 job
字段的值 JSON,第二个元素是完整的 JSON 字符串。
我有json喜欢,
{
"name":"someone",
"job":"doctor",
"etc":"etc"
}
在每个 json 中,"job" 都有不同的值,例如医生、飞行员、driver、看守等。
我想根据 "job" 值将每个 json 分开,并将其存储在不同的位置,如 /home/doctor
、/home/pilot
、/home/driver
等
我已经尝试使用 SplitStream 函数来执行此操作,但我必须指定这些值以匹配条件。
public class MyFlinkJob {
private static JsonParser jsonParser = new JsonParser();
private static String key_1 = "doctor";
private static String key_2 = "driver";
private static String key_3 = "pilot";
private static String key_default = "default";
public static void main(String args[]) throws Exception {
Properties prop = new Properties();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafka);
props.setProperty("group.id", "myjob");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
DataStream<String> record = env.addSource(myConsumer).rebalance()
SplitStream<String> split = record.split(new OutputSelector<String>() {
@Override
public Iterable<String> select(String val) {
JsonObject json = (JsonObject)jsonParser.parse(val);
String jsonValue = CommonFields.getFieldValue(json, "job");
List<String> output = new ArrayList<String>();
if (key_1.equalsIgnoreCase(jsonValue)) {
output.add("doctor");
} else if (key_2.equalsIgnoreCase(jsonValue)) {
output.add("driver");
} else if (key_3.equalsIgnoreCase(jsonValue)) {
output.add("pilot");
} else {
output.add("default");
}
return output;
}});
DataStream<String> doctor = split.select("doctor");
DataStream<String> driver = split.select("driver");
DataStream<String> pilot = split.select("pilot");
DataStream<String> default1 = split.select("default");
doctor.addSink(getBucketingSink(batchSize, prop, key_1));
driver.addSink(getBucketingSink(batchSize, prop, key_2));
pilot.addSink(getBucketingSink(batchSize, prop, key_3));
default1.addSink(getBucketingSink(batchSize, prop, key_default));
env.execute("myjob");
} catch (IOException ex) {
ex.printStackTrace();
} finally {
if (input != null) {
try {
input.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static BucketingSink<String> getBucketingSink(Long BatchSize, Properties prop, String key) {
BucketingSink<String> sink = new BucketingSink<String>("hdfs://*/home/"+key);
Configuration conf = new Configuration();
conf.set("hadoop.job.ugi", "hdfs");
sink.setFSConfig(conf);
sink.setBucketer(new DateTimeBucketer<String>(prop.getProperty("DateTimeBucketer")));
return sink;
}
}
假设如果任何其他值出现在 "job" 中,比如工程师或其他东西,而我没有在 class 中指定,那么它会转到默认文件夹有没有办法拆分那些 json 事件自动基于 "job" 的值而不指定它,并创建一个包含值名称的路径,如 /home/enginerr?
您想使用 BucketingSink,它支持根据字段值将记录写入单独的存储桶。我可能有一个映射函数,它接受 JSON 字符串,解析它,并发出一个 Tuple2<String, String>
,其中第一个元素是 job
字段的值 JSON,第二个元素是完整的 JSON 字符串。