如何使用 Apache Beam Python SDK 从 JDBC 读取数据并写入 BigQuery
How to read data from JDBC and write to BigQuery using the Apache Beam Python SDK
我正在尝试编写一个管道,它从 JDBC(Oracle、MSSQL)读取数据,执行某些操作,然后写入 BigQuery。
我卡在了 ReadFromJdbc 步骤,无法将读取结果转换为正确的 schema 类型。
我的代码:
import abc
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.spanner import ReadFromSpanner
from apache_beam.io.jdbc import ReadFromJdbc

from input.base_source_processor import BaseSourceProcessor
class Row(typing.NamedTuple):
    """Record type for rows read from the ``COUNTRIES`` table.

    ``RdbmsProcessor.expand`` registers this type with ``coders.RowCoder``.
    All three fields are declared as ``str``.
    """

    COUNTRY_ID: str
    COUNTRY_NAME: str
    # NOTE(review): the description accompanying this snippet says one of the
    # three columns is a timestamp; presumably that is this one, in which case
    # ``str`` may not be the right type. Confirm against the table definition.
    inc_col: str
class RdbmsProcessor(BaseSourceProcessor, abc.ABC):
    """Source step that reads the ``"dm-demo".COUNTRIES`` table over JDBC.

    Connection settings are taken from ``task['rdbms_props']``; the table
    name and the JDBC driver jar path are hard-coded in ``expand``.
    """

    def __init__(self, task):
        """Store the task configuration used later by ``expand``.

        Args:
            task: Mapping that provides ``task['rdbms_props']`` with the keys
                ``driver_class_name``, ``jdbc_url``, ``username`` and
                ``password``.
        """
        # NOTE(review): the base-class initializer is not called here; confirm
        # that BaseSourceProcessor does not require it.
        self.task = task

    def expand(self, p_input):
        """Apply the JDBC read to ``p_input`` and return the result.

        Also attaches two debug branches that print the element count and
        every element.

        Args:
            p_input: The object the read transform is applied to
                (presumably the pipeline or its begin marker -- confirm
                against the caller).

        Returns:
            The collection produced by ``ReadFromJdbc``.
        """
        rdbms_props = self.task['rdbms_props']

        # Associate the Row named tuple with Beam's schema-aware row coder.
        coders.registry.register_coder(Row, coders.RowCoder)

        data = (
            p_input
            | "Read from rdbms" >> ReadFromJdbc(
                driver_class_name=rdbms_props['driver_class_name'],
                jdbc_url=rdbms_props['jdbc_url'],
                username=rdbms_props['username'],
                password=rdbms_props['password'],
                table_name='"dm-demo".COUNTRIES',
                classpath=['/home/abhinav_jha_datametica_com/python_df/odbc_jars/ojdbc8.jar'],
            )
        )

        # Debug output. The two print steps carry explicit labels because
        # unlabeled ``beam.Map(print)`` transforms would share one default
        # label, and Beam requires labels to be unique within a scope.
        data | beam.combiners.Count.Globally() | "Print row count" >> beam.Map(print)
        data | "Print rows" >> beam.Map(print)
        return data
我的数据有三列,其中两列是 VARCHAR,一列是时间戳。
我在 Dataflow 和 DirectRunner 上运行时都遇到了以下错误:
ValueError: Failed to decode schema due to an issue with Field proto:
name: "COUNTRY_ID"
type {
logical_type {
urn: "beam:logical_type:javasdk:v1"
payload: "2SNAPPY[=13=]0[=13=]0[=13=]0[=13=]0[=13=]1[=13=]0[=13=]0[=13=]0[=13=]1[=13=]0[=13=]0[=13=]25000U45[=13=]0[=13=]5sr[=13=]0=org.apache.beam.sdk.io.jdbc.LogicalTypes$VariableLengthString\r<3\'6u17[=13=]2[=13=]0[=13=]1I[=13=]0\tmax\t54xr[=13=]082X[=13=]00JdbcL1i066173_3a[=13=]2[=13=]0[=13=]4L[=13=]00argumentt[=13=]02Ljava/lang/Object;L[=13=]04a\r [=13=]134t[=13=]0.Lorg/\t6[=13=]0/[=13=]164/sdk/schemas/S[=13=]504$Field[=13=]104;L[=13=]00base[=13=]14Dq[=13=]0~[=13=]0[=13=]3L[=13=]0\nidentifier6r[=13=]0\t70;xpsr[=13=]01[=13=]11[=13=]0.[=13=]11<.Integer22047178%[=13=]7$[=13=]5valuexr[=13=]001(hNumber64553403[=13=]2[=13=]0[=13=]0xp[=13=]0[=13=]0[=13=]0[=13=]7sr[=13=]06N([=13=]1\r24.AutoV[=13=]1N[=13=]0_\t4[=13=]4_F14h94m4S37P[=13=]2[=13=]00L[=13=]05collectionEle!/53[=13=]43l9\0t[=13=]006\"[=13=]1[=13=]0L12$;L[=13=]0\nmapKey5S44map[=13=]5754,0metadatat[=13=]07)24util/Map!g(nullablet[=13=]03\t5!>8/Boolean;L[=13=]0\trow\t30t[=13=]0$23[=13=]0[=13=]1T(typeNamet[=13=]0-22[=13=]0[=13=]0$[=13=]14[=13=]1/0;xr[=13=]0,nu[=13=]1\t0Y\'43PLl[703%6[=13=]1[=13=]14sr[=13=]06%3[=13=]12[=13=]4.C5|Ds$EmptyMapY645Z470[=13=]53[=13=]0s22[=13=]2\r4,5 r05426[=13=]2[=13=]0[=13=]1ZQ20p[=13=]0p~r[=13=]0+24[=13=]014[=13=]0[=13=]0\r[=13=]102[=13=]0[=13=]0xr[=13=]06[=13=]55!Z0.Enum\r4[=13=]55$pt[=13=]0[=13=]5INT32sA1[=13=]0\t[=13=]16[=13=]1\t[=13=]02[=13=]1[=13=]504p~[=13=]1[=13=]7\5t[=13=]0[=13=]6STRINGt[=13=]0[=13=]7VARCHAR[=13=]0[=13=]0[=13=]0[=13=]7"
representation {
atomic_type: STRING
}
argument_type {
atomic_type: INT32
}
argument {
atomic_value {
int32: 7
}
}
}
}
如有任何帮助,我们将不胜感激。
JdbcIO 似乎依赖于仅限 Java 的逻辑类型,因此 Python 无法反序列化它们。该问题在 https://issues.apache.org/jira/browse/BEAM-13717 中跟踪。
我正在尝试编写一个管道,它从 JDBC(Oracle、MSSQL)读取数据,执行某些操作,然后写入 BigQuery。
我卡在了 ReadFromJdbc 步骤,无法将读取结果转换为正确的 schema 类型。
我的代码:
import typing
import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.spanner import ReadFromSpanner
from input.base_source_processor import BaseSourceProcessor
from apache_beam.io.jdbc import ReadFromJdbc
class Row(typing.NamedTuple):
    """Record type for rows read from the ``COUNTRIES`` table.

    ``RdbmsProcessor.expand`` registers this type with ``coders.RowCoder``.
    All three fields are declared as ``str``.
    """

    COUNTRY_ID: str
    COUNTRY_NAME: str
    # NOTE(review): the description accompanying this snippet says one of the
    # three columns is a timestamp; presumably that is this one, in which case
    # ``str`` may not be the right type. Confirm against the table definition.
    inc_col: str
class RdbmsProcessor(BaseSourceProcessor, abc.ABC):
    """Source step that reads the ``"dm-demo".COUNTRIES`` table over JDBC.

    Connection settings are taken from ``task['rdbms_props']``; the table
    name and the JDBC driver jar path are hard-coded in ``expand``.
    """

    def __init__(self, task):
        """Store the task configuration used later by ``expand``.

        Args:
            task: Mapping that provides ``task['rdbms_props']`` with the keys
                ``driver_class_name``, ``jdbc_url``, ``username`` and
                ``password``.
        """
        # NOTE(review): the base-class initializer is not called here; confirm
        # that BaseSourceProcessor does not require it.
        self.task = task

    def expand(self, p_input):
        """Apply the JDBC read to ``p_input`` and return the result.

        Also attaches two debug branches that print the element count and
        every element.

        Args:
            p_input: The object the read transform is applied to
                (presumably the pipeline or its begin marker -- confirm
                against the caller).

        Returns:
            The collection produced by ``ReadFromJdbc``.
        """
        rdbms_props = self.task['rdbms_props']

        # Associate the Row named tuple with Beam's schema-aware row coder.
        coders.registry.register_coder(Row, coders.RowCoder)

        data = (
            p_input
            | "Read from rdbms" >> ReadFromJdbc(
                driver_class_name=rdbms_props['driver_class_name'],
                jdbc_url=rdbms_props['jdbc_url'],
                username=rdbms_props['username'],
                password=rdbms_props['password'],
                table_name='"dm-demo".COUNTRIES',
                classpath=['/home/abhinav_jha_datametica_com/python_df/odbc_jars/ojdbc8.jar'],
            )
        )

        # Debug output. The two print steps carry explicit labels because
        # unlabeled ``beam.Map(print)`` transforms would share one default
        # label, and Beam requires labels to be unique within a scope.
        data | beam.combiners.Count.Globally() | "Print row count" >> beam.Map(print)
        data | "Print rows" >> beam.Map(print)
        return data
我的数据有三列,其中两列是 VARCHAR,一列是时间戳。
我在 Dataflow 和 DirectRunner 上运行时都遇到了以下错误:
ValueError: Failed to decode schema due to an issue with Field proto:
name: "COUNTRY_ID"
type {
logical_type {
urn: "beam:logical_type:javasdk:v1"
payload: "2SNAPPY[=13=]0[=13=]0[=13=]0[=13=]0[=13=]1[=13=]0[=13=]0[=13=]0[=13=]1[=13=]0[=13=]0[=13=]25000U45[=13=]0[=13=]5sr[=13=]0=org.apache.beam.sdk.io.jdbc.LogicalTypes$VariableLengthString\r<3\'6u17[=13=]2[=13=]0[=13=]1I[=13=]0\tmax\t54xr[=13=]082X[=13=]00JdbcL1i066173_3a[=13=]2[=13=]0[=13=]4L[=13=]00argumentt[=13=]02Ljava/lang/Object;L[=13=]04a\r [=13=]134t[=13=]0.Lorg/\t6[=13=]0/[=13=]164/sdk/schemas/S[=13=]504$Field[=13=]104;L[=13=]00base[=13=]14Dq[=13=]0~[=13=]0[=13=]3L[=13=]0\nidentifier6r[=13=]0\t70;xpsr[=13=]01[=13=]11[=13=]0.[=13=]11<.Integer22047178%[=13=]7$[=13=]5valuexr[=13=]001(hNumber64553403[=13=]2[=13=]0[=13=]0xp[=13=]0[=13=]0[=13=]0[=13=]7sr[=13=]06N([=13=]1\r24.AutoV[=13=]1N[=13=]0_\t4[=13=]4_F14h94m4S37P[=13=]2[=13=]00L[=13=]05collectionEle!/53[=13=]43l9\0t[=13=]006\"[=13=]1[=13=]0L12$;L[=13=]0\nmapKey5S44map[=13=]5754,0metadatat[=13=]07)24util/Map!g(nullablet[=13=]03\t5!>8/Boolean;L[=13=]0\trow\t30t[=13=]0$23[=13=]0[=13=]1T(typeNamet[=13=]0-22[=13=]0[=13=]0$[=13=]14[=13=]1/0;xr[=13=]0,nu[=13=]1\t0Y\'43PLl[703%6[=13=]1[=13=]14sr[=13=]06%3[=13=]12[=13=]4.C5|Ds$EmptyMapY645Z470[=13=]53[=13=]0s22[=13=]2\r4,5 r05426[=13=]2[=13=]0[=13=]1ZQ20p[=13=]0p~r[=13=]0+24[=13=]014[=13=]0[=13=]0\r[=13=]102[=13=]0[=13=]0xr[=13=]06[=13=]55!Z0.Enum\r4[=13=]55$pt[=13=]0[=13=]5INT32sA1[=13=]0\t[=13=]16[=13=]1\t[=13=]02[=13=]1[=13=]504p~[=13=]1[=13=]7\5t[=13=]0[=13=]6STRINGt[=13=]0[=13=]7VARCHAR[=13=]0[=13=]0[=13=]0[=13=]7"
representation {
atomic_type: STRING
}
argument_type {
atomic_type: INT32
}
argument {
atomic_value {
int32: 7
}
}
}
}
如有任何帮助,我们将不胜感激。
JdbcIO 似乎依赖于仅限 Java 的逻辑类型,因此 Python 无法反序列化它们。该问题在 https://issues.apache.org/jira/browse/BEAM-13717 中跟踪。