无法通过级联一键连接两个文件
Couldn`t join two files with one key via Cascading
让我们看看我们有什么。第一个文件 [接口 Class]:
list arrayList
list linkedList
第二个文件[Class1个数量]:
arrayList 120
linkedList 4
我想通过键 [Class] 加入这两个文件并获取每个接口的计数:
list arraylist 120
list linkedlist 4
代码:
public class Main
{
public static void main( String[] args )
{
String docPath = args[ 0 ];
String wcPath = args[ 1 ];
String doc2Path = args[ 2 ];
Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );
AppProps.setApplicationName( properties, "Part 1" );
AppProps.addApplicationTag( properties, "lets:do:it" );
AppProps.addApplicationTag( properties, "technology:Cascading" );
FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties );
// create source and sink taps
Tap wcTap = new Hfs(new TextDelimited(true, ","), wcPath);
Fields classInterfaceFiles = new Fields("interface", "class");
Tap classInterfaceTap = new Hfs(new TextDelimited(classInterfaceFiles, true, ","), docPath);
Fields classAmountFields = new Fields("class1", "amount");
Tap classAmountFileTap = new Hfs(new TextDelimited(classAmountFields, true, ","), doc2Path);
Tap outTap = new MultiSinkTap(); // just saying, create your own tap
Pipe classInterfaceFilePipe = new Pipe("classInterfaceFilePipe");
Pipe classIAmountFilePipe = new Pipe("classIAmountFilePipe");
Fields groupFields = new Fields("class");
Fields groupFields1 = new Fields("class1"); // fields used as joining keys
Pipe outPipe = new CoGroup(classInterfaceFilePipe, groupFields, classIAmountFilePipe, groupFields1, new InnerJoin());
// build flow definition
FlowDef flowDef = FlowDef.flowDef().setName("myFlow")
.addSource(classInterfaceFilePipe, classInterfaceTap)
.addSource(classIAmountFilePipe, classAmountFileTap)
.addTailSink(outPipe, wcTap);
// .addTailSink( outPipe, wcTap );
// write a DOT file and run the flow
Flow wcFlow = flowConnector.connect( flowDef );
wcFlow.writeDOT( "dot/wc.dot" );
wcFlow.complete();
}
}
[这是更大任务的第一步]
发生这种情况是因为您在连接在一起的两个管道中有相同的字段,即 "class"。也许您可以将它们重命名为 "class_interface" 和 "class_amount"。您还必须更改在 CoGroup 管道中使用的 groupFields。
让我们看看我们有什么。第一个文件 [接口 Class]:
list arrayList
list linkedList
第二个文件[Class1个数量]:
arrayList 120
linkedList 4
我想通过键 [Class] 加入这两个文件并获取每个接口的计数:
list arraylist 120
list linkedlist 4
代码:
public class Main
{
public static void main( String[] args )
{
String docPath = args[ 0 ];
String wcPath = args[ 1 ];
String doc2Path = args[ 2 ];
Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );
AppProps.setApplicationName( properties, "Part 1" );
AppProps.addApplicationTag( properties, "lets:do:it" );
AppProps.addApplicationTag( properties, "technology:Cascading" );
FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties );
// create source and sink taps
Tap wcTap = new Hfs(new TextDelimited(true, ","), wcPath);
Fields classInterfaceFiles = new Fields("interface", "class");
Tap classInterfaceTap = new Hfs(new TextDelimited(classInterfaceFiles, true, ","), docPath);
Fields classAmountFields = new Fields("class1", "amount");
Tap classAmountFileTap = new Hfs(new TextDelimited(classAmountFields, true, ","), doc2Path);
Tap outTap = new MultiSinkTap(); // just saying, create your own tap
Pipe classInterfaceFilePipe = new Pipe("classInterfaceFilePipe");
Pipe classIAmountFilePipe = new Pipe("classIAmountFilePipe");
Fields groupFields = new Fields("class");
Fields groupFields1 = new Fields("class1"); // fields used as joining keys
Pipe outPipe = new CoGroup(classInterfaceFilePipe, groupFields, classIAmountFilePipe, groupFields1, new InnerJoin());
// build flow definition
FlowDef flowDef = FlowDef.flowDef().setName("myFlow")
.addSource(classInterfaceFilePipe, classInterfaceTap)
.addSource(classIAmountFilePipe, classAmountFileTap)
.addTailSink(outPipe, wcTap);
// .addTailSink( outPipe, wcTap );
// write a DOT file and run the flow
Flow wcFlow = flowConnector.connect( flowDef );
wcFlow.writeDOT( "dot/wc.dot" );
wcFlow.complete();
}
}
[这是更大任务的第一步]
发生这种情况是因为您在连接在一起的两个管道中有相同的字段,即 "class"。也许您可以将它们重命名为 "class_interface" 和 "class_amount"。您还必须更改在 CoGroup 管道中使用的 groupFields。