Apache Beam 如何根据日期值过滤数据

Apache Beam how to filter data based on date value

我正在尝试从 CSV 文件中读取记录并根据日期过滤记录。我已经通过以下方式实现了这一点。但这是正确的方法吗?

步骤是:

  1. 正在创建管道
  2. 从文件中读取数据
  3. 执行必要的过滤
  4. 创建一个 MapElement 对象并将 OrderRequest 转换为 String
  5. 将 OrderRequest 实体映射到字符串
  6. 将输出写入文件

代码:

// Creating pipeline
Pipeline pipeline = Pipeline.create();

// For transformations Reading from a file
PCollection<String> orderRequest = pipeline
        .apply(TextIO.read().from("src/main/resources/ST/STCheck/OrderRequest.csv"));

PCollection<OrderRequest> pCollectionTransformation = orderRequest
        .apply(ParDo.of(new DoFn<String, OrderRequest>() {

            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) {
                String rowString = c.element();
                if (!rowString.contains("order_id")) {
                    String[] strArr = rowString.split(",");
                    OrderRequest orderRequest = new OrderRequest();
                    orderRequest.setOrder_id(strArr[0]);
                    // Condition to check if the

                    String source1 = strArr[1];
                    DateTimeFormatter fmt1 = DateTimeFormat.forPattern("mm/dd/yyyy");
                    DateTime d1 = fmt1.parseDateTime(source1);
                    System.out.println(d1);

                    String source2 = "4/24/2017";
                    DateTimeFormatter fmt2 = DateTimeFormat.forPattern("mm/dd/yyyy");
                    DateTime d2 = fmt2.parseDateTime(source2);
                    System.out.println(d2);

                    orderRequest.setOrder_date(strArr[1]);
                    System.out.println(strArr[1]);

                    orderRequest.setAmount(Double.valueOf(strArr[2]));
                    orderRequest.setCounter_id(strArr[3]);
                    if (DateTimeComparator.getInstance().compare(d1, d2) > -1) {
                        c.output(orderRequest);
                    }
                }
            }
        }));

// Create a MapElement Object and convert the OrderRequest to String
MapElements<OrderRequest, String> mapElements = MapElements.into(TypeDescriptors.strings())
        .via((OrderRequest orderRequestType) -> orderRequestType.getOrder_id() + " "
                + orderRequestType.getOrder_date() + " " + orderRequestType.getAmount() + " "
                + orderRequestType.getCounter_id());

// Mapping the OrderRequest Entity to String
PCollection<String> pStringList = pCollectionTransformation.apply(mapElements);

// Now Writing the elements to a file
pStringList.apply(TextIO.write().to("src/main/resources/ST/STCheck/OrderRequestOut.csv").withNumShards(1)
        .withSuffix(".csv"));

// To run pipeline
pipeline.run();

System.out.println("We are done!!");

波乔 Class:

public class OrderRequest  implements Serializable{
    String order_id;
    String order_date;
    double amount;
    String counter_id;
}

虽然我得到了正确的结果,但这是正确的方法吗?我的两个主要问题是

1) How to i access individual columns? So that, I can specify conditions based on that column value.
2) Can we specify headers when reading the data?

是的,您可以使用 TextIO.read() 像这样处理 CSV 文件,前提是它们不包含嵌入换行符的字段,并且您可以 recognize/skip header 行。您的管道看起来不错,但作为一个小的样式问题,我可能会让第一个 ParDo 只进行解析,然后是一个查看日期以过滤掉内容的过滤器。

如果您想自动推断 header 行,您可以在主程序中打开阅读第一行(使用标准 java 库,或 Beams FileSystems class) 并手动将其提取出来,将其传递给您的解析 DoFn。

我同意更柱状的方法会更自然。我们在 Python 中有这个作为我们的 Dataframes API which is now available for general use。你会写这样的东西

with beam.Pipeline() as p:
   df = p | beam.dataframe.io.read_csv("src/main/resources/ST/STCheck/OrderRequest.csv")
   filtered = df[df.order_date > limit]
   filtered.write_csv("src/main/resources/ST/STCheck/OrderRequestOut.csv")