Beam DirectRunner:Calcite 无法指定表名称
Beam DirectRunner Calcite can't specify name
我正在运行这个 beam tutorial 的简化版本,不过是在我本地机器上使用 DirectRunner 来运行的。
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform
import os
# Minimal repro: register the PCollection `rows` under the table name
# "my_table" (via the dict key) and query it with SqlTransform on the
# DirectRunner. As the traceback in this post shows, this raises
# "Object 'my_table' not found"; querying PCOLLECTION instead works when
# `rows` is passed directly rather than inside a dict.
with beam.Pipeline() as p:
    rows = (p |
            beam.Create([
                beam.Row(col1="val1", col2="col2_val1"),
                beam.Row(col1="val2", col2="col2_val2"),
            ]))
    # The result is not consumed; the failure happens while the transform
    # is being expanded, before the pipeline runs.
    ({"my_table": rows} | SqlTransform("""SELECT * FROM my_table"""))
如果我将 my_table 更改为 PCOLLECTION,它就能正常工作(不过要真正运行成功,我需要传入 rows 而不是 dict)。
我得到的错误信息:
Traceback (most recent call last):
File "./lib/scratch/test_join.py", line 12, in <module>
({"my_table": rows} | SqlTransform("""SELECT * FROM my_table"""))
File "/Users/steeling/src/versions/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 606, in __ror__
result = p.apply(self, pvalueish, label)
File "/Users/steeling/src/versions/lib/python3.7/site-packages/apache_beam/pipeline.py", line 694, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File "/Users/steeling/src/versions/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 185, in apply
return m(transform, input, options)
File "/Users/steeling/src/versions/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
return transform.expand(input)
File "/Users/steeling/src/versions/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 305, in expand
raise RuntimeError(response.error)
RuntimeError: org.apache.beam.sdk.extensions.sql.impl.ParseException: Unable to parse query SELECT * FROM my_table
at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:214)
at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:111)
at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:171)
at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:367)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:470)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:546)
at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:219)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListenerHalfClosed.runInContext(ServerImpl.java:797)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.ValidationException: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.runtime.CalciteContextException: From line 1, column 15 to line 1, column 22: Object 'my_table' not found
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:217)
at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:183)
... 17 more
Caused by: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.runtime.CalciteContextException: From line 1, column 15 to line 1, column 22: Object 'my_table' not found
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:172)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:215)
... 18 more
Caused by: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorException: Object 'my_table' not found
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 37 more
您遇到此错误是因为您只有一个 PCollection。似乎 Apache Beam 会将 PCOLLECTION 用作 table 源,即使您创建了一个带有 {"my_table": rows} 键值对的字典。
作为一种解决方法,如果您想在 SQL 语句中显式指定 table 名称,可以额外创建一个包含虚拟值的临时 PCollection,然后创建一个包含这两个 PCollection 键值对的字典。
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform
import os
# Workaround: pass SqlTransform a dict holding two PCollections so that the
# dict keys are used as table names. With a single-entry dict the only
# table visible to the query is PCOLLECTION, so "my_table2" is a dummy
# whose sole purpose is to make "my_table" addressable by name.
with beam.Pipeline() as p:
    # Explicit labels keep the two Create transforms uniquely named within
    # the pipeline.
    rows = (p |
            "create rows" >> beam.Create([
                beam.Row(col1="val1", col2="col2_val1"),
                beam.Row(col1="val2", col2="col2_val2"),
            ]))
    # Dummy PCollection; it is never referenced by the query below.
    rows_2 = (p |
              "create rows_2" >> beam.Create([
                  beam.Row(col1_1="val1", col2_1="123"),
              ]))
    ({"my_table": rows, "my_table2": rows_2}
     | SqlTransform("""SELECT * FROM my_table""")
     # Each output row keeps the input field names col1/col2.
     | beam.Map(lambda row: "col1: %s, col2: %s" % (row.col1, row.col2))
     | beam.Map(print))
输出为(PCollection 是无序的,各行的打印顺序可能不同):
col1: val1, col2: col2_val1
col1: val2, col2: col2_val2
您可以在 Apache Beam 的 JIRA 上提交一个 issue,请求在未来的版本中支持您的用例。
我正在运行这个 beam tutorial 的简化版本,不过是在我本地机器上使用 DirectRunner 来运行的。
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform
import os
# Minimal repro: register the PCollection `rows` under the table name
# "my_table" (via the dict key) and query it with SqlTransform on the
# DirectRunner. As the traceback in this post shows, this raises
# "Object 'my_table' not found"; querying PCOLLECTION instead works when
# `rows` is passed directly rather than inside a dict.
with beam.Pipeline() as p:
    rows = (p |
            beam.Create([
                beam.Row(col1="val1", col2="col2_val1"),
                beam.Row(col1="val2", col2="col2_val2"),
            ]))
    # The result is not consumed; the failure happens while the transform
    # is being expanded, before the pipeline runs.
    ({"my_table": rows} | SqlTransform("""SELECT * FROM my_table"""))
如果我将 my_table 更改为 PCOLLECTION,它就能正常工作(不过要真正运行成功,我需要传入 rows 而不是 dict)。
我得到的错误信息:
Traceback (most recent call last):
File "./lib/scratch/test_join.py", line 12, in <module>
({"my_table": rows} | SqlTransform("""SELECT * FROM my_table"""))
File "/Users/steeling/src/versions/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 606, in __ror__
result = p.apply(self, pvalueish, label)
File "/Users/steeling/src/versions/lib/python3.7/site-packages/apache_beam/pipeline.py", line 694, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File "/Users/steeling/src/versions/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 185, in apply
return m(transform, input, options)
File "/Users/steeling/src/versions/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
return transform.expand(input)
File "/Users/steeling/src/versions/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 305, in expand
raise RuntimeError(response.error)
RuntimeError: org.apache.beam.sdk.extensions.sql.impl.ParseException: Unable to parse query SELECT * FROM my_table
at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:214)
at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:111)
at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:171)
at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:367)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:470)
at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:546)
at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:219)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListenerHalfClosed.runInContext(ServerImpl.java:797)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.ValidationException: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.runtime.CalciteContextException: From line 1, column 15 to line 1, column 22: Object 'my_table' not found
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:217)
at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:183)
... 17 more
Caused by: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.runtime.CalciteContextException: From line 1, column 15 to line 1, column 22: Object 'my_table' not found
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:172)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:215)
... 18 more
Caused by: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorException: Object 'my_table' not found
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 37 more
您遇到此错误是因为您只有一个 PCollection。似乎 Apache Beam 会将 PCOLLECTION 用作 table 源,即使您创建了一个带有 {"my_table": rows} 键值对的字典。
作为一种解决方法,如果您想在 SQL 语句中显式指定 table 名称,可以额外创建一个包含虚拟值的临时 PCollection,然后创建一个包含这两个 PCollection 键值对的字典。
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform
import os
# Workaround: pass SqlTransform a dict holding two PCollections so that the
# dict keys are used as table names. With a single-entry dict the only
# table visible to the query is PCOLLECTION, so "my_table2" is a dummy
# whose sole purpose is to make "my_table" addressable by name.
with beam.Pipeline() as p:
    # Explicit labels keep the two Create transforms uniquely named within
    # the pipeline.
    rows = (p |
            "create rows" >> beam.Create([
                beam.Row(col1="val1", col2="col2_val1"),
                beam.Row(col1="val2", col2="col2_val2"),
            ]))
    # Dummy PCollection; it is never referenced by the query below.
    rows_2 = (p |
              "create rows_2" >> beam.Create([
                  beam.Row(col1_1="val1", col2_1="123"),
              ]))
    ({"my_table": rows, "my_table2": rows_2}
     | SqlTransform("""SELECT * FROM my_table""")
     # Each output row keeps the input field names col1/col2.
     | beam.Map(lambda row: "col1: %s, col2: %s" % (row.col1, row.col2))
     | beam.Map(print))
输出为(PCollection 是无序的,各行的打印顺序可能不同):
col1: val1, col2: col2_val1
col1: val2, col2: col2_val2
您可以在 Apache Beam 的 JIRA 上提交一个 issue,请求在未来的版本中支持您的用例。