Create new field

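How do I create a new field for my results?

I am working with a word count job to which I want to append a timestamp. Currently it contains the fields 'word' and 'count'.

My goal is to create a tuple that looks like this: 'word' 'count' 'timestamp'

Here is my code so far. I tried to append the timestamp in a custom function called 'TimestampAppender':
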
wcPipe = new Each(wcPipe, Fields.ALL, new TimestampAppender(Fields.ALL), Fields.RESULTS);

TimestampAppender:

public class TimestampAppender extends BaseOperation implements Function {
  public TimestampAppender(Fields fieldDeclaration) {
    super(2, fieldDeclaration);
  }

  public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
    TupleEntry argument = functionCall.getArguments();

    String arg0 = argument.getString(0);
    String arg1 = argument.getString(1);

    Tuple result = new Tuple();
    result.addString(arg0);
    result.addString(arg1);
    result.addString("01-01-2015");

    functionCall.getOutputCollector().add(result);
  }
}

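You can change your code to the following. First insert an empty 'timestamp' field, then let TimestampAppender declare Fields.ARGS so that its three-value result tuple matches the three incoming fields: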

wcPipe = new Each(wcPipe, new Insert(new Fields("timestamp"), ""), Fields.ALL);
wcPipe = new Each(wcPipe, Fields.ALL, new TimestampAppender(Fields.ALL), Fields.RESULTS);

TimestampAppender:

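import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class TimestampAppender extends BaseOperation implements Function {
  // fieldDeclaration is kept so the existing call site compiles, but it is not used.
  public TimestampAppender(Fields fieldDeclaration) {
    // Fields.ARGS: the result fields take the names of the incoming argument fields.
    super(Fields.ARGS);
  }

  @Override
  public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
    TupleEntry argument = functionCall.getArguments();

    String arg0 = argument.getString(0); // 'word'
    String arg1 = argument.getString(1); // 'count'

    // Copy word and count, then fill the third position ('timestamp').
    Tuple result = new Tuple();
    result.addString(arg0);
    result.addString(arg1);
    result.addString("01-01-2015");

    functionCall.getOutputCollector().add(result);
  }
}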
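
The function above writes the fixed string "01-01-2015". If you want the actual date instead, a small helper along these lines might work. This is only a sketch: TimestampFormat and today are hypothetical names, not part of Cascading, and I am assuming that "01-01-2015" means day-month-year.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper; not part of Cascading or of the code above.
final class TimestampFormat {
    // Assumption: the sample value "01-01-2015" is day-month-year.
    private static final DateTimeFormatter DAY_MONTH_YEAR =
        DateTimeFormatter.ofPattern("dd-MM-yyyy");

    private TimestampFormat() {}

    // Formats the clock's current date; taking a Clock keeps the method testable.
    static String today(Clock clock) {
        return LocalDate.now(clock).format(DAY_MONTH_YEAR);
    }

    public static void main(String[] args) {
        // A fixed clock gives a reproducible result.
        Clock fixed = Clock.fixed(Instant.parse("2015-01-01T00:00:00Z"), ZoneOffset.UTC);
        System.out.println(today(fixed)); // prints 01-01-2015

        // The system clock gives the real current date.
        System.out.println(today(Clock.systemUTC()));
    }
}
```

Inside operate, the hard-coded line could then become result.addString(TimestampFormat.today(Clock.systemDefaultZone())).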