Generate multiple output files using MultiSinkTap
I have the following dataset as input:
id,name,gender
asinha161,Aniruddha,Male
vic,Victor,Male
day1,Daisy,Female
jazz030,Jasmine,Female
Mic002,Michael,Male
My goal is to split the male and female records into two separate output files, as shown below.
Male dataset
id,name,gender
asinha161,Aniruddha,Male
vic,Victor,Male
Mic002,Michael,Male
Female dataset
id,name,gender
day1,Daisy,Female
jazz030,Jasmine,Female
I tried writing Cascading code to accomplish this task, shown below:
public class Main {
    public static void main(String[] args) {
        Tap sourceTap = new FileTap(new TextDelimited(true, ","), "inputFile.txt");
        Tap sink_one = new FileTap(new TextDelimited(true, ","), "maleFile.txt");
        Tap sink_two = new FileTap(new TextDelimited(true, ","), "FemaleFile.txt");
        Pipe assembly = new Pipe("inputPipe");
        // ...split into two pipes
        Pipe malePipe = new Pipe("for_male", assembly);
        malePipe = new Each(malePipe, new CustomFilterByGender("male"));
        Pipe femalePipe = new Pipe("for_female", assembly);
        femalePipe = new Each(femalePipe, new CustomFilterByGender("female"));
        // create the flow
        List<Pipe> pipes = new ArrayList<Pipe>(2)
        {{pipes.add(countOne);
        pipes.add(countTwo);}};
        Tap outputTap = new MultiSinkTap<>(sink_one, sink_two);
        FlowConnector flowConnector = new LocalFlowConnector();
        Flow flow = flowConnector.connect(sourceTap, outputTap, pipes);
        flow.complete();
    }
}
where CustomFilterByGender(String gender) is a custom function that returns tuples according to the gender value passed as its parameter.
Note that, for efficiency, I have not used a custom Buffer.
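The implementation of CustomFilterByGender is not shown. As a rough sketch of the decision such a filter might make (in Cascading this would typically sit in the isRemove method of a Filter), the plain Java below runs without the library. The class GenderMatchSketch and its methods keeps and select are hypothetical names, not part of Cascading or of the original code. The case-insensitive exact match is an assumption, chosen because the code passes "male" while the data contains "Male", and because a substring test would also match "Female".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the keep/remove decision inside CustomFilterByGender.
// This is plain Java for illustration only; it does not use the Cascading API.
final class GenderMatchSketch {

    // True when the row's gender equals the wanted gender, ignoring case.
    // An exact comparison is used so that "male" does not match "Female".
    static boolean keeps(String wantedGender, String rowGender) {
        return rowGender != null && rowGender.trim().equalsIgnoreCase(wantedGender);
    }

    // Returns the rows whose third column (gender) matches wantedGender.
    // Rows that do not have exactly three columns are skipped.
    static List<String> select(List<String> rows, String wantedGender) {
        List<String> kept = new ArrayList<>();
        for (String row : rows) {
            String[] columns = row.split(",");
            if (columns.length == 3 && keeps(wantedGender, columns[2])) {
                kept.add(row);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList(
                "asinha161,Aniruddha,Male",
                "vic,Victor,Male",
                "day1,Daisy,Female",
                "jazz030,Jasmine,Female",
                "Mic002,Michael,Male");
        System.out.println(select(rows, "male"));
        System.out.println(select(rows, "female"));
    }
}
```

A real filter would read the gender field from the tuple instead of splitting a string; only the comparison carries over.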
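With MultiSinkTap I cannot get the desired output, because the connect() method of the LocalFlowConnector object does not accept the MultiSinkTap object, which results in a compile-time error.
It would be very helpful if you could suggest changes to the code above to make it work, or a way to use MultiSinkTap.
Thank you for your patience in answering the question :)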
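I think you want to write the output of different pipes to different output files. I have made some changes to your code that should solve your problem.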
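import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.local.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;

public class Main {
    public static void main(String[] args) {
        Tap sourceTap = new FileTap(new TextDelimited(true, ","), "inputFile.txt");
        Tap sink_one = new FileTap(new TextDelimited(true, ","), "maleFile.txt");
        Tap sink_two = new FileTap(new TextDelimited(true, ","), "FemaleFile.txt");
        Pipe assembly = new Pipe("inputPipe");
        // Two named branches off the same head pipe
        Pipe malePipe = new Pipe("for_male", assembly);
        malePipe = new Each(malePipe, new CustomFilterByGender("male"));
        Pipe femalePipe = new Pipe("for_female", assembly);
        femalePipe = new Each(femalePipe, new CustomFilterByGender("female"));
        List<Pipe> pipes = new ArrayList<Pipe>(2);
        pipes.add(malePipe);
        pipes.add(femalePipe);
        // Bind each sink to a tail pipe by that pipe's name
        Map<String, Tap> sinks = new HashMap<String, Tap>();
        sinks.put("for_male", sink_one);
        sinks.put("for_female", sink_two);
        FlowConnector flowConnector = new LocalFlowConnector();
        Flow flow = flowConnector.connect(sourceTap, sinks, pipes);
        flow.complete();
    }
}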
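Instead of using MultiSinkTap, you can pass connect() a Map of sinks directly, keyed by the names of the tail pipes you want to connect to outputs, in this case malePipe ("for_male") and femalePipe ("for_female").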
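To make the name-based binding concrete, here is a small plain-Java model of the idea: every branch has a name, and a map from branch name to output file decides where its rows land. This is only an illustration and not how Cascading is implemented. NamedSinkSketch and route are hypothetical names, and deriving the branch name as "for_" plus the lower-cased gender is an assumption made to mirror the pipe names used above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Plain-Java model of "sinks keyed by branch name"; it does not use the Cascading API.
final class NamedSinkSketch {

    // Groups rows by the output file bound to their branch name.
    // Rows whose branch has no entry in sinkByBranch (such as a header row) are dropped.
    static Map<String, List<String>> route(List<String> rows, Map<String, String> sinkByBranch) {
        Map<String, List<String>> written = new LinkedHashMap<>();
        for (String file : sinkByBranch.values()) {
            written.put(file, new ArrayList<>());
        }
        for (String row : rows) {
            // The gender is the text after the last comma.
            String gender = row.substring(row.lastIndexOf(',') + 1).trim();
            String branch = "for_" + gender.toLowerCase(Locale.ROOT);
            String file = sinkByBranch.get(branch);
            if (file != null) {
                written.get(file).add(row);
            }
        }
        return written;
    }

    public static void main(String[] args) {
        Map<String, String> sinkByBranch = new HashMap<>();
        sinkByBranch.put("for_male", "maleFile.txt");
        sinkByBranch.put("for_female", "FemaleFile.txt");
        List<String> rows = Arrays.asList(
                "asinha161,Aniruddha,Male",
                "day1,Daisy,Female",
                "Mic002,Michael,Male");
        System.out.println(route(rows, sinkByBranch));
    }
}
```

The point to carry back to the Cascading code is that the keys of the sinks map have to match the names of the tail pipes exactly, otherwise a branch has no output bound to it.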