Cascading - Join two files based on field value

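I am trying to join two files on a common field value and get all the matching records.

I have two Taps for reading the two files. I want to join the files on the "no" field and get the matching records.

How do I join the files and assemble the pipes to create a Flow?

Sample code: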

Properties properties = new Properties();
AppProps.setApplicationJarClass(properties, Test.class);
// pass the properties to the connector so the setting above is actually used
FlowConnector flowConnector = new LocalFlowConnector(properties);

// customer file: comma-delimited, header row skipped
Fields custFields = new Fields("no", "name", "city");
FileTap custFileTap = new FileTap(new TextDelimited(custFields, true, ","), "C://Users//Test//cust.txt");

// transaction file: comma-delimited, header row skipped
Fields tsctnFields = new Fields("no", "tdate", "tamt");
FileTap tsctnFileTap = new FileTap(new TextDelimited(tsctnFields, true, ","), "C://Users//Test//tsctn.txt");

Build the pipes that connect to the taps, join them, then connect the output pipe to a sink.

Tap outTap = new MultiSinkTap(); // placeholder only: create your own sink tap here
Pipe custFilePipe = new Pipe("custFilePipe");
Pipe tsctnFilePipe = new Pipe("tsctnFilePipe");

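Fields groupFields = new Fields("no"); // fields used as joining keys
// Both inputs have a field named "no", so the joined output needs explicitly declared,
// unique field names; "tsctnNo" is just a name picked here for the right-hand key.
Fields joinedFields = new Fields("no", "name", "city", "tsctnNo", "tdate", "tamt");
Pipe outPipe = new CoGroup(custFilePipe, groupFields, tsctnFilePipe, groupFields, joinedFields, new InnerJoin());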

// build flow definition
FlowDef flowDef = FlowDef.flowDef().setName("myFlow")
 .addSource(custFilePipe, custFileTap)
 .addSource(tsctnFilePipe, tsctnFileTap)
 .addTailSink(outPipe, outTap);

Flow flow = flowConnector.connect(flowDef); // now you build the flow
flow.complete(); // run flow
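
If it helps to see what an inner join on "no" should produce before wiring up the flow, here is a minimal plain-Java sketch that does not use Cascading at all. The class name InnerJoinSketch, the method innerJoin, and the sample rows are all made up for illustration; it only mirrors the idea of matching rows on their first column and emitting the columns of both sides.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InnerJoinSketch {

    // Inner join of two row lists on their first column (the "no" field).
    // Each output row holds all customer columns followed by all transaction columns.
    static List<String[]> innerJoin(List<String[]> cust, List<String[]> tsctn) {
        // Index the transaction rows by their key so each customer lookup is cheap.
        Map<String, List<String[]>> tsctnByNo = new HashMap<>();
        for (String[] t : tsctn) {
            tsctnByNo.computeIfAbsent(t[0], k -> new ArrayList<>()).add(t);
        }

        List<String[]> joined = new ArrayList<>();
        for (String[] c : cust) {
            // Customers without any transaction produce no output (inner join).
            List<String[]> matches = tsctnByNo.getOrDefault(c[0], Collections.emptyList());
            for (String[] t : matches) {
                joined.add(new String[] {c[0], c[1], c[2], t[0], t[1], t[2]});
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Hypothetical sample data, not taken from the real cust.txt / tsctn.txt.
        List<String[]> cust = Arrays.asList(
            new String[] {"1", "Ann", "Oslo"},
            new String[] {"2", "Bob", "Rome"});
        List<String[]> tsctn = Arrays.asList(
            new String[] {"1", "2015-01-01", "10.00"},
            new String[] {"1", "2015-01-02", "20.00"},
            new String[] {"3", "2015-01-03", "30.00"});

        for (String[] row : innerJoin(cust, tsctn)) {
            System.out.println(String.join(",", row));
        }
    }
}
```

With this sample data, customer 1 appears twice (once per matching transaction), while customer 2 and transaction 3 are dropped because they have no partner on the other side. Each output row carries the key from both sides, so it has six columns.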

Cascading for the Impatient is a good tutorial, and I would recommend it to Cascading beginners.