
How to Read data from Jdbc and write to bigquery using Apache Beam Python Sdk

I am trying to write a pipeline that reads data from JDBC (Oracle, MSSQL), performs some transformations, and writes the result to BigQuery.

I am stuck at the ReadFromJdbc step: I cannot get it to decode the data into the correct schema type.

My code:

import abc
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import ReadFromJdbc
from input.base_source_processor import BaseSourceProcessor


class Row(typing.NamedTuple):
    COUNTRY_ID: str
    COUNTRY_NAME: str
    inc_col: str


class RdbmsProcessor(BaseSourceProcessor, abc.ABC):
    def __init__(self, task):
        self.task = task

    def expand(self, p_input):
        # Register a RowCoder so Beam can map the JDBC schema onto Row.
        coders.registry.register_coder(Row, coders.RowCoder)

        data = (p_input
                | "Read from rdbms" >> ReadFromJdbc(
                    driver_class_name=self.task['rdbms_props']['driver_class_name'],
                    jdbc_url=self.task['rdbms_props']['jdbc_url'],
                    username=self.task['rdbms_props']['username'],
                    password=self.task['rdbms_props']['password'],
                    table_name='"dm-demo".COUNTRIES',
                    classpath=['/home/abhinav_jha_datametica_com/python_df/odbc_jars/ojdbc8.jar']
                ))

        data | beam.combiners.Count.Globally() | beam.Map(print)

        data | beam.Map(print)

        return data

My table has three columns: two VARCHAR and one timestamp.
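For context, the downstream step I intend to run once the read works is a plain BigQuery write. A minimal sketch of that conversion, assuming the `Row` NamedTuple above; the table name, schema string, and `row_to_dict` helper are illustrative placeholders, not part of the failing code:

```python
import typing


class Row(typing.NamedTuple):
    COUNTRY_ID: str
    COUNTRY_NAME: str
    inc_col: str


def row_to_dict(row: Row) -> dict:
    """Convert a NamedTuple row into the dict format WriteToBigQuery expects."""
    return row._asdict()


# In the pipeline this helper would feed the BigQuery sink, roughly:
#   (data
#    | "To dict" >> beam.Map(row_to_dict)
#    | "Write" >> beam.io.WriteToBigQuery(
#          'my-project:my_dataset.countries',  # placeholder table
#          schema='COUNTRY_ID:STRING,COUNTRY_NAME:STRING,inc_col:TIMESTAMP',
#          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

print(row_to_dict(Row('IN', 'India', '2022-01-01T00:00:00')))
```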

The error I get when running on both the Dataflow and Direct runners:

ValueError: Failed to decode schema due to an issue with Field proto:
name: "COUNTRY_ID"
type {
  logical_type {
    urn: "beam:logical_type:javasdk:v1"
    payload: "2SNAPPY[=13=]0[=13=]0[=13=]0[=13=]0[=13=]1[=13=]0[=13=]0[=13=]0[=13=]1[=13=]0[=13=]0[=13=]25000U45[=13=]0[=13=]5sr[=13=]0=org.apache.beam.sdk.io.jdbc.LogicalTypes$VariableLengthString\r<3\'6u17[=13=]2[=13=]0[=13=]1I[=13=]0\tmax\t54xr[=13=]082X[=13=]00JdbcL1i066173_3a[=13=]2[=13=]0[=13=]4L[=13=]00argumentt[=13=]02Ljava/lang/Object;L[=13=]04a\r [=13=]134t[=13=]0.Lorg/\t6[=13=]0/[=13=]164/sdk/schemas/S[=13=]504$Field[=13=]104;L[=13=]00base[=13=]14Dq[=13=]0~[=13=]0[=13=]3L[=13=]0\nidentifier6r[=13=]0\t70;xpsr[=13=]01[=13=]11[=13=]0.[=13=]11<.Integer22047178%[=13=]7$[=13=]5valuexr[=13=]001(hNumber64553403[=13=]2[=13=]0[=13=]0xp[=13=]0[=13=]0[=13=]0[=13=]7sr[=13=]06N([=13=]1\r24.AutoV[=13=]1N[=13=]0_\t4[=13=]4_F14h94m4S37P[=13=]2[=13=]00L[=13=]05collectionEle!/53[=13=]43l9\0t[=13=]006\"[=13=]1[=13=]0L12$;L[=13=]0\nmapKey5S44map[=13=]5754,0metadatat[=13=]07)24util/Map!g(nullablet[=13=]03\t5!>8/Boolean;L[=13=]0\trow\t30t[=13=]0$23[=13=]0[=13=]1T(typeNamet[=13=]0-22[=13=]0[=13=]0$[=13=]14[=13=]1/0;xr[=13=]0,nu[=13=]1\t0Y\'43PLl[703%6[=13=]1[=13=]14sr[=13=]06%3[=13=]12[=13=]4.C5|Ds$EmptyMapY645Z470[=13=]53[=13=]0s22[=13=]2\r4,5 r05426[=13=]2[=13=]0[=13=]1ZQ20p[=13=]0p~r[=13=]0+24[=13=]014[=13=]0[=13=]0\r[=13=]102[=13=]0[=13=]0xr[=13=]06[=13=]55!Z0.Enum\r4[=13=]55$pt[=13=]0[=13=]5INT32sA1[=13=]0\t[=13=]16[=13=]1\t[=13=]02[=13=]1[=13=]504p~[=13=]1[=13=]7\5t[=13=]0[=13=]6STRINGt[=13=]0[=13=]7VARCHAR[=13=]0[=13=]0[=13=]0[=13=]7"
    representation {
      atomic_type: STRING
    }
    argument_type {
      atomic_type: INT32
    }
    argument {
      atomic_value {
        int32: 7
      }
    }
  }
}

Any help would be appreciated.

JdbcIO appears to rely on Java-only logical types, which the Python SDK cannot deserialize. This is tracked in https://issues.apache.org/jira/browse/BEAM-13717
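Until that issue is resolved, one workaround sometimes suggested is to cast the affected columns in a SQL query so the driver reports types that map onto plain Beam atomic types, and pass that query to ReadFromJdbc via its `query` parameter instead of `table_name`. This is a sketch only: the Oracle `TO_CHAR` casts, the column names, and the assumption that the cast actually avoids the `VariableLengthString` logical type are all untested assumptions that depend on your driver:

```python
# Build a SELECT that casts each column to a plain string, in the hope that
# the resulting JDBC schema avoids Java-only logical types such as
# LogicalTypes$VariableLengthString. TO_CHAR is Oracle syntax; other
# databases would need CAST(... AS ...) instead.
columns = ['COUNTRY_ID', 'COUNTRY_NAME', 'inc_col']
select_list = ', '.join(f'TO_CHAR({c}) AS {c}' for c in columns)
query = f'SELECT {select_list} FROM "dm-demo".COUNTRIES'

# The query would then replace table_name in the read, e.g.:
#   ReadFromJdbc(
#       query=query,
#       driver_class_name=..., jdbc_url=..., username=..., password=...,
#       classpath=[...])

print(query)
```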