运行 一个简单的本地模式级联程序
Run a simple Cascading program in local mode
我正在努力将这个简单的级联程序添加到 运行。由于某种原因,它什么都不做。至少我希望它能打印记录。任何帮助将不胜感激。
package com.myLearning.cascading;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.operation.Debug;
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.scheme.local.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;
public class operations_example
{
public static void main(String[] args)
{
Scheme sourceScheme = new TextDelimited(new Fields("username", "age"), true, ",");
String sourcePath = "C:/Users/Desktop/cascading/data/names.txt";
Tap sourceTap = new FileTap(sourceScheme, sourcePath);
Scheme targetScheme = new TextDelimited(new Fields("username", "age"), true, ",");
String targetPath = "C:/Users/Desktop/cascading/data/output2.txt";
Tap targetTap = new FileTap(targetScheme, targetPath, SinkMode.REPLACE);
Pipe dataPipe = new Pipe("data");
dataPipe = new Each(dataPipe, new Debug());
ExpressionFilter filter = new ExpressionFilter("age >= 30", Integer.TYPE);
dataPipe = new Each( dataPipe,new Fields("username","age"), filter);
FlowDef flowdef = FlowDef.flowDef().
addSource(dataPipe, sourceTap).
addTailSink(dataPipe, targetTap);
Flow flow = new LocalFlowConnector().connect(flowdef);
flow.stop();
}
}
你没有执行流程。
流创建后,调用 complete()
(阻塞)或 start()
来执行它。调用stop()
是不去执行流程。
http://docs.cascading.org/impatient/impatient1.html
http://docs.cascading.org/cascading/1.2/javadoc/cascading/flow/Flow.html#complete()
我正在努力将这个简单的级联程序添加到 运行。由于某种原因,它什么都不做。至少我希望它能打印记录。任何帮助将不胜感激。
package com.myLearning.cascading;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.operation.Debug;
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.scheme.local.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;
public class operations_example
{
public static void main(String[] args)
{
Scheme sourceScheme = new TextDelimited(new Fields("username", "age"), true, ",");
String sourcePath = "C:/Users/Desktop/cascading/data/names.txt";
Tap sourceTap = new FileTap(sourceScheme, sourcePath);
Scheme targetScheme = new TextDelimited(new Fields("username", "age"), true, ",");
String targetPath = "C:/Users/Desktop/cascading/data/output2.txt";
Tap targetTap = new FileTap(targetScheme, targetPath, SinkMode.REPLACE);
Pipe dataPipe = new Pipe("data");
dataPipe = new Each(dataPipe, new Debug());
ExpressionFilter filter = new ExpressionFilter("age >= 30", Integer.TYPE);
dataPipe = new Each( dataPipe,new Fields("username","age"), filter);
FlowDef flowdef = FlowDef.flowDef().
addSource(dataPipe, sourceTap).
addTailSink(dataPipe, targetTap);
Flow flow = new LocalFlowConnector().connect(flowdef);
flow.stop();
}
}
你没有执行流程。
流创建后,调用 complete()
(阻塞)或 start()
来执行它。调用stop()
是不去执行流程。
http://docs.cascading.org/impatient/impatient1.html http://docs.cascading.org/cascading/1.2/javadoc/cascading/flow/Flow.html#complete()