Java: read an Excel file stored in a bucket using an Apache Beam pipeline
I know the following approach, but I need to use it inside an Apache Beam pipeline. Please provide an example:
try (ReadableByteChannel chan = FileSystems.open(FileSystems.matchNewResource(
        "gs://bucketname/filename.xlsx", false))) {
    InputStream inputStream = Channels.newInputStream(chan);
I implemented this reading the .xlsx file from my local file system, but the same works with your GCS bucket path. I have tried the same approach in a different pipeline and it works fine.
The enrichedCollection in the code below can be treated as a .csv file read line by line. I use a semicolon as the delimiter between the values.
package com.fooBar;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.CellType;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.util.Iterator;

public class SampleExcelInput {

    public static void main(String[] args) throws IOException {
        Pipeline pipeline = Pipeline.create();

        PCollection<FileIO.ReadableFile> inputCollection = pipeline
                .apply(FileIO.match()
                        // .filepattern("gs://bucket/file.xlsx"))
                        .filepattern("C:\\Workspace\\ApacheBeam\\src\\main\\resources\\Inputfiles\\SampleExcel.xlsx"))
                .apply(FileIO.readMatches());

        PCollection<String> enrichedCollection = inputCollection.apply(ParDo.of(new ReadXlsxDoFn()));
        // TODO: further processing, treating the lines of enrichedCollection as if they were read
        // from a CSV file (a sketch follows after the class)

        pipeline.run().waitUntilFinish();
    }

    static class ReadXlsxDoFn extends DoFn<FileIO.ReadableFile, String> {

        static final String DELIMITER = ";";

        @ProcessElement
        public void process(ProcessContext c) throws IOException {
            FileIO.ReadableFile file = c.element();
            System.out.println("FileName being read is :" + file);
            assert file != null;

            InputStream stream = Channels.newInputStream(file.openSeekable());
            XSSFWorkbook wb = new XSSFWorkbook(stream);
            XSSFSheet sheet = wb.getSheetAt(0); // first sheet of the workbook

            // iterate over the rows of the Excel sheet
            for (Row row : sheet) {
                Iterator<Cell> cellIterator = row.cellIterator(); // iterate over each column of the row
                StringBuilder sb = new StringBuilder();
                while (cellIterator.hasNext()) {
                    Cell cell = cellIterator.next();
                    if (cell.getCellType() == CellType.NUMERIC) {
                        sb.append(cell.getNumericCellValue()).append(DELIMITER);
                    } else {
                        sb.append(cell.getStringCellValue()).append(DELIMITER);
                    }
                }
                System.out.println(sb.substring(0, sb.length() - 1));
                c.output(sb.substring(0, sb.length() - 1)); // drop the trailing delimiter
            }
            wb.close();
        }
    }
}
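As a minimal sketch of the TODO step, you could split each semicolon-delimited line back into fields in a follow-up DoFn placed in main() right after enrichedCollection is created. The field index 4 ("Country") is an assumption based on the sample output shown further down:

// Hypothetical follow-up transform: split each semicolon-delimited line into fields.
// Index 4 ("Country") is an assumption taken from the sample file's column order.
PCollection<String> countries = enrichedCollection.apply(ParDo.of(
        new DoFn<String, String>() {
            @ProcessElement
            public void process(ProcessContext c) {
                String[] fields = c.element().split(ReadXlsxDoFn.DELIMITER);
                if (fields.length > 4) {
                    c.output(fields[4]);
                }
            }
        }));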
For the dependencies, I had to add a few jars manually to make it work; you can take that reference from here.
In addition to the jars above, I have the following as my Maven dependencies.
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.37.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.37.0</version>
</dependency>
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.11.0</version>
</dependency>
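If you would rather declare the POI jars through Maven instead of adding them by hand, and also need the gs:// path to resolve, dependencies along these lines should cover it (the versions are assumptions; pick ones that match your Beam and POI setup):

<!-- Apache POI for .xlsx parsing (version is an assumption) -->
<dependency>
    <groupId>org.apache.poi</groupId>
    <artifactId>poi-ooxml</artifactId>
    <version>5.2.2</version>
</dependency>
<!-- Pulls in GCS filesystem support so FileIO can match gs:// paths (assumption: align the version with your Beam version) -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>2.37.0</version>
</dependency>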
Link to the sample .xlsx file: here
Console output of the DoFn:
FileName being read is :ReadableFile{metadata=Metadata{resourceId=C:\Users\USER\Desktop\Java Masterclass\ApacheBeam\src\main\resources\Inputfiles\SampleExcel.xlsx, sizeBytes=7360, isReadSeekEfficient=true, checksum=null, lastModifiedMillis=0}, compression=UNCOMPRESSED}
0.0;First Name;Last Name;Gender;Country;Age;Date;Id
1.0;Dulce;Abril;Female;United States;32.0;15/10/2017;1562.0
2.0;Mara;Hashimoto;Female;Great Britain;25.0;16/08/2016;1582.0
3.0;Philip;Gent;Male;France;36.0;21/05/2015;2587.0
4.0;Kathleen;Hanner;Female;United States;25.0;15/10/2017;3549.0
5.0;Nereida;Magwood;Female;United States;58.0;16/08/2016;2468.0
6.0;Gaston;Brumm;Male;United States;24.0;21/05/2015;2554.0
7.0;Etta;Hurn;Female;Great Britain;56.0;15/10/2017;3598.0
8.0;Earlean;Melgar;Female;United States;27.0;16/08/2016;2456.0
.
.
.
50.0;Rasheeda;Alkire;Female;United States;29.0;16/08/2016;6125.0
Process finished with exit code 0
Note: since the file is parsed line by line inside a simple DoFn, this means one thread per file. If you have a single very large file, say ~5 GB, you will see a noticeable drop in performance. One workaround is to reduce the size of the input files and use a wildcard file pattern, as sketched below.
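For example, if the input is split into several smaller workbooks, a wildcard file pattern lets the runner distribute the files across workers (the bucket name and prefix below are made up purely for illustration):

// Hypothetical bucket and prefix, shown only to illustrate the wildcard pattern
PCollection<FileIO.ReadableFile> files = pipeline
        .apply(FileIO.match().filepattern("gs://my-bucket/input/SampleExcel_part-*.xlsx"))
        .apply(FileIO.readMatches());
PCollection<String> lines = files.apply(ParDo.of(new ReadXlsxDoFn()));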