How to deserialize a BigDecimal value received from a Kafka broker through the Debezium CDC mechanism?
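I have several microservices developed with Spring Boot, each with its own Postgres database. These microservices exchange data through a Kafka broker and Kafka Connect, using the CDC mechanism provided by the Debezium platform. Microservice A stores some entities that have BigDecimal attributes. Microservice B depends on the data stored by A, so it receives that data as messages on a Kafka topic, like this: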
"after":{"id":"267e8ba0-4986-447d-8328-315c839875c3","coefficient":"AZA=","created_at":1559950327672000,"label":"External Agent","updated_at":1559950327672000}
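The coefficient attribute is a BigDecimal, and it is stored as a BigDecimal (4.00) in microservice A's database.
How did 4.00 become "AZA="? Is "AZA=" some kind of encoding meant to preserve the BigDecimal precision? How do I get from "AZA=" back to 4.0?
Note that Jackson cannot deserialize the string value "AZA=" into a BigDecimal; it throws the following exception: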
com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize value of type `java.math.BigDecimal` from String "AZA=": not a valid representation
at [Source: (String)"{"id":"267e8ba0-4986-447d-8328-315c839875c3","coefficient":"AZA=","created_at":1559950327672000,"label":"External Agent","updated_at":1559950327672000}"; line: 1, column: 60] (through reference chain: org.perfometer.performanceservice.entities.ActorTypeEntity["coefficient"])
at com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67)
at com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1549)
at com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:911)
at com.fasterxml.jackson.databind.deser.std.NumberDeserializers$BigDecimalDeserializer.deserialize(NumberDeserializers.java:955)
at com.fasterxml.jackson.databind.deser.std.NumberDeserializers$BigDecimalDeserializer.deserialize(NumberDeserializers.java:922)
at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:127)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:288)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:151)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3004)
at org.perfometer.performanceservice.services.impl.ActorTypeServiceImpl.consumeActorTypeMessages(ActorTypeServiceImpl.java:123)
at org.perfometer.performanceservice.services.impl.ActorTypeServiceImpl$$FastClassBySpringCGLIB$4d568c.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at org.perfometer.performanceservice.services.impl.ActorTypeServiceImpl$$EnhancerBySpringCGLIB$7173df.consumeActorTypeMessages(<generated>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1263)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1256)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1217)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1198)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1118)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:933)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:749)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:698)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Any hint or help would be greatly appreciated. Thanks!
Here is the solution for Java: https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation
Also check the decimal.handling.mode option for other ways to encode a BigDecimal in the message.
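To see where "AZA=" comes from, here is a minimal sketch of the encoding side. It assumes the default precise mode, in which Kafka Connect's Decimal logical type carries the unscaled value as big-endian two's-complement bytes and the JSON converter renders those bytes as Base64. The class and method names are hypothetical. The scale (2 for 4.00) is not part of the encoded value; it travels in the field's schema.

```java
import java.math.BigDecimal;
import java.util.Base64;

// Hypothetical helper, for illustration only.
class DecimalEncodingSketch {

    // Mirrors what the message carries: Base64 of the unscaled value's bytes.
    static String encode(BigDecimal value) {
        // 4.00 has unscaled value 400, i.e. the two bytes 0x01 0x90.
        byte[] unscaled = value.unscaledValue().toByteArray();
        return Base64.getEncoder().encodeToString(unscaled);
    }

    public static void main(String[] args) {
        System.out.println(encode(new BigDecimal("4.00"))); // prints AZA=
    }
}
```

So "AZA=" is not a precision-preserving text format by itself: it is only the integer 400, and the consumer has to know the scale to turn it back into 4.00.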
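If you are using Java 8, you can decode the Base64 string into a byte array and then, with a bit of black magic, get the BigDecimal (scale is the scale of the source column):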
BigDecimal bigDecimal = new BigDecimal(new BigInteger(Base64.getDecoder().decode("BfXhAA==")), scale);
https://github.com/confluentinc/kafka-connect-storage-cloud/issues/48#issuecomment-395206864
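Applied to the value from the question, the one-liner above can be wrapped as follows. This is a minimal sketch with hypothetical class and method names. The scale of 2 is an assumption taken from the stored value 4.00; in practice it should be read from the column definition or from the Connect schema rather than hard-coded.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

// Hypothetical helper, for illustration only.
class DecimalDecodingSketch {

    // Decodes the Base64 payload into the unscaled integer, then applies the scale.
    static BigDecimal decode(String base64, int scale) {
        byte[] unscaled = Base64.getDecoder().decode(base64);
        // BigInteger(byte[]) reads big-endian two's complement, so negative values work too.
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) {
        // Scale 2 is an assumption based on the stored value 4.00.
        System.out.println(decode("AZA=", 2)); // prints 4.00
    }
}
```

With Jackson, the same method could be called from a custom deserializer registered for the coefficient field, so the rest of the entity mapping stays unchanged.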
If you use the Flink 1.13 Table API/SQL with Debezium left at its default decimal.handling.mode (precise), it will throw java.lang.NumberFormatException for DECIMAL fields.
You can set decimal.handling.mode = string so that Flink can handle the value.
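Reason:
- If you declare the field as DECIMAL, it goes through createDecimalConverter.
- That uses FasterXML's JsonNode.isBigDecimal(), which always returns false in this case.
- This leads to bigDecimal = new BigDecimal(jsonNode.asText()), which expects a String representation of the decimal value, e.g. "100.123".
- But the value is Base64-encoded, e.g. "WRy5X4A=".
- So an error is thrown.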
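The last three steps can be reproduced with plain java.math.BigDecimal, without Flink or Jackson. A minimal sketch, with hypothetical class and method names:

```java
import java.math.BigDecimal;

// Hypothetical helper, for illustration only.
class DecimalTextParsingSketch {

    // Returns true if the text is accepted by the BigDecimal(String) constructor.
    static boolean parsesAsDecimal(String text) {
        try {
            new BigDecimal(text);
            return true;
        } catch (NumberFormatException e) {
            // Base64 payloads such as "WRy5X4A=" end up here.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(parsesAsDecimal("100.123"));  // prints true
        System.out.println(parsesAsDecimal("WRy5X4A=")); // prints false
    }
}
```

This is why decimal.handling.mode = string helps: the message then carries the plain decimal text that the BigDecimal(String) constructor accepts.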