Spring Batch 4.2.4:无法反序列化执行上下文

Spring Batch 4.2.4: Unable to deserialize the execution context

我使用 spring-batch:4.2.2.RELEASE 作为 spring-boot-starter-batch:2.2.4.RELEASE 的一部分。 将后者升级到2.3.1.RELEASE版本后,开始作业时出现如下异常:

java.lang.IllegalArgumentException: Unable to deserialize the execution context
    at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao$ExecutionContextRowMapper.mapRow(JdbcExecutionContextDao.java:328)
    at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao$ExecutionContextRowMapper.mapRow(JdbcExecutionContextDao.java:312)
    at org.springframework.jdbc.core.RowMapperResultSetExtractor.extractData(RowMapperResultSetExtractor.java:94)
    at org.springframework.jdbc.core.RowMapperResultSetExtractor.extractData(RowMapperResultSetExtractor.java:61)
    at org.springframework.jdbc.core.JdbcTemplate.doInPreparedStatement(JdbcTemplate.java:679)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:617)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:669)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:700)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:712)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:768)
    at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao.getExecutionContext(JdbcExecutionContextDao.java:129)
    at org.springframework.batch.core.explore.support.SimpleJobExplorer.getStepExecutionDependencies(SimpleJobExplorer.java:238)
    at org.springframework.batch.core.explore.support.SimpleJobExplorer.getJobExecutions(SimpleJobExplorer.java:87)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
    at com.sun.proxy.$Proxy145.getJobExecutions(Unknown Source)
...
Caused by: com.fasterxml.jackson.databind.exc.InvalidTypeIdException: Missing type id when trying to resolve subtype of [map type; class java.util.HashMap, [simple type, class java.lang.String] -> [simple type, class java.lang.Object]]: missing type id property '@class'
 at [Source: (ByteArrayInputStream); line: 1, column: 192]
    at com.fasterxml.jackson.databind.exc.InvalidTypeIdException.from(InvalidTypeIdException.java:43)
    at com.fasterxml.jackson.databind.DeserializationContext.missingTypeIdException(DeserializationContext.java:1790)
    at com.fasterxml.jackson.databind.DeserializationContext.handleMissingTypeId(DeserializationContext.java:1319)
    at com.fasterxml.jackson.databind.jsontype.impl.TypeDeserializerBase._handleMissingTypeId(TypeDeserializerBase.java:303)
    at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedUsingDefaultImpl(AsPropertyTypeDeserializer.java:166)
    at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:107)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserializeWithType(MapDeserializer.java:400)
    at com.fasterxml.jackson.databind.deser.impl.TypeWrappedDeserializer.deserialize(TypeWrappedDeserializer.java:68)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4482)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3479)
    at org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer.deserialize(Jackson2ExecutionContextStringSerializer.java:123)
    at org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer.deserialize(Jackson2ExecutionContextStringSerializer.java:102)
    at org.springframework.batch.core.repository.dao.JdbcExecutionContextDao$ExecutionContextRowMapper.mapRow(JdbcExecutionContextDao.java:325)
    ... 45 common frames omitted


我了解新版本对 JSON 反序列化的处理有限制,并尝试实施 Jackson2ExecutionContextStringSerializer javadoc 中建议的修复,但问题仍然存在:

@EnableBatchProcessing
@Configuration
class BatchConfig(

    val properties: BatchProperties,
    val dataSource: DataSource,
    val transactionManagerCustomizers: TransactionManagerCustomizers,
    val entityManagerFactory: EntityManagerFactory
) : JpaBatchConfigurer(properties, dataSource, transactionManagerCustomizers, entityManagerFactory) {

    /**
     * Builds the [JobRepository] from the data source, isolation level, table prefix and
     * transaction manager, using the serializer returned by [configureContextSerializer].
     *
     * The serializer is assigned BEFORE `afterPropertiesSet()` is invoked: the factory bean
     * initializes the repository from the properties it holds at that moment, so a serializer
     * assigned afterwards would not be picked up by the repository being returned.
     *
     * NOTE(review): the reported stack trace goes through `SimpleJobExplorer`, which is not
     * created by this method. The job explorer probably needs the same serializer as well;
     * confirm against the `createJobExplorer()` override.
     *
     * @return the fully initialized job repository
     */
    override fun createJobRepository(): JobRepository {
        val factory = JobRepositoryFactoryBean()
        val map = PropertyMapper.get()
        map.from(dataSource).to { dataSource: DataSource? -> factory.setDataSource(dataSource!!) }
        map.from { determineIsolationLevel() }.whenNonNull().to { isolationLevelForCreate: String? -> factory.setIsolationLevelForCreate(isolationLevelForCreate!!) }
        map.from { properties.tablePrefix }.whenHasText().to { tablePrefix: String? -> factory.setTablePrefix(tablePrefix!!) }
        map.from { transactionManager }.to { transactionManager: PlatformTransactionManager? -> factory.transactionManager = transactionManager!! }

        // Must happen before afterPropertiesSet(), otherwise the custom serializer is ignored.
        factory.setSerializer(configureContextSerializer())
        factory.afterPropertiesSet()

        return factory.getObject()
    }

    /**
     * Creates the execution-context serializer backed by `objectMapper`, after activating
     * default typing on that mapper with a [LaissezFaireSubTypeValidator].
     *
     * NOTE(review): `objectMapper` is declared outside this snippet; if it is a shared
     * application-wide mapper, activating default typing here changes it for every other
     * user too. Confirm that this is intended.
     *
     * @return a serializer that uses the reconfigured `objectMapper`
     */
    private fun configureContextSerializer(): Jackson2ExecutionContextStringSerializer {
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator())
        return Jackson2ExecutionContextStringSerializer().also { it.setObjectMapper(objectMapper) }
    }

最疯狂的是执行上下文实际上是空的,数据库值总是"{}"。我什至尝试将数据库中的所有值更改为 "{"@class":"java.util.HashMap"}",但我仍然遇到相同的异常。

有人知道如何解决这个问题吗?我的修复尝试中的配置是否错误?

感谢@MahmoudBenHassine 为我指明了修复方向:

我手动将类型信息添加到数据库值的尝试是正确的,但我做的还不够。

有 2 个表的值需要更新:

  • 表 batch_job_execution_context,列 short_context
  • 表 batch_step_execution_context,列 short_context

我用 liquibase 脚本做了这个:

    <!--
      Adds the Jackson type id ("@class": "java.util.HashMap") to the JSON stored in
      batch_job_execution_context.short_context.

      This change set must target the JOB context table; the step context table is handled by
      its own change set.

      NOTE(review): REPLACE rewrites every '{' in the value, so nested objects receive the prefix
      too, and an empty context '{}' ends up with a trailing comma before '}'. Confirm both cases
      against real data before running this in production.
    -->
    <changeSet id="update-job_execution_context-for-spring-batch-4.2.4" author="kpentchev">
        <update tableName="batch_job_execution_context">
            <column name="short_context" valueComputed="REPLACE(short_context, '{', '{&quot;@class&quot;:&quot;java.util.HashMap&quot;,')" />
        </update>
    </changeSet>

    <!--
      Adds the Jackson type id ("@class": "java.util.HashMap") to the JSON stored in
      batch_step_execution_context.short_context.

      NOTE(review): REPLACE rewrites every '{' in the value, so nested objects receive the prefix
      too, and an empty context '{}' ends up with a trailing comma before '}'. Confirm both cases
      against real data before running this in production.
    -->
    <changeSet id="update-step_execution_context-for-spring-batch-4.2.4" author="kpentchev">
        <update tableName="batch_step_execution_context">
            <column name="short_context" valueComputed="REPLACE(short_context, '{', '{&quot;@class&quot;:&quot;java.util.HashMap&quot;,')" />
        </update>
    </changeSet>

不需要覆盖配置。

将迁移服务作为 spring-batch 版本的一部分会更好,但这里有一个解决方法。

我最近在从 Spring Batch 4.2.1.RELEASE 升级到 4.2.4.RELEASE 时遇到了类似的问题。

@kpentchev为此提供了一个很好的解决方案,直接修改数据库中的序列化执行上下文JSON。

另一种解决方案是扩展 Jackson2ExecutionContextStringSerializer#deserialize(InputStream):捕获反序列化旧 JSON 格式时可能抛出的异常,然后改用第二个遗留(legacy)ObjectMapper 重新反序列化。

我在下面提供了一个这样的实现。


import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import com.fasterxml.jackson.databind.module.SimpleModule;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import javax.validation.constraints.NotNull;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer;
import org.springframework.util.ReflectionUtils;

/**
 * Extends {@link Jackson2ExecutionContextStringSerializer} in order to support deserializing JSON
 * that was serialized using Spring Batch 4.2.1.RELEASE, and persisted in the database.
 *
 * <p>This class has been tested upgrading from Spring Batch 4.2.1.RELEASE to 4.2.4.RELEASE.
 */
@Slf4j
public class BackwardsCompatibleSerializer extends Jackson2ExecutionContextStringSerializer {

  private final ObjectMapper newObjectMapper;

  private final ObjectMapper legacyObjectMapper;

  public BackwardsCompatibleSerializer() {
    newObjectMapper = getNewObjectMapper();
    legacyObjectMapper = createLegacyObjectMapper();
  }

  /**
   * Overrides the default deserialization method.  If an {@link InvalidTypeIdException} is thrown
   * during deserialization, the exception is caught, and an attempt is made to deserialize the JSON
   * using the legacy {@link ObjectMapper} instance.
   */
  @Override
  public @NotNull Map<String, Object> deserialize(@NotNull InputStream in) throws IOException {
    String json = inputStreamToString(in);
    TypeReference<HashMap<String, Object>> typeRef = new TypeReference<>() {};
    try {
      return newObjectMapper.readValue(json, typeRef);
    } catch (InvalidTypeIdException e) {
      log.info("Couldn't deserialize JSON: will attempt to use legacy ObjectMapper");
      log.debug("Stacktrace", e);
      return legacyObjectMapper.readValue(json, typeRef);
    }
  }

  /**
   * Uses Java reflection to access the new {@link ObjectMapper} instance from the private
   * superclass field.  This will be used to serialize and deserialize JSON created using Spring
   * Batch 4.2.4.RELEASE.
   *
   * @return the new {@link ObjectMapper} instance
   */
  private ObjectMapper getNewObjectMapper() {
    ObjectMapper newObjectMapper;
    Field field = ReflectionUtils.findField(Jackson2ExecutionContextStringSerializer.class,
        "objectMapper", ObjectMapper.class);
    Objects.requireNonNull(field, "objectMapper field is null");
    ReflectionUtils.makeAccessible(field);
    newObjectMapper = (ObjectMapper) ReflectionUtils.getField(field, this);
    return newObjectMapper;
  }

  /**
   * Creates the {@link ObjectMapper} instance that can be used for deserializing JSON that was
   * previously serialized using Spring Batch 4.2.1.RELEASE.  This instance is only used if an
   * exception is thrown in {@link #deserialize(InputStream)} when using the new {@link
   * ObjectMapper} instance.
   *
   * @return the {@link ObjectMapper} instance that can be used for deserializing legacy JSON
   */
  @SuppressWarnings("deprecation")
  private ObjectMapper createLegacyObjectMapper() {
    ObjectMapper legacyObjectMapper = new ObjectMapper();
    legacyObjectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
    legacyObjectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
    legacyObjectMapper.enableDefaultTyping();
    legacyObjectMapper.registerModule(new JobParametersModule());
    return legacyObjectMapper;
  }

  private static String inputStreamToString(@NonNull InputStream inputStream) throws IOException {
    ByteArrayOutputStream result = new ByteArrayOutputStream();
    byte[] buffer = new byte[1024];
    int length;
    while ((length = inputStream.read(buffer)) != -1) {
      result.write(buffer, 0, length);
    }
    return result.toString(StandardCharsets.UTF_8);
  }

  /*
   * The remainder of this file was copied from here:
   *
   * https://github.com/spring-projects/spring-batch/blob/4.2.1.RELEASE/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/Jackson2ExecutionContextStringSerializer.java
   */

  // BATCH-2680

  /**
   * Custom Jackson module to support {@link JobParameter} and {@link JobParameters}
   * deserialization.
   */
  private static class JobParametersModule extends SimpleModule {

    private static final long serialVersionUID = 1L;

    private JobParametersModule() {
      super("Job parameters module");
      setMixInAnnotation(JobParameters.class, JobParametersMixIn.class);
      addDeserializer(JobParameter.class, new JobParameterDeserializer());
    }

    private abstract static class JobParametersMixIn {

      @JsonIgnore
      abstract boolean isEmpty();
    }

    private static class JobParameterDeserializer extends StdDeserializer<JobParameter> {

      private static final long serialVersionUID = 1L;
      private static final String IDENTIFYING_KEY_NAME = "identifying";
      private static final String TYPE_KEY_NAME = "type";
      private static final String VALUE_KEY_NAME = "value";

      JobParameterDeserializer() {
        super(JobParameter.class);
      }

      @SuppressWarnings("checkstyle:all")
      @Override
      public JobParameter deserialize(JsonParser parser, DeserializationContext context)
          throws IOException {
        JsonNode node = parser.readValueAsTree();
        boolean identifying = node.get(IDENTIFYING_KEY_NAME).asBoolean();
        String type = node.get(TYPE_KEY_NAME).asText();
        JsonNode value = node.get(VALUE_KEY_NAME);
        Object parameterValue;
        switch (JobParameter.ParameterType.valueOf(type)) {
          case STRING: {
            parameterValue = value.asText();
            return new JobParameter((String) parameterValue, identifying);
          }
          case DATE: {
            parameterValue = new Date(value.get(1).asLong());
            return new JobParameter((Date) parameterValue, identifying);
          }
          case LONG: {
            parameterValue = value.get(1).asLong();
            return new JobParameter((Long) parameterValue, identifying);
          }
          case DOUBLE: {
            parameterValue = value.asDouble();
            return new JobParameter((Double) parameterValue, identifying);
          }
        }
        return null;
      }
    }
  }
}

我依赖于@kpentchev 的解决方案并使用了以下 SQL 命令:

-- Prefix legacy execution contexts (values starting with {"map":) with the Jackson type id
-- "@class":"java.util.HashMap". Rows that do not start with {"map": are left untouched.
-- NOTE(review): only SHORT_CONTEXT is rewritten here; confirm whether any other column
-- holding serialized contexts needs the same treatment.

-- Job-level contexts.
UPDATE BATCH_JOB_EXECUTION_CONTEXT
   SET SHORT_CONTEXT = REPLACE(SHORT_CONTEXT, '{"map"', '{"@class":"java.util.HashMap","map"')
 WHERE SHORT_CONTEXT LIKE '{"map":%';

-- Step-level contexts.
UPDATE BATCH_STEP_EXECUTION_CONTEXT
   SET SHORT_CONTEXT = REPLACE(SHORT_CONTEXT, '{"map"', '{"@class":"java.util.HashMap","map"')
 WHERE SHORT_CONTEXT LIKE '{"map":%';

COMMIT;