在 GCP DatafLow 中读取自定义输入文件 (ldif) type/format
Read custom input file(ldif) type/format in GCP DatafLow
我有来自 LDAP 系统的 ldif 扩展文件。我能够在 python 中轻松解析它并从文件中提取所需数据并插入到 SQL 服务器中。我的示例 python 如下所示。
import os
from ldif3 import LDIFParser
import pymssql
parser = LDIFParser(open('temp.ldiff', 'rb'))
def return_dictionary_element_if_present(dict_entry, element):
if dict_entry.get(element):
return dict_entry.get(element)[0]
return ''
def add_new_user():
for dn, entry in parser.parse():
dict_entry = dict(entry)
email = return_dictionary_element_if_present(dict_entry,'email')
password = return_dictionary_element_if_present(dict_entry,'password')
#some code to insert into SQL server
add_new_user()
但是当我想将其转换为数据流时,我无法理解要修改的内容和位置。我的数据流代码如下所示
class sqlserverwriteDoFn(beam.DoFn):
#insert statement
class CreateEntities(beam.DoFn):
def process(self, element):
#figure out how to return dictionary if parsed correctly
return [{"email": email, "password": password}]
def dataflow(input_file, pipeline_options):
print("starting")
options = GoogleCloudOptions.from_dictionary(pipeline_options)
with beam.Pipeline(options=options) as p:
(p | 'Reading Ldif data from GCS' >> beam.io.ReadFromText(input_file)
| 'Create entities' >> beam.ParDo(CreateEntities())
| 'Insert data to SQLSERVER' >> beam.ParDo(sqlserverwriteDoFn(pipeline_options['project']))
)
我认为 ReadFromText 将每一行转换为 pcollection,这在我的例子中不起作用。示例 ldif 文件如下所示
dn: uid=12345,ab=users,xy=random
phone: 111
address: someaddress
email: true
username:abc
password:abc
dn: uid=12345,ab=users,xy=random
objectClass: inetOrgPerson
objectClass: top
phone: 111
address: someaddress
email: true
username:abcd
password:abcd
非常感谢任何想法,因为我希望从 LDIF 文件中导入 5000 万个用户名和密码,绝对简单 python for 循环无法扩展。
[Edit1] 根据评论,修改代码并出现其他错误
def return_dictionary_element_if_present(dict_entry, element):
if dict_entry.get(element):
return dict_entry.get(element)[0]
return ''
class CreateEntities(beam.DoFn):
def process(self, file):
parser = LDIFParser(open(file, 'rb'))
arr=[]
for dn, entry in parser.parse():
dict1 ={}
dict_entry = dict(entry)
email = return_dictionary_element_if_present(dict_entry,'email')
password = return_dictionary_element_if_present(dict_entry,'password')
dict1['email'] = email
dict1['password'] = password
arr.append(dict1)
return arr
def dataflow(pipeline_options):
print("starting")
options = GoogleCloudOptions.from_dictionary(pipeline_options)
with beam.Pipeline(options=options) as p:
(p | 'Reading data from GCS' >> MatchFiles(file_pattern="temp.ldiff")
| 'file match' >> ReadMatches()
| 'Create entities' >> beam.ParDo(CreateEntities())
| 'print to screen' >> beam.Map(print)
)
出现以下错误
File "dataflow.py", line 26, in process
parser = LDIFParser(open(file, 'rb'))
TypeError: expected str, bytes or os.PathLike object, not ReadableFile [while running 'Create entities']
编辑2
更改了一行 python 代码如下
parser = LDIFParser(file)
收到此错误
File "dataflow.py", line 28, in process
for dn, entry in parser.parse():
File "C:\Users\sande\anaconda3\envs\saopaulo\lib\site-packages\ldif3.py", line 383, in parse
for block in self._iter_blocks():
File "C:\Users\sande\anaconda3\envs\saopaulo\lib\site-packages\ldif3.py", line 282, in _iter_blocks
for line in self._iter_unfolded_lines():
File "C:\Users\sande\anaconda3\envs\saopaulo\lib\site-packages\ldif3.py", line 263, in _iter_unfolded_lines
line = self._input_file.readline()
AttributeError: 'ReadableFile' object has no attribute 'readline' [while running 'Create entities']
我应该如何更改我的代码以解决错误?
你是对的,Python SDK中的TextIO使用换行符作为分隔元素的分隔符。所以生成的每个元素都是输入文件的一行。
在您的原始代码中,您已经有了一个可以读取 LDIF 文件的解析器。如果您想在 Dataflow 上使用现有的解析器,您可以通过 ParDo transform. I would recommend beginning with FileIO to create a PCollection of LDIF files, and then use those as input to your own ParDo which parses those files and outputs your records. Note that you will likely want to read on managing Beam Python dependencies 在您的管道中使用它,因为您的 Dataflow worker 将需要访问该依赖项。
我有来自 LDAP 系统的 ldif 扩展文件。我能够在 python 中轻松解析它并从文件中提取所需数据并插入到 SQL 服务器中。我的示例 python 如下所示。
import os
from ldif3 import LDIFParser
import pymssql
parser = LDIFParser(open('temp.ldiff', 'rb'))
def return_dictionary_element_if_present(dict_entry, element):
if dict_entry.get(element):
return dict_entry.get(element)[0]
return ''
def add_new_user():
for dn, entry in parser.parse():
dict_entry = dict(entry)
email = return_dictionary_element_if_present(dict_entry,'email')
password = return_dictionary_element_if_present(dict_entry,'password')
#some code to insert into SQL server
add_new_user()
但是当我想将其转换为数据流时,我无法理解要修改的内容和位置。我的数据流代码如下所示
class sqlserverwriteDoFn(beam.DoFn):
#insert statement
class CreateEntities(beam.DoFn):
def process(self, element):
#figure out how to return dictionary if parsed correctly
return [{"email": email, "password": password}]
def dataflow(input_file, pipeline_options):
print("starting")
options = GoogleCloudOptions.from_dictionary(pipeline_options)
with beam.Pipeline(options=options) as p:
(p | 'Reading Ldif data from GCS' >> beam.io.ReadFromText(input_file)
| 'Create entities' >> beam.ParDo(CreateEntities())
| 'Insert data to SQLSERVER' >> beam.ParDo(sqlserverwriteDoFn(pipeline_options['project']))
)
我认为 ReadFromText 将每一行转换为 pcollection,这在我的例子中不起作用。示例 ldif 文件如下所示
dn: uid=12345,ab=users,xy=random
phone: 111
address: someaddress
email: true
username:abc
password:abc
dn: uid=12345,ab=users,xy=random
objectClass: inetOrgPerson
objectClass: top
phone: 111
address: someaddress
email: true
username:abcd
password:abcd
非常感谢任何想法,因为我希望从 LDIF 文件中导入 5000 万个用户名和密码,绝对简单 python for 循环无法扩展。
[Edit1] 根据评论,修改代码并出现其他错误
def return_dictionary_element_if_present(dict_entry, element):
if dict_entry.get(element):
return dict_entry.get(element)[0]
return ''
class CreateEntities(beam.DoFn):
def process(self, file):
parser = LDIFParser(open(file, 'rb'))
arr=[]
for dn, entry in parser.parse():
dict1 ={}
dict_entry = dict(entry)
email = return_dictionary_element_if_present(dict_entry,'email')
password = return_dictionary_element_if_present(dict_entry,'password')
dict1['email'] = email
dict1['password'] = password
arr.append(dict1)
return arr
def dataflow(pipeline_options):
print("starting")
options = GoogleCloudOptions.from_dictionary(pipeline_options)
with beam.Pipeline(options=options) as p:
(p | 'Reading data from GCS' >> MatchFiles(file_pattern="temp.ldiff")
| 'file match' >> ReadMatches()
| 'Create entities' >> beam.ParDo(CreateEntities())
| 'print to screen' >> beam.Map(print)
)
出现以下错误
File "dataflow.py", line 26, in process
parser = LDIFParser(open(file, 'rb'))
TypeError: expected str, bytes or os.PathLike object, not ReadableFile [while running 'Create entities']
编辑2 更改了一行 python 代码如下
parser = LDIFParser(file)
收到此错误
File "dataflow.py", line 28, in process
for dn, entry in parser.parse():
File "C:\Users\sande\anaconda3\envs\saopaulo\lib\site-packages\ldif3.py", line 383, in parse
for block in self._iter_blocks():
File "C:\Users\sande\anaconda3\envs\saopaulo\lib\site-packages\ldif3.py", line 282, in _iter_blocks
for line in self._iter_unfolded_lines():
File "C:\Users\sande\anaconda3\envs\saopaulo\lib\site-packages\ldif3.py", line 263, in _iter_unfolded_lines
line = self._input_file.readline()
AttributeError: 'ReadableFile' object has no attribute 'readline' [while running 'Create entities']
我应该如何更改我的代码以解决错误?
你是对的,Python SDK中的TextIO使用换行符作为分隔元素的分隔符。所以生成的每个元素都是输入文件的一行。
在您的原始代码中,您已经有了一个可以读取 LDIF 文件的解析器。如果您想在 Dataflow 上使用现有的解析器,您可以通过 ParDo transform. I would recommend beginning with FileIO to create a PCollection of LDIF files, and then use those as input to your own ParDo which parses those files and outputs your records. Note that you will likely want to read on managing Beam Python dependencies 在您的管道中使用它,因为您的 Dataflow worker 将需要访问该依赖项。