Java Apache Beam PCollections and how to make them work?
First, let me describe the scenario.
Step 1. I have to read a file line by line. The file is a .json file and each line has the following format:
{
"schema":{Several keys that are to be deleted},
"payload":{"key1":20001,"key2":"aaaa","key3":"bbbb","key4":"USD","key5":"100"}
}
Step 2. Remove the schema object and end up with the following (more examples added for the sake of the next steps):
{"key1":20001,"key2":"aaaa","key3":"bbbb","key4":"USD","key5":"100"}
{"key1":20001,"key2":"aaaa","key3":"bbbb","key4":"US","key5":"90"}
{"key1":2002,"key2":"cccc","key3":"hhhh","key4":"CN","key5":"80"}
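Step 3. Split each payload into a key part and a value part, keep both as in-memory JSON, and use their String forms as the key and value of a map: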
{"key1":20001,"key2":"aaaa","key3":"bbbb"} = {"key4":"USD","key5":"100"}
{"key1":20001,"key2":"aaaa","key3":"bbbb"} = {"key4":"US","key5":"90"}
{"key1":2002,"key2":"cccc","key3":"hhhh"} = {"key4":"CN","key5":"80"}
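The split in step 3 can be sketched in plain Java, independent of Beam. This is only an illustration under assumptions: the class name `PayloadSplitter`, the method `split`, and the hard-coded key list are hypothetical, and the payload is assumed to be already parsed into a `Map`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: splits one parsed payload into a "key" part and a "value" part.
class PayloadSplitter {
    // Assumption: these fields form the grouping key, as in the sample payload.
    static final List<String> MAIN_KEYS = Arrays.asList("key1", "key2", "key3");

    static Map.Entry<Map<String, Object>, Map<String, Object>> split(Map<String, Object> payload) {
        Map<String, Object> keyPart = new LinkedHashMap<>();
        Map<String, Object> valuePart = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : payload.entrySet()) {
            // Fields named in MAIN_KEYS go to the key part, everything else to the value part.
            if (MAIN_KEYS.contains(e.getKey())) {
                keyPart.put(e.getKey(), e.getValue());
            } else {
                valuePart.put(e.getKey(), e.getValue());
            }
        }
        return new SimpleEntry<>(keyPart, valuePart);
    }
}
```

Note that the key lookup is case-sensitive, so the names in the list have to match the payload field names exactly.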
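Step 4, and the one I can't do because of my lack of knowledge of PCollections. I need to take all the lines that were read and do a GroupByKey so that the result ends up like this: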
{"key1":20001,"key2":"aaaa","key3":"bbbb"} = [
{"key4":"USD","key5":"100"},
{"key4":"US","key5":"90"} ]
{"key1":2002,"key2":"cccc","key3":"hhhh"} = {"key4":"CN","key5":"80"}
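What the grouping in step 4 computes can be illustrated with an in-memory analogue in plain Java. This is not Beam code: the class `GroupSketch` and its method are hypothetical, and each input pair is assumed to be a two-element array of key JSON and value JSON. In Beam, GroupByKey produces an Iterable of values per key with no guaranteed order, and a key seen only once still gets a collection containing a single value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory analogue of the grouping in step 4 (not Beam code).
class GroupSketch {
    // Each pair is {keyJson, valueJson}; values sharing the same key are collected into one list.
    static Map<String, List<String>> groupByKey(List<String[]> pairs) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String[] pair : pairs) {
            // Create the list on first sight of a key, then append the value.
            grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
        }
        return grouped;
    }
}
```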
Right now my code looks like this:
static void runSimplePipeline(PipelineOptionsCustom options) {
    Pipeline p = Pipeline.create(options);
    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply("TransformData", ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                Gson gson = new GsonBuilder().create();
                ObjectMapper oMapper = new ObjectMapper();
                JSONObject obj_key = new JSONObject();
                JSONObject obj_value = new JSONObject();
                // Payload fields that make up the grouping key (case must match the payload).
                List<String> listMainKeys = Arrays.asList("key1", "key2", "key3");
                // Parse the line and drop the schema object.
                HashMap<String, Object> parsedMap = gson.fromJson(c.element(), HashMap.class);
                parsedMap.remove("schema");
                Map<String, String> map = oMapper.convertValue(parsedMap.get("payload"), Map.class);
                for (Map.Entry<String, String> entry : map.entrySet()) {
                    if (listMainKeys.contains(entry.getKey())) {
                        obj_key.put(entry.getKey(), entry.getValue());
                    } else {
                        obj_value.put(entry.getKey(), entry.getValue());
                    }
                }
                KV<String, String> objectKV = KV.of(obj_key.toJSONString(), obj_value.toJSONString());
                // The pair is only printed here, never emitted, so nothing flows downstream.
                System.out.print(obj_key.toString() + " : " + obj_value.toString() + "\n");
            }
        })); // <------- RIGHT HERE
    p.run().waitUntilFinish();
}
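The obvious part is that where it says "RIGHT HERE" I should have another apply with CountByKey, but that requires a full PCollection, and that is the part I really do not understand.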
Here is the working code, thanks to the GitHub link from Guillem Xercavins:
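// Imports this method relies on (the JSON library packages are inferred from the calls used):
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.json.simple.JSONObject;

static void runSimplePipeline(PipelineOptionsCustom options) {
    Pipeline p = Pipeline.create(options);
    // Each apply returns a new PCollection, so transforms can be chained.
    PCollection<Void> results = p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply("TransformData", ParDo.of(new DoFn<String, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                Gson gson = new GsonBuilder().create();
                ObjectMapper oMapper = new ObjectMapper();
                JSONObject obj_key = new JSONObject();
                JSONObject obj_value = new JSONObject();
                // Payload fields that make up the grouping key.
                List<String> listMainKeys =
                    Arrays.asList("EBELN", "AEDAT", "BATXT", "EKOTX", "Land1", "WAERS");
                // Parse the line and drop the schema object.
                HashMap<String, Object> parsedMap = gson.fromJson(c.element(), HashMap.class);
                parsedMap.remove("schema");
                // Payload values may be numbers as well as strings, hence Object.
                Map<String, Object> map = oMapper.convertValue(parsedMap.get("payload"), Map.class);
                for (Map.Entry<String, Object> entry : map.entrySet()) {
                    if (listMainKeys.contains(entry.getKey())) {
                        obj_key.put(entry.getKey(), entry.getValue());
                    } else {
                        obj_value.put(entry.getKey(), entry.getValue());
                    }
                }
                KV<String, String> objectKV = KV.of(obj_key.toJSONString(), obj_value.toJSONString());
                // Emit the pair so that it becomes an element of the output PCollection.
                c.output(objectKV);
            }
        }))
        .apply("Group By Key", GroupByKey.<String, String>create())
        .apply("Continue Processing", ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                // Each element is one key together with all the values grouped under it.
                System.out.print(c.element());
            }
        }));
    p.run().waitUntilFinish();
}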
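In the "Continue Processing" step each element is a `KV<String, Iterable<String>>`, whose parts are available through `getKey()` and `getValue()`. As a rough sketch of how those two parts could be turned into a line shaped like the step 4 output, here is a plain-Java helper; the class `GroupedFormatter` and its method are hypothetical and not part of Beam.

```java
import java.util.StringJoiner;

// Hypothetical helper: renders one grouped key and its values as "key = [v1, v2]".
class GroupedFormatter {
    static String format(String key, Iterable<String> values) {
        StringJoiner joined = new StringJoiner(", ", "[", "]");
        for (String value : values) {
            joined.add(value);
        }
        return key + " = " + joined;
    }
}
```

Inside the DoFn this could be called as `GroupedFormatter.format(c.element().getKey(), c.element().getValue())`, assuming the helper is on the classpath of the workers.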