Apache Beam 如何根据日期值过滤数据
Apache Beam how to filter data based on date value
我正在尝试从 CSV 文件中读取记录并根据日期过滤记录。我已经通过以下方式实现了这一点。但这是正确的方法吗?
步骤是:
- 正在创建管道
- 从文件中读取数据
- 执行必要的过滤
- 创建一个 MapElement 对象并将 OrderRequest 转换为 String
- 将 OrderRequest 实体映射到字符串
- 将输出写入文件
代码:
// Creating pipeline
Pipeline pipeline = Pipeline.create();
// For transformations Reading from a file
PCollection<String> orderRequest = pipeline
.apply(TextIO.read().from("src/main/resources/ST/STCheck/OrderRequest.csv"));
PCollection<OrderRequest> pCollectionTransformation = orderRequest
.apply(ParDo.of(new DoFn<String, OrderRequest>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String rowString = c.element();
if (!rowString.contains("order_id")) {
String[] strArr = rowString.split(",");
OrderRequest orderRequest = new OrderRequest();
orderRequest.setOrder_id(strArr[0]);
// Condition to check if the
String source1 = strArr[1];
DateTimeFormatter fmt1 = DateTimeFormat.forPattern("mm/dd/yyyy");
DateTime d1 = fmt1.parseDateTime(source1);
System.out.println(d1);
String source2 = "4/24/2017";
DateTimeFormatter fmt2 = DateTimeFormat.forPattern("mm/dd/yyyy");
DateTime d2 = fmt2.parseDateTime(source2);
System.out.println(d2);
orderRequest.setOrder_date(strArr[1]);
System.out.println(strArr[1]);
orderRequest.setAmount(Double.valueOf(strArr[2]));
orderRequest.setCounter_id(strArr[3]);
if (DateTimeComparator.getInstance().compare(d1, d2) > -1) {
c.output(orderRequest);
}
}
}
}));
// Create a MapElement Object and convert the OrderRequest to String
MapElements<OrderRequest, String> mapElements = MapElements.into(TypeDescriptors.strings())
.via((OrderRequest orderRequestType) -> orderRequestType.getOrder_id() + " "
+ orderRequestType.getOrder_date() + " " + orderRequestType.getAmount() + " "
+ orderRequestType.getCounter_id());
// Mapping the OrderRequest Entity to String
PCollection<String> pStringList = pCollectionTransformation.apply(mapElements);
// Now Writing the elements to a file
pStringList.apply(TextIO.write().to("src/main/resources/ST/STCheck/OrderRequestOut.csv").withNumShards(1)
.withSuffix(".csv"));
// To run pipeline
pipeline.run();
System.out.println("We are done!!");
波乔 Class:
public class OrderRequest implements Serializable{
String order_id;
String order_date;
double amount;
String counter_id;
}
虽然我得到了正确的结果,但这是正确的方法吗?我的两个主要问题是
1) How to i access individual columns? So that, I can specify conditions based on that column value.
2) Can we specify headers when reading the data?
是的,您可以使用 TextIO.read()
像这样处理 CSV 文件,前提是它们不包含嵌入换行符的字段,并且您可以 recognize/skip header 行。您的管道看起来不错,但作为一个小的样式问题,我可能会让第一个 ParDo 只进行解析,然后是一个查看日期以过滤掉内容的过滤器。
如果您想自动推断 header 行,您可以在主程序中打开阅读第一行(使用标准 java 库,或 Beams FileSystems class) 并手动将其提取出来,将其传递给您的解析 DoFn。
我同意更柱状的方法会更自然。我们在 Python 中有这个作为我们的 Dataframes API which is now available for general use。你会写这样的东西
with beam.Pipeline() as p:
df = p | beam.dataframe.io.read_csv("src/main/resources/ST/STCheck/OrderRequest.csv")
filtered = df[df.order_date > limit]
filtered.write_csv("src/main/resources/ST/STCheck/OrderRequestOut.csv")
我正在尝试从 CSV 文件中读取记录并根据日期过滤记录。我已经通过以下方式实现了这一点。但这是正确的方法吗?
步骤是:
- 正在创建管道
- 从文件中读取数据
- 执行必要的过滤
- 创建一个 MapElement 对象并将 OrderRequest 转换为 String
- 将 OrderRequest 实体映射到字符串
- 将输出写入文件
代码:
// Creating pipeline
Pipeline pipeline = Pipeline.create();
// For transformations Reading from a file
PCollection<String> orderRequest = pipeline
.apply(TextIO.read().from("src/main/resources/ST/STCheck/OrderRequest.csv"));
PCollection<OrderRequest> pCollectionTransformation = orderRequest
.apply(ParDo.of(new DoFn<String, OrderRequest>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String rowString = c.element();
if (!rowString.contains("order_id")) {
String[] strArr = rowString.split(",");
OrderRequest orderRequest = new OrderRequest();
orderRequest.setOrder_id(strArr[0]);
// Condition to check if the
String source1 = strArr[1];
DateTimeFormatter fmt1 = DateTimeFormat.forPattern("mm/dd/yyyy");
DateTime d1 = fmt1.parseDateTime(source1);
System.out.println(d1);
String source2 = "4/24/2017";
DateTimeFormatter fmt2 = DateTimeFormat.forPattern("mm/dd/yyyy");
DateTime d2 = fmt2.parseDateTime(source2);
System.out.println(d2);
orderRequest.setOrder_date(strArr[1]);
System.out.println(strArr[1]);
orderRequest.setAmount(Double.valueOf(strArr[2]));
orderRequest.setCounter_id(strArr[3]);
if (DateTimeComparator.getInstance().compare(d1, d2) > -1) {
c.output(orderRequest);
}
}
}
}));
// Create a MapElement Object and convert the OrderRequest to String
MapElements<OrderRequest, String> mapElements = MapElements.into(TypeDescriptors.strings())
.via((OrderRequest orderRequestType) -> orderRequestType.getOrder_id() + " "
+ orderRequestType.getOrder_date() + " " + orderRequestType.getAmount() + " "
+ orderRequestType.getCounter_id());
// Mapping the OrderRequest Entity to String
PCollection<String> pStringList = pCollectionTransformation.apply(mapElements);
// Now Writing the elements to a file
pStringList.apply(TextIO.write().to("src/main/resources/ST/STCheck/OrderRequestOut.csv").withNumShards(1)
.withSuffix(".csv"));
// To run pipeline
pipeline.run();
System.out.println("We are done!!");
波乔 Class:
public class OrderRequest implements Serializable{
String order_id;
String order_date;
double amount;
String counter_id;
}
虽然我得到了正确的结果,但这是正确的方法吗?我的两个主要问题是
1) How to i access individual columns? So that, I can specify conditions based on that column value.
2) Can we specify headers when reading the data?
是的,您可以使用 TextIO.read()
像这样处理 CSV 文件,前提是它们不包含嵌入换行符的字段,并且您可以 recognize/skip header 行。您的管道看起来不错,但作为一个小的样式问题,我可能会让第一个 ParDo 只进行解析,然后是一个查看日期以过滤掉内容的过滤器。
如果您想自动推断 header 行,您可以在主程序中打开阅读第一行(使用标准 java 库,或 Beams FileSystems class) 并手动将其提取出来,将其传递给您的解析 DoFn。
我同意更柱状的方法会更自然。我们在 Python 中有这个作为我们的 Dataframes API which is now available for general use。你会写这样的东西
with beam.Pipeline() as p:
df = p | beam.dataframe.io.read_csv("src/main/resources/ST/STCheck/OrderRequest.csv")
filtered = df[df.order_date > limit]
filtered.write_csv("src/main/resources/ST/STCheck/OrderRequestOut.csv")