Why is ParDo not working on DataflowRunner?
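I ran into a problem when switching from DirectRunner to DataflowRunner: ParDo apparently does not work. When I set the runner to DataflowRunner, def process(self, query) never runs.
I can see the job running on GCP, but my InsertPostgresql DoFn does nothing when I use DataflowRunner.
According to the error in the logs, ParDo apparently does not recognize 'psycopg2':
process NameError: name 'psycopg2' is not defined During handling of the above exception
I don't understand why.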
# Imports were omitted from the original post; these are the ones the code needs.
import os

import apache_beam as beam
import psycopg2
from apache_beam.options.pipeline_options import (
    PipelineOptions, SetupOptions, StandardOptions)


def run_pipeline():
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r'C:\Users\julianocm\Desktop\POC\<mycredentialfile>.json'

    # Dataflow job settings
    optionsGCP = {
        'streaming': True,
        'project': "<myproject>",
        'region': "us-central1",
        'temp_location': "gs://poc360-bucket/temp",
        'staging_location': "gs://poc360-bucket/staging",
        'drivername': "postgresql",
        'save_main_session': True,
        'setup_file': r'C:\Users\julianocm\Desktop\POC\setup.py'
    }

    # PostgreSQL connection settings
    paramsDB = {
        'database': '<mydatabase>',
        'user': '<myuser>',
        'password': '<mypassword>',
        'host': 'localhost',
        'port': '5000'
    }

    class InsertPostgresql(beam.DoFn):
        def __init__(self, **server_config):
            self.config = server_config

        def process(self, query):
            # Opens one connection per element, runs the query, yields the rows.
            con = psycopg2.connect(**self.config)
            cur = con.cursor()
            cur.execute(query)
            con.commit()
            resultado = cur.fetchall()
            cur.close()
            con.close()
            yield resultado

    runner = 'DataflowRunner'
    options = PipelineOptions(**optionsGCP)
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True

    sql = "select public.Insert_tbCadastro('01','010',431,'A',501741,000000,'2020-10-26','A')"

    p = beam.Pipeline(runner=runner, options=options)
    data = (p
            | beam.Create([sql])
            | beam.ParDo(InsertPostgresql(**paramsDB))
            )
    data | 'teste' >> beam.Map(print)
    print("Lines: ", data)

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    run_pipeline()
After leaving the job running for a while, I got this:
2020-11-12T11:54:12.718405344ZError message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -134644: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 45, in process NameError: name 'psycopg2' is not defined During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in _execute response = task() File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 482, in do_instruction return getattr(self, request_type)( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 518, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 982, in process_bundle input_op_by_transform_id[element.transform_id].process_encoded( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 219, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 
195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File 
"apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 45, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-134636'] java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333) org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1365) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access00(StreamingDataflowWorker.java:154) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.run(StreamingDataflowWorker.java:1085) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -134644: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 45, in process NameError: name 'psycopg2' is not defined During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in _execute response = task() File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 482, in do_instruction return getattr(self, request_type)( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 518, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 982, in process_bundle input_op_by_transform_id[element.transform_id].process_encoded( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 219, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 195, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File 
"apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 45, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-134636'] org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListenerMessagesAvailable.runInContext(ServerImpl.java:782) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748)
Besides this, the job reports warnings:
After applying the setup.py file, I get:
setup.py:
import setuptools

setuptools.setup(
    name='psycopg2',
    version='2.8.6',
    install_requires=[],
    packages=setuptools.find_packages(),
)
Dataflow log error:
2020-11-12T21:11:50.946327861ZError message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error
received from SDK harness for instruction -1396: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1213,
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in
apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in
apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 48, in process
NameError: name 'psycopg2' is not defined During handling of the above exception, another exception occurred: Traceback
(most recent call last): File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256,
in _execute response = task() File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313,
in <lambda> lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 482, in do_instruction return
getattr(self, request_type)( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 518,
in process_bundle bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 982, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 219, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670,
in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671,
in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py",
line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279,
in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213,
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569,
in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py",
line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py",
line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py",
line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671,
in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215,
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279,
in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213,
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569,
in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371,
in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py",
line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py",
line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py",
line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py",
line 1215,
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py",
line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py",
line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py",
line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py",
line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py",
line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py",
line 48, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-1388']
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1365)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access00(StreamingDataflowWorker.java:154)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.run(StreamingDataflowWorker.java:1085)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748) Caused by:
java.lang.RuntimeException: Error received from SDK harness for instruction -1396:
Traceback (most recent call last): File "apache_beam/runners/common.py",
line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py",
line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py",
Using requirements.txt:
I'm still waiting for the job logs... So far, my PostgreSQL table is still empty.
A few minutes later I got the logs, and psycopg2 is still not defined:
line 48, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-208'] org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListenerMessagesAvailable.runInContext(ServerImpl.java:782) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748)
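My requirements.txt:
After following the troubleshooting guide on NameErrors:
Apparently, we fixed the NameError. Now I'm trying to figure out why my SQL is not being executed. My job is still running, but my PostgreSQL table has no records. So something is still wrong.
When I debug, the lines inside process are skipped. Am I passing the parameters correctly?
After all:
Thanks,
Juliano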
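It seems the psycopg2 package is not being provided to the workers. See https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/ for how to manage dependencies in Python pipelines. In this case, since psycopg2 is available on PyPI, you only need to include it in a requirements.txt file passed via the --requirements_file pipeline option.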
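As a minimal sketch of that suggestion, the snippet below writes a one-line requirements file and assembles the command-line flags that would point Dataflow at it. The helper names `write_requirements` and `build_flags` are hypothetical, and the choice of `psycopg2-binary` (the prebuilt wheel, which avoids compiling against libpq on the workers) is an assumption; the project, region and bucket values are copied from the question.

```python
import os
import tempfile
from pathlib import Path


def write_requirements(path, packages):
    """Write one package spec per line, as pip expects (hypothetical helper)."""
    Path(path).write_text("\n".join(packages) + "\n")
    return path


def build_flags(requirements_path):
    """Return the pipeline flags as a list of strings (hypothetical helper)."""
    return [
        "--runner=DataflowRunner",
        "--project=<myproject>",
        "--region=us-central1",
        "--temp_location=gs://poc360-bucket/temp",
        "--staging_location=gs://poc360-bucket/staging",
        "--streaming",
        # Packages listed in this file are installed on each worker.
        f"--requirements_file={requirements_path}",
    ]


# Written to a temporary directory here so the sketch has no side effects;
# in a real project the file would sit next to the pipeline script.
req_path = os.path.join(tempfile.mkdtemp(), "requirements.txt")
write_requirements(req_path, ["psycopg2-binary==2.8.6"])  # assumed package pin
flags = build_flags(req_path)

# With apache_beam installed, the flags would be handed over like this:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(flags)
```

Passing the dependency this way replaces the setup.py from the question, whose install_requires list is empty and therefore does not ask for any package to be installed.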
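This problem is probably related to global imports. As the troubleshooting guide notes, this is a common issue when switching from the DirectRunner to the DataflowRunner, which matches your situation.
Given that you have already tried the solution of enabling the save_main_session option (--save_main_session), I would try importing the module inside the function itself. Instead of:
import psycopg2
(...)
def process(self, query):
    con = psycopg2.connect(**self.config)
Try:
def process(self, query):
    import psycopg2
    con = psycopg2.connect(**self.config)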
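To see why moving the import helps, here is a stdlib-only simulation. It is not Beam's actual serialization mechanism: it simply runs a function in a fresh namespace, which is roughly the situation on a worker when the module-level import has not been carried over. `json` stands in for `psycopg2`, and `run_on_fresh_worker` is a hypothetical helper.

```python
import textwrap

# Relies on a module-level "import json" that the simulated worker never ran.
GLOBAL_IMPORT_SRC = textwrap.dedent("""
    def process(value):
        return json.dumps(value)
""")

# Same function, but the import travels with the function body.
LOCAL_IMPORT_SRC = textwrap.dedent("""
    def process(value):
        import json
        return json.dumps(value)
""")


def run_on_fresh_worker(source, value):
    """Define process() in an empty namespace and call it."""
    namespace = {}
    exec(source, namespace)
    return namespace["process"](value)


try:
    run_on_fresh_worker(GLOBAL_IMPORT_SRC, {"id": 1})
    global_result = "ok"
except NameError as err:
    # The name lookup fails because the namespace has no "json" global.
    global_result = str(err)

local_result = run_on_fresh_worker(LOCAL_IMPORT_SRC, {"id": 1})

print(global_result)
print(local_result)
```

The first call fails with the same kind of NameError as in the Dataflow logs, while the second succeeds because the import is executed wherever the function runs.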
---
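Edit
Given that we got past that error and the pipeline works correctly with the DirectRunner, I doubt the problem is in the code itself. I would review the firewall rules to make sure the Dataflow workers can connect to the PostgreSQL database, and I would check that the controller service account has the necessary permissions.
I see that you specify some credentials in the code. Keep in mind that Dataflow workers use either the default controller service account, <project-number>-compute@developer.gserviceaccount.com, or a custom one, and that account needs the permissions required to connect to the database.
These two things (the firewall rules and the service account used by the workers) are potential differences between the DirectRunner and the DataflowRunner, so either could be the cause of the problem. Please check them.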
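One way to test the connectivity theory is a plain TCP probe that could be called from inside the DoFn (for example at the start of process) and logged. The sketch below uses only the standard library; `can_connect` is a hypothetical helper, and the self-check at the bottom uses a throwaway local listener rather than a real database. Note also that the question's config uses 'localhost', which on a Dataflow worker refers to the worker VM itself rather than the machine that launched the job, so the database would need an address that is reachable from GCP.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS failures.
        return False


# Self-check against a throwaway local listener (port 0 = any free port).
server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen(1)
open_port = server.getsockname()[1]

reachable = can_connect("127.0.0.1", open_port)

# Once the listener is closed, the same port should refuse connections.
server.close()
unreachable = can_connect("127.0.0.1", open_port, timeout=1.0)

print(reachable, unreachable)
```

A False result from a worker for the database host and port would point at firewall rules or addressing rather than at the pipeline code; a True result only shows that the port is open, not that the credentials are accepted.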
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListenerMessagesAvailable.runInContext(ServerImpl.java:782) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748)
Besides this, the job reports:
After applying the setup.py file, I get:
setup.py:
import setuptools
setuptools.setup(
    name='psycopg2',
    version='2.8.6',
    install_requires=[],
    packages=setuptools.find_packages(),
)
Dataflow log error:
2020-11-12T21:11:50.946327861ZError message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error
received from SDK harness for instruction -1396: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1213,
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in
apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in
apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 48, in process
NameError: name 'psycopg2' is not defined During handling of the above exception, another exception occurred: Traceback
(most recent call last): File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256,
in _execute response = task() File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313,
in <lambda> lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 482, in do_instruction return
getattr(self, request_type)( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 518,
in process_bundle bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 982, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 219, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670,
in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671,
in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py",
line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279,
in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213,
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569,
in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py",
line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py",
line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py",
line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671,
in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215,
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279,
in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213,
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569,
in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371,
in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py",
line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py",
line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py",
line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py",
line 1215,
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py",
line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py",
line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py",
line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py",
line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py",
line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py",
line 48, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-1388']
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1365)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access00(StreamingDataflowWorker.java:154)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.run(StreamingDataflowWorker.java:1085)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748) Caused by:
java.lang.RuntimeException: Error received from SDK harness for instruction -1396:
Traceback (most recent call last): File "apache_beam/runners/common.py",
line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py",
line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py",
Using requirements.txt:
I am still waiting for the job logs... So far, my PostgreSQL table is still empty.
A few minutes later I got the logs, and psycopg2 is still not defined:
line 48, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-208'] org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListenerMessagesAvailable.runInContext(ServerImpl.java:782) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748)
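My requirements.txt:
After following the troubleshooting guide on NameErrors, we have apparently fixed the NameError. Now I am trying to figure out why my SQL is not being executed. My job is still running, but there are no records in my PostgreSQL database. So something is still wrong.
When I debug, the lines inside "process" are skipped. Are the parameters being used correctly?
After all that:
Thanks, Juliano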
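It looks like the psycopg2 package is not being provided to the workers. For information on how to manage dependencies in Python, see https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/. In this case, since psycopg2 is available on PyPI, you only need to include it in a requirements.txt file passed via the --requirements_file pipeline option.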
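As a rough sketch of the above, assuming the options are built as a keyword dict like optionsGCP in the question: the `requirements_file` key is the keyword form of the --requirements_file flag, and `with_requirements` is a hypothetical helper name used only for illustration. The requirements.txt file itself would contain a single line, `psycopg2`.

```python
def with_requirements(options, requirements_path="requirements.txt"):
    """Return a copy of the options dict that also points at a requirements file."""
    updated = dict(options)  # copy so the caller's dict is left untouched
    updated["requirements_file"] = requirements_path
    return updated


# Placeholder values, mirroring the shape of optionsGCP in the question.
optionsGCP = {
    "project": "<myproject>",
    "region": "us-central1",
    "save_main_session": True,
}
optionsGCP = with_requirements(optionsGCP)

# With Beam installed, the dict would then be used as in the question:
#   options = PipelineOptions(**optionsGCP)
```

The path is resolved on the machine that submits the job, so it should point at the requirements.txt next to the pipeline script.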
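This problem is probably related to global imports. According to this troubleshooting guide, it is a common issue when switching from the direct runner to the Dataflow runner, which matches your situation.
Given that you have already tried the solution of adding the flag --save_main_session=True, I would try importing the module inside the function itself. Instead of: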
import psycopg2
(...)
def process(self, query):
    con = psycopg2.connect(**self.config)
Try:
def process(self, query):
    import psycopg2
    con = psycopg2.connect(**self.config)
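A variation on the same idea, as a sketch only: the import can also live in the DoFn's `setup()` method, which runs on the worker once per DoFn instance, so the connection is opened once rather than for every element. The class body below is an assumption about how the question's InsertPostgresql could be reshaped, not a tested fix for this job. The `try/except ImportError` fallback is there only so the snippet can be loaded on a machine without Beam installed.

```python
try:
    import apache_beam as beam
    _DoFnBase = beam.DoFn
except ImportError:
    # Fallback so this sketch can be loaded without Beam installed.
    _DoFnBase = object


class InsertPostgresql(_DoFnBase):
    def __init__(self, **server_config):
        super().__init__()
        self.config = server_config
        self.con = None

    def setup(self):
        # Imported here so the name is resolved on the worker itself.
        import psycopg2
        self.con = psycopg2.connect(**self.config)

    def process(self, query):
        # Run the statement, read its result, then commit.
        with self.con.cursor() as cur:
            cur.execute(query)
            rows = cur.fetchall()
        self.con.commit()
        yield rows

    def teardown(self):
        # Close the connection when the worker discards this instance.
        if self.con is not None:
            self.con.close()
```

It would be applied exactly as in the question, with `beam.ParDo(InsertPostgresql(**paramsDB))`.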
--- Edit
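Given that we got past that error and that it works correctly with the DirectRunner, I doubt the problem is in the code itself. I would look at the firewall rules to make sure the Dataflow workers can connect to the PostgreSQL database, and I would check that the controller service account has the necessary permissions.
I see that you specify some credentials in your code. Keep in mind that Dataflow workers use either the default controller service account, <project-number>-compute@developer.gserviceaccount.com, or a custom one, and that account needs the permissions required to connect to the database.
These two things (the firewall rules and the service account used by the workers) are potential differences between the direct runner and the Dataflow runner, so they may be the cause of the problem. Please check them.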
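One way to narrow down the firewall side, as a minimal sketch: a plain TCP reachability check that could be called from inside the DoFn (or run from any machine in the same network) before attempting the database connection. `can_reach` is a hypothetical helper name, and it only tests whether the port accepts connections, not whether the credentials are valid. Note also that the question's paramsDB uses 'host': 'localhost', which on a Dataflow worker refers to the worker VM itself rather than the machine that launched the job, so that may be worth ruling out as well.

```python
import socket


def can_reach(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

For example, logging `can_reach(self.config['host'], self.config['port'])` at the start of `process` would show in the worker logs whether the database port is reachable from Dataflow at all.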