How to send an Avro schema to GCP BigQuery using Java?

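I'm trying to send an Avro schema to GCP BigQuery using Java 11 and Spring 2. I have researched this a lot but found no example of how to do it. The Avro schema file I want to send has the following format: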

{"namespace": "example.gcp",
  "type": "record",
  "name": "Client",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "phone", "type": ["string", "null"]},
    {"name": "address", "type": ["string", "null"]}
  ]
}

I can send a binary file in .avro format using the following snippet:

@PostMapping("/uploadFileAvro")
public ModelAndView handleFileUpload(
        @RequestParam("file") MultipartFile file, @RequestParam("tableName") String tableName)
        throws IOException {

    ListenableFuture<Job> loadJob = this.bigQueryTemplate.writeDataToTable(
            tableName, file.getInputStream(), FormatOptions.avro());

    return getResponse(loadJob, tableName);
}

A .csv file can be sent with code like this:

@PostMapping("/uploadFileCSV")
public ModelAndView handleFileUploadCSV(
        @RequestParam("file") MultipartFile file, @RequestParam("tableName") String tableName)
        throws IOException {

    ListenableFuture<Job> loadJob = this.bigQueryTemplate.writeDataToTable(
            tableName, file.getInputStream(), FormatOptions.csv());

    return getResponse(loadJob, tableName);
}

But when I try to send the Avro schema file instead of a binary .avro file, I get this error:

java.util.concurrent.ExecutionException: org.springframework.cloud.gcp.bigquery.core.BigQueryException: Error while reading data, error message: The Apache Avro library failed to parse the header with the following error: Invalid data file. Magic does not match: gs://bigquery-prod-upload-us/prod-scotty-fa6aadb4-b3d6-40db-a39f-2025f1a99019
    at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
    at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:119)
    at org.springframework.cloud.springbootbigqueryapp.controller.WebController.getResponse(WebController.java:107)
    at org.springframework.cloud.springbootbigqueryapp.controller.WebController.handleFileUpload(WebController.java:63)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:878)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:792)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:652)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.cloud.gcp.bigquery.core.BigQueryException: Error while reading data, error message: The Apache Avro library failed to parse the header with the following error: Invalid data file. Magic does not match: gs://bigquery-prod-upload-us/prod-scotty-fa6aadb4-b3d6-40db-a39f-2025f1a99019
    at org.springframework.cloud.gcp.bigquery.core.BigQueryTemplate.lambda$createJobFuture$0(BigQueryTemplate.java:170)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more

Can anyone tell me how to send an Avro schema file to GCP BigQuery?

P.S. If I understood the video correctly, this can somehow be done programmatically.

BigQuery itself has no concept of a schema registry, so you probably want to use the schema either to create a table or to load data into BigQuery.

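In the table creation case, you need to convert the Avro schema to a BigQuery schema, because BigQuery does not use the Avro schema representation directly. Here is a Java example of creating a table with an explicit schema: https://cloud.google.com/bigquery/docs/samples/bigquery-create-table#bigquery_create_table-java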
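
To make the conversion step concrete, here is a minimal sketch that uses only the JDK. The class name `AvroToBigQuerySketch` and the method `column` are hypothetical, and the type table is my reading of BigQuery's documented Avro conversions for primitive types. It assumes each field is either a plain primitive or a union of one primitive with "null". In real code you would parse the schema with the Avro library and build `com.google.cloud.bigquery.Field` objects instead of strings.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: maps Avro primitive field types to BigQuery column types and modes. */
final class AvroToBigQuerySketch {
    // Avro primitive -> BigQuery standard SQL type name (assumed mapping, primitives only).
    private static final Map<String, String> TYPES = Map.of(
            "int", "INT64", "long", "INT64",
            "float", "FLOAT64", "double", "FLOAT64",
            "boolean", "BOOL", "string", "STRING", "bytes", "BYTES");

    /**
     * Describes one column as "name TYPE MODE".
     * avroTypes holds a single entry for a plain type, or the branches of a union.
     * Assumption: a union has at most one non-null branch; only the first is used.
     */
    static String column(String name, List<String> avroTypes) {
        boolean nullable = avroTypes.contains("null");
        String avroType = avroTypes.stream()
                .filter(t -> !t.equals("null"))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("no non-null type for " + name));
        String bqType = TYPES.get(avroType);
        if (bqType == null) {
            throw new IllegalArgumentException("unsupported Avro type: " + avroType);
        }
        return name + " " + bqType + " " + (nullable ? "NULLABLE" : "REQUIRED");
    }

    public static void main(String[] args) {
        // Fields taken from the Client schema in the question.
        System.out.println(column("id", List.of("long")));              // id INT64 REQUIRED
        System.out.println(column("name", List.of("string")));          // name STRING REQUIRED
        System.out.println(column("phone", List.of("string", "null"))); // phone STRING NULLABLE
    }
}
```

Nested records, arrays, and logical types are deliberately left out; they need their own handling (for example RECORD columns and REPEATED mode).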

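In the load case, there is no need to transmit the schema separately from the data. An Avro OCF (Object Container File) normally carries the schema in its header, followed by a set of data blocks. Just specify the URI of the Avro file: https://cloud.google.com/bigquery/docs/samples/bigquery-load-table-gcs-avro#bigquery_load_table_gcs_avro-java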
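
This also explains the "Magic does not match" error: according to the Avro specification, an object container file starts with the four bytes 'O', 'b', 'j', 1, and a JSON schema file does not. As a rough sketch (the class `AvroMagicCheck` and its method are hypothetical names, not part of any library), you could check an upload before handing it to BigQuery, for example by passing in the bytes of the uploaded file:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper: checks whether data starts with the Avro OCF magic bytes. */
final class AvroMagicCheck {
    // Per the Avro specification, a container file begins with 'O', 'b', 'j', 1.
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    /** Returns true if the first four bytes match the Avro container file magic. */
    static boolean looksLikeAvroOcf(byte[] data) {
        return data.length >= MAGIC.length
                && Arrays.equals(Arrays.copyOf(data, MAGIC.length), MAGIC);
    }

    public static void main(String[] args) {
        // A JSON schema file starts with '{', so it is not a container file.
        byte[] schemaJson = "{\"type\": \"record\"}".getBytes(StandardCharsets.UTF_8);
        // Only the first bytes of a container file, for illustration.
        byte[] ocfStart = {'O', 'b', 'j', 1, 0};

        System.out.println(looksLikeAvroOcf(schemaJson)); // false
        System.out.println(looksLikeAvroOcf(ocfStart));   // true
    }
}
```

This only inspects the header marker; it does not validate the rest of the file, so BigQuery can still reject a file that passes this check.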