Writing TFRecords in Apache Beam with Java

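How do I write the following code in Java? If I have a list of records/dicts in Java, how do I write Beam code that writes them out as TFRecords of serialized tf.train.Example protos? There are plenty of Python examples of this, and one is below. How would I write the same logic in Java?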

import tensorflow as tf
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner
from apache_beam.coders import ProtoCoder

class Foo(beam.DoFn):
  def process(self, element, *args, **kwargs):
    import tensorflow as tf

    foo = element.get('foo')
    bar = element.get('bar')

    feature = {
      "foo":
        tf.train.Feature(bytes_list=tf.train.BytesList(value=[foo.encode('utf-8')])),
      "bar":
        tf.train.Feature(bytes_list=tf.train.BytesList(value=[bar.encode('utf-8')]))
    }
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    yield example_proto

p = beam.Pipeline(runner=interactive_runner.InteractiveRunner())

records = p | "Create records" >> beam.Create([{'foo': 'abc', 'bar': 'pqr'} for _ in range(10)])
tf_examples = records | "Convert to tf examples" >> beam.ParDo(Foo())
tf_examples | "Dump Records" >> beam.io.WriteToTFRecord(file_path_prefix="./output/data-",
                                                    coder=ProtoCoder(tf.train.Example),  # ProtoCoder takes the message class, not an instance
                                                    file_name_suffix='.tfrecord', num_shards=2)

p.run()

I have tried this in Java, but I'm still running into some problems; link to the new question here.
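In the Beam Java SDK, `TFRecordIO.write()` consumes a `PCollection<byte[]>`. The Java counterpart of `WriteToTFRecord(..., coder=ProtoCoder(...))` is therefore to serialize each `Example` to bytes yourself, for instance with `example.toByteArray()` on the generated protobuf class inside a `MapElements` or `DoFn`, and then apply `TFRecordIO.write().to("./output/data-").withSuffix(".tfrecord").withNumShards(2)`. The Maven artifact and package that provide TensorFlow's Java `Example` class vary between TensorFlow releases, so check them for your version. The sketch below shows what those two steps produce without depending on Beam or TensorFlow. It is a stdlib-only illustration that needs Java 9+ for `java.util.zip.CRC32C`. `encodeExample` hand-encodes, in protobuf wire format, the same `Example` that `Foo.process` builds. `writeRecord` applies TFRecord framing: a little-endian length, a masked CRC32C of the length, the data, and a masked CRC32C of the data. The class name and output file name are hypothetical. In a real pipeline you would use the generated proto builder and `TFRecordIO` rather than hand-rolled encoding.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.CRC32C;

// Hypothetical helper class: illustrates the bytes TFRecordIO writes; not part of Beam or TensorFlow.
class TfRecordSketch {

  // Appends a protobuf base-128 varint (used for field tags and lengths).
  static void writeVarint(ByteArrayOutputStream out, long value) {
    while ((value & ~0x7FL) != 0) {
      out.write((int) ((value & 0x7F) | 0x80));
      value >>>= 7;
    }
    out.write((int) value);
  }

  // Appends a length-delimited field (wire type 2) with the given field number.
  static void writeLenField(ByteArrayOutputStream out, int fieldNumber, byte[] payload) {
    writeVarint(out, ((long) fieldNumber << 3) | 2);
    writeVarint(out, payload.length);
    out.write(payload, 0, payload.length);
  }

  // Serializes the equivalent of
  // Example(features=Features(feature={k: Feature(bytes_list=BytesList(value=[v]))})).
  static byte[] encodeExample(Map<String, String> record) {
    ByteArrayOutputStream features = new ByteArrayOutputStream();
    for (Map.Entry<String, String> e : record.entrySet()) {
      ByteArrayOutputStream bytesList = new ByteArrayOutputStream();
      writeLenField(bytesList, 1, e.getValue().getBytes(StandardCharsets.UTF_8)); // BytesList.value
      ByteArrayOutputStream feature = new ByteArrayOutputStream();
      writeLenField(feature, 1, bytesList.toByteArray()); // Feature.bytes_list
      ByteArrayOutputStream entry = new ByteArrayOutputStream();
      writeLenField(entry, 1, e.getKey().getBytes(StandardCharsets.UTF_8)); // map entry key
      writeLenField(entry, 2, feature.toByteArray()); // map entry value
      writeLenField(features, 1, entry.toByteArray()); // Features.feature (map field)
    }
    ByteArrayOutputStream example = new ByteArrayOutputStream();
    writeLenField(example, 1, features.toByteArray()); // Example.features
    return example.toByteArray();
  }

  // TFRecord's masked CRC32C: rotate right by 15 bits, then add a constant.
  static int maskedCrc32c(byte[] data) {
    CRC32C crc = new CRC32C();
    crc.update(data, 0, data.length);
    int c = (int) crc.getValue();
    return ((c >>> 15) | (c << 17)) + 0xa282ead8;
  }

  static byte[] le32(int v) {
    return ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(v).array();
  }

  // Frames one serialized record as: length, CRC(length), data, CRC(data).
  static void writeRecord(OutputStream out, byte[] data) throws IOException {
    byte[] len = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(data.length).array();
    out.write(len);
    out.write(le32(maskedCrc32c(len)));
    out.write(data);
    out.write(le32(maskedCrc32c(data)));
  }

  public static void main(String[] args) throws IOException {
    Path path = Paths.get("data-00000-of-00001.tfrecord"); // hypothetical single-shard file name
    try (OutputStream out = Files.newOutputStream(path)) {
      for (int i = 0; i < 10; i++) { // same ten records as the beam.Create step above
        Map<String, String> record = new LinkedHashMap<>();
        record.put("foo", "abc");
        record.put("bar", "pqr");
        writeRecord(out, encodeExample(record));
      }
    }
    System.out.println("Wrote " + Files.size(path) + " bytes to " + path);
  }
}
```

Running `main` writes the ten records from the Python example into a single file. You can try reading that file back with `tf.data.TFRecordDataset` to check the encoding. Beam's `TFRecordIO` performs the same framing for you and also handles sharding.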