将 Flink 与 thrift 结合使用
Using Flink with thrift
我在我的 flink 应用程序中看到一些关于我的 thrift 的日志 类:
2020-06-01 14:31:28 INFO TypeExtractor:1885 - Class class com.test.TestStruct contains custom serialization methods we do not call, so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
所以我按照这里的说明操作:
我这样做是为了 TestStruct
的节俭以及其中的所有节俭结构。 (虽然我跳过了命名类型)。
此外,生成的节俭代码在 Java 中,而 flink 应用程序是使用 scala 编写的。
我怎样才能让那个错误消失?因为我遇到了另一个错误,如果我传递我的数据流以转换为 TestStruct
,一些字段会丢失。我怀疑这是由于序列化问题?
实际上,截至目前,您无法摆脱此警告,但由于以下原因也不是问题:
警告基本上只是说 Flink 的类型系统没有使用任何内部序列化器,而是将类型视为 "generic type",这意味着它是通过 Kryo 序列化的。如果您关注我的博客 post,这正是您想要的:使用 Kryo 通过 Thrift 进行序列化。您可以使用调试器在 TBaseSerializer
中设置断点以验证是否正在使用 Thrift。
至于缺少的字段,我怀疑这是在您的(平面)地图运算符中转换为您的 TestStruct
期间发生的,并且可能不在用于将此结构传递给下一个结构的序列化中操作员。您应该验证这些字段丢失的位置 - 如果您有这个可重现性,您最喜欢的调试器中的断点 IDE 应该可以帮助您找到原因。
我在我的 flink 应用程序中看到一些关于我的 thrift 的日志 类:
2020-06-01 14:31:28 INFO TypeExtractor:1885 - Class class com.test.TestStruct contains custom serialization methods we do not call, so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
所以我按照这里的说明操作:
我这样做是为了 TestStruct
的节俭以及其中的所有节俭结构。 (虽然我跳过了命名类型)。
此外,生成的节俭代码在 Java 中,而 flink 应用程序是使用 scala 编写的。
我怎样才能让那个错误消失?因为我遇到了另一个错误,如果我传递我的数据流以转换为 TestStruct
,一些字段会丢失。我怀疑这是由于序列化问题?
实际上,截至目前,您无法摆脱此警告,但由于以下原因也不是问题:
警告基本上只是说 Flink 的类型系统没有使用任何内部序列化器,而是将类型视为 "generic type",这意味着它是通过 Kryo 序列化的。如果您关注我的博客 post,这正是您想要的:使用 Kryo 通过 Thrift 进行序列化。您可以使用调试器在 TBaseSerializer
中设置断点以验证是否正在使用 Thrift。
至于缺少的字段,我怀疑这是在您的(平面)地图运算符中转换为您的 TestStruct
期间发生的,并且可能不在用于将此结构传递给下一个结构的序列化中操作员。您应该验证这些字段丢失的位置 - 如果您有这个可重现性,您最喜欢的调试器中的断点 IDE 应该可以帮助您找到原因。