返回 导航

SpringBoot / Cloud

hangge.com

SpringBoot - Spring Batch批处理使用样例(读取文件数据并导入到数据库中)

作者:hangge | 2020-02-12 08:39

1,Spring Batch 介绍


2,样例说明

(1)本文通过一个简单的数据复制样例演示在 Spring Boot 中如何使用 Spring Batch。假设现在有一个 data.csv 文件,文件中保存了 4 条用户数据,我们需要通过批处理框架读取文件中的内容然后将之插入数据表中。
注意:为方便演示,本次样例只实现了 ItemReader(数据的读取)和 Itemwriter(数据的写出),中间的 ItemProcessor(数据的处理)不实现,即数据读取后不做任何处理,直接入库。

(2)data.csv 文件存放在 classpath 下(项目的 resources 文件夹里),里面内容如下(每行各个字段间使用空格隔开):
id username sex
1 张三 男
2 李四 男
3 王五 男
4 赵六 女

(3)要存入的数据表(user)的表结构如下: 

3,项目配置

(1)首先编辑项目的 pom.xml 文件,添加 spring-boot-starter-batch 依赖以及数据库相关依赖。
注意:添加数据库相关依赖是为了将批处理的执行状态持久化到数据库中(可以理解为执行时自动生成的一些日志记录)。
<!-- Spring Batch -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

<!-- spring-jdbc -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>

<!-- 数据库驱动依赖 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>

<!-- 数据库连接池 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.9</version>
</dependency>

(2)接着在 application.properties 中进行数据库基本信息配置:
# 前面4行是数据库基本配置
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.url=jdbc:mysql://localhost:3306/hangge?serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=hangge1234
# 项目启动时创建数据表(用于记录批处理执行状态)的 SQL 脚本,该脚本由Spring Batch提供
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
# 项目启动时执行建表 SQL
spring.batch.initialize-schema=always
# 默认情况下,项目启动时就会自动执行配置好的批处理操作。这里将其设为不自动执行,后面我们通过手动触发执行批处理
spring.batch.job.enabled=false

(3)最后在项目启动类上添加 @EnableBatchProcessing 注解开启 Spring Batch 支持:
@SpringBootApplication
@EnableBatchProcessing
public class DemoApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext context
                = SpringApplication.run(DemoApplication.class, args);
    }
}

4,批处理配置

(1)创建 CsvBatchJobConfig 进行 Spring Batch 配置,代码如下:
(1)Spring Batch 提供了一些常用的 ItemReader(即数据的读取逻辑),例如:
  • JdbcPagingltemReader 用来读取数据库中的数据
  • StaxEventltemReader 用来读取 XML 数据
  • FlatFileltemReader 用来加载普通文件(本样例使用该 ItemReader
(2)Spring Batch 提供了一些常用的 ItemWriter(即数据的写出逻辑),例如:
  • FlatFileltemWriter 表示将数据写出为一个普通文件。
  • StaxEvenltemWriter 表示将数据写出为 XML
  • 还有针对不同数据库提供的写出操作支持类,如 MongoltemWriterJpaltemWriterNeo4jItemWriter 以及 HibernateltemWriter 等。
  • 本案例使用的 JdbcBatchltemWriter 则是通过 JDBC 将数据写出到一个关系型数据库中。
@Configuration
public class CsvBatchJobConfig {
    // 注入JobBuilderFactory,用来构建Job
    @Autowired
    JobBuilderFactory jobBuilderFactory;

    // 注入StepBuilderFactory,用来构建Step
    @Autowired
    StepBuilderFactory stepBuilderFactory;

    // 注入DataSource,用来支持持久化操作,这里持久化方案是Spring-Jdbc
    @Autowired
    DataSource dataSource;

    // 配置一个ItemReader,即数据的读取逻辑
    @Bean
    @StepScope
    FlatFileItemReader<User> itemReader() {
        // FlatFileItemReader 是一个加载普通文件的 ItemReader
        FlatFileItemReader<User> reader = new FlatFileItemReader<>();
        // 由于data.csv文件第一行是标题,因此通过setLinesToSkip方法设置跳过一行
        reader.setLinesToSkip(1);
        // setResource方法配置data.csv文件的位置
        reader.setResource(new ClassPathResource("data.csv"));
        // 通过setLineMapper方法设置每一行的数据信息
        reader.setLineMapper(new DefaultLineMapper<User>(){{
            setLineTokenizer(new DelimitedLineTokenizer(){{
                // setNames方法配置了data.csv文件一共有4列,分别是id、username、以及sex,
                setNames("id","username","sex");
                // 配置列与列之间的间隔符(这里是空格)
                setDelimiter(" ");
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper(){{
                setTargetType(User.class);
            }});
        }});
        return reader;
    }

    // 配置ItemWriter,即数据的写出逻辑
    @Bean
    JdbcBatchItemWriter jdbcBatchItemWriter() {
        // 使用的JdbcBatchltemWriter则是通过JDBC将数据写出到一个关系型数据库中。
        JdbcBatchItemWriter writer = new JdbcBatchItemWriter();
        // 配置使用的数据源
        writer.setDataSource(dataSource);
        // 配置数据插入SQL,注意占位符的写法是":属性名"
        writer.setSql("insert into user(id,username,sex) " +
                "values(:id,:username,:sex)");
        // 最后通过BeanPropertyItemSqlParameterSourceProvider实例将实体类的属性和SQL中的占位符一一映射
        writer.setItemSqlParameterSourceProvider(
                new BeanPropertyItemSqlParameterSourceProvider<>());
        return writer;
    }

    // 配置一个Step
    @Bean
    Step csvStep() {
        // Step通过stepBuilderFactory进行配置
        return stepBuilderFactory.get("csvStep") //通过get获取一个StepBuilder,参数数Step的name
                .<User, User>chunk(2) //方法的参数2,表示每读取到两条数据就执行一次write操作
                .reader(itemReader()) // 配置reader
                .writer(jdbcBatchItemWriter()) // 配置writer
                .build();
    }

    // 配置一个Job
    @Bean
    Job csvJob() {
        // 通过jobBuilderFactory构建一个Job,get方法参数为Job的name
        return jobBuilderFactory.get("csvJob")
                .start(csvStep()) // 配置该Job的Step
                .build();
    }
}

(2)这里还涉及到了一个 User 实体类,代码如下:
@NoArgsConstructor
@Setter
@Getter
public class User  {
    private Integer id;
    private String username;
    private String sex;
}

5,创建 Controller

接下来创建 Controller,当用户发起一个请求是触发批处理:
@RestController
public class HelloController {

    // JobLauncher 由框架提供
    @Autowired
    JobLauncher jobLauncher;

    // Job 为刚刚配置的
    @Autowired
    Job job;

    @GetMapping("/hello")
    public void hello() {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .toJobParameters();
            // 通过调用 JobLauncher 中的 run 方法启动一个批处理
            jobLauncher.run(job, jobParameters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

6,运行测试

(1)启动项目,访问 http://localhost:8080/hello 地址触发批处理,可以发现 data.csv 中数据已经成功插入 user 表中。

(2)同时我们发现数据库中还会自动创建了多个批处理相关的表,这些表用来记录批处理的执行状态。 
注意:我们如果再次调用 Controller 接口触发批处理,数据也不会重复插入(即使 data.csv 里面内容改变了),因为这个批处理任务之前已经被记录为执行成功,所以不会再次执行。除非把 Job 名字改成其他的,比如“csvJob2” 。

附:增加 ItemProcessor 数据处理

    在实际应用中,我们读取完数据后可能还需要先对数据进行一些处理,然后再入库。下面演示如何实现这个功能。

1,自定义 ItemProcessor

    自定义处理器只需要实现 ItemProcessor 接口,重写 process 方法,输入的参数是从 ItemReader 读取到的数据,返回的数据给 ItemWriter。这里我们将读取到对性别“男/女”转换成英文“M/F
public class CvsItemProcessor implements ItemProcessor<User, User> {

    @Override
    public User process(User item) throws Exception {
        // 数据处理,比如将中文性别设置为M/F
        if ("男".equals(item.getSex())) {
            item.setSex("M");
        } else {
            item.setSex("F");
        }
        return item;
    }
}

2,配置 ItemProcessor

    修改批处理配置 CsvBatchJobConfig,增加 ItemProcessor 相关配置,ItemProcessor 具体实现则为上面我们自定义的 ItemProcessor
@Configuration
public class CsvBatchJobConfig {
    // 注入JobBuilderFactory,用来构建Job
    @Autowired
    JobBuilderFactory jobBuilderFactory;

    // 注入StepBuilderFactory,用来构建Step
    @Autowired
    StepBuilderFactory stepBuilderFactory;

    // 注入DataSource,用来支持持久化操作,这里持久化方案是Spring-Jdbc
    @Autowired
    DataSource dataSource;

    // 配置一个ItemReader,即数据的读取逻辑
    @Bean
    @StepScope
    FlatFileItemReader<User> itemReader() {
        // FlatFileItemReader 是一个加载普通文件的 ItemReader
        FlatFileItemReader<User> reader = new FlatFileItemReader<>();
        // 由于data.csv文件第一行是标题,因此通过setLinesToSkip方法设置跳过一行
        reader.setLinesToSkip(1);
        // setResource方法配置data.csv文件的位置
        reader.setResource(new ClassPathResource("data.csv"));
        // 通过setLineMapper方法设置每一行的数据信息
        reader.setLineMapper(new DefaultLineMapper<User>(){{
            setLineTokenizer(new DelimitedLineTokenizer(){{
                // setNames方法配置了data.csv文件一共有4列,分别是id、username、以及sex,
                setNames("id","username","sex");
                // 配置列与列之间的间隔符(这里是空格)
                setDelimiter(" ");
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper(){{
                setTargetType(User.class);
            }});
        }});
        return reader;
    }

    // 配置ItemProcessor,进行数据处理
    @Bean
    ItemProcessor<User, User> itemProcessor(){
        // 使用自定义的数据处理类
        CvsItemProcessor cvsItemProcessor = new CvsItemProcessor();
        return cvsItemProcessor;
    }

    // 配置ItemWriter,即数据的写出逻辑
    @Bean
    JdbcBatchItemWriter jdbcBatchItemWriter() {
        // 使用的JdbcBatchltemWriter则是通过JDBC将数据写出到一个关系型数据库中。
        JdbcBatchItemWriter writer = new JdbcBatchItemWriter();
        // 配置使用的数据源
        writer.setDataSource(dataSource);
        // 配置数据插入SQL,注意占位符的写法是":属性名"
        writer.setSql("insert into user(id,username,sex) " +
                "values(:id,:username,:sex)");
        // 最后通过BeanPropertyItemSqlParameterSourceProvider实例将实体类的属性和SQL中的占位符一一映射
        writer.setItemSqlParameterSourceProvider(
                new BeanPropertyItemSqlParameterSourceProvider<>());
        return writer;
    }

    // 配置一个Step
    @Bean
    Step csvStep() {
        // Step通过stepBuilderFactory进行配置
        return stepBuilderFactory.get("csvStep") //通过get获取一个StepBuilder,参数数Step的name
                .<User, User>chunk(2) //方法的参数2,表示每读取到两条数据就执行一次write操作
                .reader(itemReader()) // 配置reader
                .processor(itemProcessor()) // 配置processor
                .writer(jdbcBatchItemWriter()) // 配置writer
                .build();
    }

    // 配置一个Job
    @Bean
    Job csvJob() {
        // 通过jobBuilderFactory构建一个Job,get方法参数为Job的name
        return jobBuilderFactory.get("csvJob2")
                .start(csvStep()) // 配置该Job的Step
                .build();
    }
}

3,运行测试

启动项目,重新触发批处理,可以看到数据库数据中,性别部分变成了英文的“M/F
评论

全部评论(0)

回到顶部