将 Epoch 转换为日期和时间 - HADOOP

Convert Epoch to date and time - HADOOP

我正在尝试学习 hadoop (mapreduce)。我有一个映射器方法,我在其中使用 Date class 来解析 ;epoch_time;数据集中以毫秒表示的字段。数据集由 25.05.2015 到 10.08.2015 之间的时期组成。

我想将纪元转换为 date/time 但只有 return 从 2015 年 6 月 5 日到 2015 年 6 月 15 日之间的纪元 date/time。

这是我到目前为止所取得的成就。下面的代码产生以下内容:

输出:

25.05.2015

25.06.2015

等等

期望的输出

05.06.2015 5//这个日期单词出现的次数

06.06.2015 53

07.06.2015 41

等等

映射器

   public class mapper extends Mapper<Object, Text, Text, IntWritable> { 
    private Text data = new Text();
     private IntWritable one = new IntWritable(1);
   String time;

      public void map(Object key, Text value, Context context) throws IOException,      InterruptedException {

String[] userinput = value.toString().split(";");
try{    


        LocalDateTime epoch = LocalDateTime.ofEpochSecond(Long.parseLong(userinput[0])/1000, 0, ZoneOffset.UTC);
        DateTimeFormatter f = DateTimeFormatter.ofPattern("dd.MM.yyyy");
        time = epoch.format(f);




    data.set(time);
    context.write(data,one);
}
catch(Exception e){
    System.out.println("Error: " + e);
}

    }
}

减速机

     public class reducer extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable one = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values, Context context)

    throws IOException, InterruptedException {

    int sum = 0;

    for (IntWritable value : values) {

        sum+=value.get();

    }

    one.set(sum);
    context.write(key, one);

}

}

所以你只关心这个括号内的数据...25.05.2015 [05.06.2015 ... 15.06.2015] 10.08.2015

如果这就是您所需要的,它就像 if 语句一样简单。

我对 Java 8 不太熟悉,但请检查一下 Java: how do I check if a Date is within a certain range?

public class mapper extends Mapper<Object, Text, Text, IntWritable> { 
   private Text data = new Text();
   private static final IntWritable ONE = new IntWritable(1);
   private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("dd.MM.yyyy");
   String time;

   // Define the boundaries
   private LocalDateTime start = LocalDateTime.parse("2015.06.05", FMT);
   private LocalDateTime end = LocalDateTime.parse("2015.06.15", FMT);

   @Override
   public void map(Object key, Text value, Context context) throws IOException,      InterruptedException {

       String[] userinput = value.toString().split(";");
       try {
           Long ms = Long.parseLong(userinput[0])/1000;    
           LocalDateTime inputEpoch = LocalDateTime.ofEpochSecond(ms, 0, ZoneOffset.UTC);

           // Filter your data
           if (inputEpoch.isAfter(start) && inputEpoch.isBefore(end)) {
               data.set(inputEpoch.format(FMT));
               context.write(data,ONE);
           }
       } catch (...) { }
   }
}