SpringBoot - Spring Batch批处理使用样例(读取文件数据并导入到数据库中)
作者:hangge | 2020-02-12 08:39
1,Spring Batch 介绍
- Spring Batch 是一个开源的、全面的、轻量级的批处理框架,通过 Spring Batch 可以实现强大的批处理应用程序的开发。
- Spring Batch 还提供记录/跟踪、事务管理、作业处理统计、作业重启以及资源管理等功能。
- Spring Batch 结合定时任务可以发挥更大的作用。
- Spring Batch 提供了 ItemReader、ItemProcessor 和 ItemWriter 来完成数据的读取、处理以及写出操作,并且可以将批处理的执行状态持久化到数据库中。
2,样例说明
(1)本文通过一个简单的数据复制样例演示在 Spring Boot 中如何使用 Spring Batch。假设现在有一个 data.csv 文件,文件中保存了 4 条用户数据,我们需要通过批处理框架读取文件中的内容然后将之插入数据表中。
注意:为方便演示,本次样例只实现了 ItemReader(数据的读取)和 Itemwriter(数据的写出),中间的 ItemProcessor(数据的处理)不实现,即数据读取后不做任何处理,直接入库。
(2)data.csv 文件存放在 classpath 下(项目的 resources 文件夹里),里面内容如下(每行各个字段间使用空格隔开):
id username sex 1 张三 男 2 李四 男 3 王五 男 4 赵六 女
(3)要存入的数据表(user)的表结构如下:
3,项目配置
(1)首先编辑项目的 pom.xml 文件,添加 spring-boot-starter-batch 依赖以及数据库相关依赖。注意:添加数据库相关依赖是为了将批处理的执行状态持久化到数据库中(可以理解为执行时自动生成的一些日志记录)。
<!-- Spring Batch --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <!-- spring-jdbc --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <!-- 数据库驱动依赖 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- 数据库连接池 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.9</version> </dependency>
(2)接着在 application.properties 中进行数据库基本信息配置:
# 前面4行是数据库基本配置
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.url=jdbc:mysql://localhost:3306/hangge?serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=hangge1234
# 项目启动时创建数据表(用于记录批处理执行状态)的 SQL 脚本,该脚本由Spring Batch提供
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
# 项目启动时执行建表 SQL
spring.batch.initialize-schema=always
# 默认情况下,项目启动时就会自动执行配置好的批处理操作。这里将其设为不自动执行,后面我们通过手动触发执行批处理
spring.batch.job.enabled=false
(3)最后在项目启动类上添加 @EnableBatchProcessing 注解开启 Spring Batch 支持:
@SpringBootApplication @EnableBatchProcessing public class DemoApplication { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args); } }
4,批处理配置
(1)Spring Batch 提供了一些常用的 ItemReader(即数据的读取逻辑),例如:
- JdbcPagingltemReader 用来读取数据库中的数据
- StaxEventltemReader 用来读取 XML 数据
- FlatFileltemReader 用来加载普通文件(本样例使用该 ItemReader)
- FlatFileltemWriter 表示将数据写出为一个普通文件。
- StaxEvenltemWriter 表示将数据写出为 XML。
- 还有针对不同数据库提供的写出操作支持类,如 MongoltemWriter、JpaltemWriter、Neo4jItemWriter 以及 HibernateltemWriter 等。
- 本案例使用的 JdbcBatchltemWriter 则是通过 JDBC 将数据写出到一个关系型数据库中。
@Configuration public class CsvBatchJobConfig { // 注入JobBuilderFactory,用来构建Job @Autowired JobBuilderFactory jobBuilderFactory; // 注入StepBuilderFactory,用来构建Step @Autowired StepBuilderFactory stepBuilderFactory; // 注入DataSource,用来支持持久化操作,这里持久化方案是Spring-Jdbc @Autowired DataSource dataSource; // 配置一个ItemReader,即数据的读取逻辑 @Bean @StepScope FlatFileItemReader<User> itemReader() { // FlatFileItemReader 是一个加载普通文件的 ItemReader FlatFileItemReader<User> reader = new FlatFileItemReader<>(); // 由于data.csv文件第一行是标题,因此通过setLinesToSkip方法设置跳过一行 reader.setLinesToSkip(1); // setResource方法配置data.csv文件的位置 reader.setResource(new ClassPathResource("data.csv")); // 通过setLineMapper方法设置每一行的数据信息 reader.setLineMapper(new DefaultLineMapper<User>(){{ setLineTokenizer(new DelimitedLineTokenizer(){{ // setNames方法配置了data.csv文件一共有4列,分别是id、username、以及sex, setNames("id","username","sex"); // 配置列与列之间的间隔符(这里是空格) setDelimiter(" "); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper(){{ setTargetType(User.class); }}); }}); return reader; } // 配置ItemWriter,即数据的写出逻辑 @Bean JdbcBatchItemWriter jdbcBatchItemWriter() { // 使用的JdbcBatchltemWriter则是通过JDBC将数据写出到一个关系型数据库中。 JdbcBatchItemWriter writer = new JdbcBatchItemWriter(); // 配置使用的数据源 writer.setDataSource(dataSource); // 配置数据插入SQL,注意占位符的写法是":属性名" writer.setSql("insert into user(id,username,sex) " + "values(:id,:username,:sex)"); // 最后通过BeanPropertyItemSqlParameterSourceProvider实例将实体类的属性和SQL中的占位符一一映射 writer.setItemSqlParameterSourceProvider( new BeanPropertyItemSqlParameterSourceProvider<>()); return writer; } // 配置一个Step @Bean Step csvStep() { // Step通过stepBuilderFactory进行配置 return stepBuilderFactory.get("csvStep") //通过get获取一个StepBuilder,参数数Step的name .<User, User>chunk(2) //方法的参数2,表示每读取到两条数据就执行一次write操作 .reader(itemReader()) // 配置reader .writer(jdbcBatchItemWriter()) // 配置writer .build(); } // 配置一个Job @Bean Job csvJob() { // 通过jobBuilderFactory构建一个Job,get方法参数为Job的name return jobBuilderFactory.get("csvJob") .start(csvStep()) // 配置该Job的Step .build(); } }
(2)这里还涉及到了一个 User 实体类,代码如下:
@NoArgsConstructor @Setter @Getter public class User { private Integer id; private String username; private String sex; }
5,创建 Controller
接下来创建 Controller,当用户发起一个请求是触发批处理:@RestController public class HelloController { // JobLauncher 由框架提供 @Autowired JobLauncher jobLauncher; // Job 为刚刚配置的 @Autowired Job job; @GetMapping("/hello") public void hello() { try { JobParameters jobParameters = new JobParametersBuilder() .toJobParameters(); // 通过调用 JobLauncher 中的 run 方法启动一个批处理 jobLauncher.run(job, jobParameters); } catch (Exception e) { e.printStackTrace(); } } }
6,运行测试
(1)启动项目,访问 http://localhost:8080/hello 地址触发批处理,可以发现 data.csv 中数据已经成功插入 user 表中。(2)同时我们发现数据库中还会自动创建了多个批处理相关的表,这些表用来记录批处理的执行状态。
注意:我们如果再次调用 Controller 接口触发批处理,数据也不会重复插入(即使 data.csv 里面内容改变了),因为这个批处理任务之前已经被记录为执行成功,所以不会再次执行。除非把 Job 名字改成其他的,比如“csvJob2” 。
附:增加 ItemProcessor 数据处理
在实际应用中,我们读取完数据后可能还需要先对数据进行一些处理,然后再入库。下面演示如何实现这个功能。1,自定义 ItemProcessor
自定义处理器只需要实现 ItemProcessor 接口,重写 process 方法,输入的参数是从 ItemReader 读取到的数据,返回的数据给 ItemWriter。这里我们将读取到对性别“男/女”转换成英文“M/F”
public class CvsItemProcessor implements ItemProcessor<User, User> { @Override public User process(User item) throws Exception { // 数据处理,比如将中文性别设置为M/F if ("男".equals(item.getSex())) { item.setSex("M"); } else { item.setSex("F"); } return item; } }
2,配置 ItemProcessor
修改批处理配置 CsvBatchJobConfig,增加 ItemProcessor 相关配置,ItemProcessor 具体实现则为上面我们自定义的 ItemProcessor。@Configuration public class CsvBatchJobConfig { // 注入JobBuilderFactory,用来构建Job @Autowired JobBuilderFactory jobBuilderFactory; // 注入StepBuilderFactory,用来构建Step @Autowired StepBuilderFactory stepBuilderFactory; // 注入DataSource,用来支持持久化操作,这里持久化方案是Spring-Jdbc @Autowired DataSource dataSource; // 配置一个ItemReader,即数据的读取逻辑 @Bean @StepScope FlatFileItemReader<User> itemReader() { // FlatFileItemReader 是一个加载普通文件的 ItemReader FlatFileItemReader<User> reader = new FlatFileItemReader<>(); // 由于data.csv文件第一行是标题,因此通过setLinesToSkip方法设置跳过一行 reader.setLinesToSkip(1); // setResource方法配置data.csv文件的位置 reader.setResource(new ClassPathResource("data.csv")); // 通过setLineMapper方法设置每一行的数据信息 reader.setLineMapper(new DefaultLineMapper<User>(){{ setLineTokenizer(new DelimitedLineTokenizer(){{ // setNames方法配置了data.csv文件一共有4列,分别是id、username、以及sex, setNames("id","username","sex"); // 配置列与列之间的间隔符(这里是空格) setDelimiter(" "); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper(){{ setTargetType(User.class); }}); }}); return reader; } // 配置ItemProcessor,进行数据处理 @Bean ItemProcessor<User, User> itemProcessor(){ // 使用自定义的数据处理类 CvsItemProcessor cvsItemProcessor = new CvsItemProcessor(); return cvsItemProcessor; } // 配置ItemWriter,即数据的写出逻辑 @Bean JdbcBatchItemWriter jdbcBatchItemWriter() { // 使用的JdbcBatchltemWriter则是通过JDBC将数据写出到一个关系型数据库中。 JdbcBatchItemWriter writer = new JdbcBatchItemWriter(); // 配置使用的数据源 writer.setDataSource(dataSource); // 配置数据插入SQL,注意占位符的写法是":属性名" writer.setSql("insert into user(id,username,sex) " + "values(:id,:username,:sex)"); // 最后通过BeanPropertyItemSqlParameterSourceProvider实例将实体类的属性和SQL中的占位符一一映射 writer.setItemSqlParameterSourceProvider( new BeanPropertyItemSqlParameterSourceProvider<>()); return writer; } // 配置一个Step @Bean Step csvStep() { // Step通过stepBuilderFactory进行配置 return stepBuilderFactory.get("csvStep") //通过get获取一个StepBuilder,参数数Step的name .<User, User>chunk(2) //方法的参数2,表示每读取到两条数据就执行一次write操作 .reader(itemReader()) // 配置reader .processor(itemProcessor()) // 配置processor .writer(jdbcBatchItemWriter()) // 配置writer .build(); } // 配置一个Job @Bean Job csvJob() { // 通过jobBuilderFactory构建一个Job,get方法参数为Job的name return jobBuilderFactory.get("csvJob2") .start(csvStep()) // 配置该Job的Step .build(); } }
全部评论(0)