Spring Batch: read a key=value file and load it into a DB

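I am new to Spring Batch. I have a feed file in .txt format that contains key/value parameters, and I need to load it into a MySQL database using Spring Batch. Is there a way to read a text file made up of key=value lines? Records are separated by a blank line, and the delimiter between key and value is '='.

Sample file: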

Name=Jack
Id=ADC12345
ClassId=7018
Rank=-326

Name=Gile
Id=FED12345
ClassId=7018
Rank=-32

Name, Id, ClassId and Rank are the column values.

This input format poses two challenges:

  1. detecting the start/end of a complete item
  2. splitting the item into key/value pairs
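
To make the two challenges concrete, here is a minimal plain-Java sketch that does not use Spring Batch at all. The class name KeyValueBlockParser and its parse method are made up for illustration. It assumes that a blank line closes an item and that each non-blank line holds one Key=Value pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only (not part of Spring Batch).
class KeyValueBlockParser {

    // Groups lines into items (blank line = item boundary) and splits each line on the first '='.
    public static List<Map<String, String>> parse(List<String> lines) {
        List<Map<String, String>> items = new ArrayList<>();
        Map<String, String> current = new LinkedHashMap<>();
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                // challenge 1: a blank line closes the current item
                if (!current.isEmpty()) {
                    items.add(current);
                    current = new LinkedHashMap<>();
                }
                continue;
            }
            // challenge 2: split "Key=Value" on the first '=' only
            int idx = line.indexOf('=');
            if (idx > 0) {
                current.put(line.substring(0, idx).trim(), line.substring(idx + 1).trim());
            }
        }
        if (!current.isEmpty()) {
            items.add(current); // last item when the input has no trailing blank line
        }
        return items;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "Name=Jack", "Id=ADC12345", "ClassId=7018", "Rank=-326",
                "",
                "Name=Gile", "Id=FED12345", "ClassId=7018", "Rank=-32");
        for (Map<String, String> item : parse(lines)) {
            System.out.println(item);
        }
    }
}
```

This reads the whole input into memory, so it only illustrates the parsing rules. The Spring Batch approaches in the answers stream the file item by item.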

One solution could be to use a custom RecordSeparatorPolicy and a custom LineMapper, for example:

import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.separator.RecordSeparatorPolicy;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.core.io.ClassPathResource;
import org.springframework.validation.BindException;

public class ReaderKeyValueTest {
    @Test
    public void test() throws Exception {
        FlatFileItemReader<Map<String, String>> reader = new FlatFileItemReader<Map<String, String>>();
        reader.setResource(new ClassPathResource("keyvalue.txt"));
        // custom RecordSeparatorPolicy
        reader.setRecordSeparatorPolicy(new RecordSeparatorPolicy() {

            @Override
            public String preProcess(final String record) {
                // an empty record (the blank separator line) is passed through unchanged
                if (record.isEmpty()) {
                    return record;
                } else {
                    // a line with content is part of an 'item': append ',' as a separator
                    return record + ",";
                }
            }

            @Override
            public String postProcess(final String record) {
                return record;
            }

            @Override
            public boolean isEndOfRecord(final String record) {
                // the end of a record is marked by the last key/value pair, "Rank"
                return record.contains("Rank=");
            }
        });
        DefaultLineMapper<Map<String, String>> lineMapper = new DefaultLineMapper<Map<String, String>>();
        // the key/value pairs are separated with ',', so we can use the standard DelimitedLineTokenizer here
        lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
        lineMapper.setFieldSetMapper(new FieldSetMapper<Map<String, String>>() {

            @Override
            public Map<String, String> mapFieldSet(final FieldSet fieldSet) throws BindException {
                Map<String, String> item = new HashMap<String, String>();
                // split each "Key=Value" on the first '=' only and add it to the Map
                for (String token : fieldSet.getValues()) {
                    String[] entry = token.split("=", 2);
                    if (entry.length == 2) {
                        item.put(entry[0], entry[1]);
                    }
                }
                return item;
            }
        });
        reader.setLineMapper(lineMapper);
        reader.open(new ExecutionContext());
        Map<String, String> item;
        while ((item = reader.read()) != null) {
            System.out.println(item.toString());
        }
        reader.close();
    }
}

The sysout produces:

{ClassId=7018, Id=ADC12345, Name=Jack, Rank=-326}
{ClassId=7018, Id=FED12345, Name=Gile, Rank=-32}
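
To see why this policy yields one record per item, the accumulation can be simulated in plain Java. The sketch below is a simplified model of how lines are joined until isEndOfRecord returns true. It is not Spring Batch's actual code, and the class name RecordAssemblySketch is made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified model for illustration only; NOT Spring Batch's implementation.
class RecordAssemblySketch {

    // Same rule as the policy above: append ',' to a non-empty partial record.
    public static String preProcess(String record) {
        return record.isEmpty() ? record : record + ",";
    }

    // Same rule as the policy above: the "Rank=" pair closes a record.
    public static boolean isEndOfRecord(String record) {
        return record.contains("Rank=");
    }

    // Joins physical lines into logical records until the end-of-record rule matches.
    public static List<String> assemble(List<String> lines) {
        List<String> records = new ArrayList<>();
        Iterator<String> it = lines.iterator();
        while (it.hasNext()) {
            String record = it.next();
            while (!isEndOfRecord(record) && it.hasNext()) {
                record = preProcess(record) + it.next();
            }
            if (isEndOfRecord(record)) {
                records.add(record);
            }
            // an incomplete trailing record is silently dropped in this model
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "Name=Jack", "Id=ADC12345", "ClassId=7018", "Rank=-326",
                "",
                "Name=Gile", "Id=FED12345", "ClassId=7018", "Rank=-32");
        for (String record : assemble(lines)) {
            System.out.println(record);
        }
    }
}
```

Each assembled record is a single comma-separated line such as Name=Jack,Id=ADC12345,ClassId=7018,Rank=-326, which is why the default DelimitedLineTokenizer can split it. Note that this approach relies on Rank always being the last pair of an item.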

Here is a working solution (you only need a blank line after the last record, otherwise that record will not be read):

1) Declare your business object:

public class Student {

    private String name;
    private String id;
    private Integer classId;
    private Integer rank;

    // Getter + Setters

}
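
For reference, here is a sketch of the same class with the accessors written out. The fromMap factory and parseOrNull helper are hypothetical additions, not part of the original answer. They assume the keys Name, Id, ClassId and Rank from the sample file and show one way to turn a map of key/value pairs into a Student.

```java
import java.util.HashMap;
import java.util.Map;

class Student {

    private String name;
    private String id;
    private Integer classId;
    private Integer rank;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public Integer getClassId() { return classId; }
    public void setClassId(Integer classId) { this.classId = classId; }

    public Integer getRank() { return rank; }
    public void setRank(Integer rank) { this.rank = rank; }

    // Hypothetical helper (not in the original answer): builds a Student from key/value pairs.
    public static Student fromMap(Map<String, String> values) {
        Student s = new Student();
        s.setName(values.get("Name"));
        s.setId(values.get("Id"));
        s.setClassId(parseOrNull(values.get("ClassId")));
        s.setRank(parseOrNull(values.get("Rank")));
        return s;
    }

    // Returns null for a missing key; throws NumberFormatException for a non-numeric value.
    private static Integer parseOrNull(String raw) {
        return raw == null ? null : Integer.valueOf(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("Name", "Jack");
        values.put("Id", "ADC12345");
        values.put("ClassId", "7018");
        values.put("Rank", "-326");
        Student s = Student.fromMap(values);
        System.out.println(s.getName() + " " + s.getId() + " " + s.getClassId() + " " + s.getRank());
    }
}
```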

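2) Declare a custom item stream reader that delegates to the actual FlatFileItemReader:

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.transform.FieldSet;

public class CustomMultiLineItemReader implements ItemStreamReader<Student> {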

    private FlatFileItemReader<FieldSet> delegate;

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        delegate.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        delegate.update(executionContext);
    }

    @Override
    public void close() throws ItemStreamException {
       delegate.close();
    }

    // Getter + Setters
}

3) Override its read method to map your multi-line records manually:

@Override
public Student read() throws Exception {
    Student s = null;

    // Consume tokenized lines until a blank line closes the current record
    for (FieldSet line = null; (line = this.delegate.read()) != null;) {

        if (line.getFieldCount() == 0) {
            return s; // Record must end with footer (a blank line)
        } else {

            String prefix = line.readString(0);
            if (prefix.equals("Name")) {
                s = new Student(); // Record must start with header
                s.setName(line.readString(1));
            }
            else if (prefix.equals("Id")) {
                s.setId(line.readString(1));
            }
            else if (prefix.equals("ClassId")) {
                s.setClassId(line.readInt(1));
            }
            else if (prefix.equals("Rank")) {
                s.setRank(line.readInt(1));
            }
        }
    }
    return null; // End of input: a record without a trailing blank line is not returned
}
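
The need for a trailing blank line can be checked with a small plain-Java model of this read logic. In the sketch below an Iterator<String> stands in for the delegate reader and a nested Student class stands in for the business object. Both are simplifications for illustration, and the class name MultiLineReadSketch is made up. Unlike the method above, it ignores pairs that appear before a Name line instead of failing.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java model of the multi-line read logic; not tied to Spring Batch.
class MultiLineReadSketch {

    // Minimal stand-in for the Student business object.
    public static class Student {
        public String name;
        public String id;
        public Integer classId;
        public Integer rank;
    }

    private final Iterator<String> lines; // stands in for the delegate reader

    public MultiLineReadSketch(List<String> lines) {
        this.lines = lines.iterator();
    }

    // A blank line returns the record built so far; end of input returns null.
    public Student read() {
        Student s = null;
        while (lines.hasNext()) {
            String line = lines.next();
            if (line.isEmpty()) {
                return s;
            }
            String[] parts = line.split("=", 2);
            String key = parts[0];
            String value = parts.length > 1 ? parts[1] : "";
            if (key.equals("Name")) {
                s = new Student(); // a "Name" line starts a new record
                s.name = value;
            } else if (s == null) {
                continue; // ignore pairs that appear before a "Name" line
            } else if (key.equals("Id")) {
                s.id = value;
            } else if (key.equals("ClassId")) {
                s.classId = Integer.valueOf(value);
            } else if (key.equals("Rank")) {
                s.rank = Integer.valueOf(value);
            }
        }
        return null; // input exhausted before a blank line was seen
    }

    public static void main(String[] args) {
        MultiLineReadSketch reader = new MultiLineReadSketch(Arrays.asList(
                "Name=Jack", "Rank=-326", "", "Name=Gile", "Rank=-32"));
        int count = 0;
        while (reader.read() != null) {
            count++;
        }
        // Only the record followed by a blank line is counted.
        System.out.println("records read without trailing blank line: " + count);
    }
}
```

If dropping the last record is not acceptable, one option is to return s instead of null after the loop, so that a record at the very end of the input is still emitted.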

4) Declare the reader in your step and configure it:

<bean class="xx.xx.xx.CustomMultiLineItemReader">
    <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader">
            <property name="resource" value="file:${YOUR_FILE}"></property>
            <property name="linesToSkip" value="0"></property>
            <property name="lineMapper">
                <bean class="org.springframework.batch.item.file.mapping.PatternMatchingCompositeLineMapper">
                    <property name="tokenizers">
                        <map>
                            <entry key="*">
                                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                                    <property name="delimiter" value="="></property>
                                </bean>
                            </entry>                                                   
                        </map>
                    </property>
                    <property name="fieldSetMappers">
                        <map>
                            <entry key="*">
                                 <bean class="org.springframework.batch.item.file.mapping.PassThroughFieldSetMapper" />
                            </entry>
                        </map>
                    </property>
                </bean>
            </property>
        </bean>
    </property>
</bean>

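I used a PatternMatchingCompositeLineMapper to associate the line content (here: *) with the corresponding LineTokenizer and FieldSetMapper, even though pattern matching is not really needed in this case because every line is handled the same way.

The PassThroughFieldSetMapper then leaves the mapping to the reader, and the DelimitedLineTokenizer splits each line on the '=' character.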