Write To Kafka using Apache Beam (GCP Dataflow)
I am trying to send data to a Kafka topic from Python via Apache Beam's WriteToKafka, using Dataflow as the runner, by running the following script:
with beam.Pipeline(options=beam_options) as p:
    (p
     | beam.Impulse()
     | beam.Map(lambda input: (1, input))
     | WriteToKafka(
           producer_config={
               'bootstrap.servers': 'ip:9092,',
           },
           topic='testclient',
           key_serializer='org.apache.kafka.common.serialization.LongSerializer',
           value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
       )
    )
I get this error:
Traceback (most recent call last):
File "/home/denes/data-science/try_write_to_kafka.py", line 75, in <module>
run_pipeline(beam_options)
File "/home/denes/data-science/try_write_to_kafka.py", line 38, in run_pipeline
(p
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 582, in __exit__
self.result = self.run()
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
return Pipeline.from_runner_api(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
self._id_to_obj[id] = self._obj_type.from_runner_api(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
transform = ptransform.PTransform.from_runner_api(proto, context)
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
return constructor(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
DoFnInfo.from_runner_api(
File "/home/denes/data-science/venv/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
raise ValueError('Unexpected DoFn type: %s' % spec.urn)
ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
Note that I have installed the latest apache-beam version with GCP support via pip install 'apache-beam[gcp]':
- apache-beam==2.27.0
- google-cloud-core==1.5.0
If I am not mistaken, the problem is with the serialization methods. I have tried various combinations found on this page. What am I missing, and what should I do differently?
The solution is to use explicit type casting for both the key and the value.
(p
 | 'Convert dict to byte string' >> beam.Map(
       lambda x: (b'', json.dumps(x).encode('utf-8')))
     .with_output_types(typing.Tuple[bytes, bytes])
 | 'Write to Kafka topic' >> WriteToKafka(
       producer_config={'bootstrap.servers': consumer_servers},
       topic='testclient')
)
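The conversion step can be exercised on its own, without Beam or a Kafka cluster. A minimal sketch (the helper name and record contents are my own, not from the answer) of what the Map produces: a (key, value) pair where both elements are bytes, so Beam can infer typing.Tuple[bytes, bytes] for the cross-language WriteToKafka transform.

```python
import json
import typing


def to_kafka_record(d: dict) -> typing.Tuple[bytes, bytes]:
    # Empty byte-string key, JSON-encoded byte-string value.
    # Both elements are bytes, matching the
    # typing.Tuple[bytes, bytes] output type declared on the Map.
    return (b'', json.dumps(d).encode('utf-8'))


record = to_kafka_record({'user': 'denes'})
# record is (b'', b'{"user": "denes"}')
```

With both elements typed as bytes, no key_serializer or value_serializer needs to be passed explicitly; the defaults handle byte arrays.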