SpringBoot - InfluxDB数据库的集成与使用详解
作者:hangge | 2020-10-09 08:10
InfluxDB 是一个开源分布式时序、时间和指标数据库,使用 Go 语言编写,无需外部依赖。该数据库现在主要用于存储涉及大量的时间戳数据,如 DevOps 监控数据、APP metrics、IoT 传感器数据和实时分析数据。关于 InfluxDB 更详细的介绍可以查看我之前写的文章(点击查看)。本文演示如何在 Spring Boot 中操作 InfluxDB 数据库。
一、准备工作
1,环境准备
关于 InfluxDB 的安装,可以参考我之前写的文章:
2,项目配置
(1)首先编辑项目的 pom.xml 文件,添加 hbase-client 依赖:
<dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.15</version> </dependency>
(2)然后在 application.properties 中添加 InfluxDB 相关配置:
spring.influx.url=http://192.168.60.133:8086 #spring.influx.user= #spring.influx.password= spring.influx.database=mydb
(3)最后我们要创建一个配置类 InfluxDbConfig 来读取配置文件,并生成 InfluxDB 实例:
@Configuration public class InfluxDbConfig { @Value("${spring.influx.url:''}") private String influxDBUrl; @Value("${spring.influx.user:''}") private String userName; @Value("${spring.influx.password:''}") private String password; @Value("${spring.influx.database:''}") private String database; // 数据保存策略 public static String retentionPolicy = "autogen"; @Bean public InfluxDB influxDB(){ InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password); try { /** * 异步插入: * enableBatch这里第一个是point的个数,第二个是时间,单位毫秒 * point的个数和时间是联合使用的,如果满100条或者1000毫秒 * 满足任何一个条件就会发送一次写的请求。 * 如果不需要异步插入则将后面 .enableBatch 部分注释掉即可 */ influxDB.setDatabase(database) .enableBatch(100,1000, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); } finally { influxDB.setRetentionPolicy(retentionPolicy); } influxDB.setLogLevel(InfluxDB.LogLevel.BASIC); return influxDB; } }
二、基本用法
1,插入数据
(1)我们创建一个 Controller 通过调用 InfluxDB 对象来插入两条数据:提示:数据库和对应的表可以不需要提前创建好,插入时会自动创建。
@RestController public class HelloController { @Autowired private InfluxDB influxDB; @Value("${spring.influx.database}") private String database; @GetMapping("/test1") public void test1() { System.out.println("---开始插入数据---"); influxDB.write(Point.measurement("temperature") .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) .tag("machine", "unit42") .tag("type", "assembly") .addField("external", 11) .addField("internal", 22) .build()); influxDB.write(Point.measurement("temperature") .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) .tag("machine", "unit43") .tag("type", "assembly") .addField("external", 33) .addField("internal", 44) .build()); } }
(2)使用浏览器访问 /test1 接口,可以看到控制台输出如下:
(3)查询下数据库可以看到数据已成功插入:
2,查询数据
(1)我们对上面的 Controller 稍作修改,增加一个查询的测试方法:
@RestController public class HelloController { @Autowired private InfluxDB influxDB; @Value("${spring.influx.database}") private String database; @GetMapping("/test2") public void test2() { System.out.println("---开始查询数据---"); // InfluxDB支持分页查询,因此可以设置分页查询条件 int page = 1; // 起始页(从0开始) int pageSize = 10; // 每页条目数 String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize; String queryCondition = ""; //查询条件暂且为空 // 此处查询所有内容,如果 String queryCmd = "SELECT * FROM " // 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加; // + 策略name + "." + measurement + "temperature" // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度) + queryCondition // 查询结果需要按照时间排序 + " ORDER BY time DESC" // 添加分页查询条件 + pageQuery; // 开始查询 QueryResult queryResult = influxDB.query(new Query(queryCmd, database)); System.out.println("原始结果为:" + queryResult); // 获取查询结果 List<QueryResult.Result> results = queryResult.getResults(); if (results == null) { return; } // 多个sql用分号隔开,因本次查询只有一个sql,所以取第一个就行 QueryResult.Result result = results.get(0); List<QueryResult.Series> seriesList = result.getSeries(); for (QueryResult.Series series : seriesList) { if (series == null) { return; } System.out.println("结果数量为:" + (series.getValues() == null ? 0 : series.getValues().size())); System.out.println("colums ==>> " + series.getColumns()); System.out.println("tags ==>> " + series.getTags()); System.out.println("name ==>> " + series.getName()); System.out.println("values ==>> " + series.getValues()); } } }
(2)使用浏览器访问 /test2 接口,可以看到控制台输出如下:
三、通过实体类进行数据插入
1,构建实体类
首先我们创建 temperature 这个表对应的实体类,代码如下:
@Builder @Data @AllArgsConstructor @NoArgsConstructor @Measurement(name = "temperature") public class Temperature { // Column中的name为measurement中的列名 // 此外,需要注意InfluxDB中时间戳均是以UTC时保存,在保存以及提取过程中需要注意时区转换 @Column(name = "time") private Long time; // 注解中添加tag = true,表示当前字段内容为tag内容 @Column(name = "machine", tag = true) private String machine; @Column(name = "type", tag = true) private String type; @Column(name = "external") private Float external; @Column(name = "internal") private Float internal; }
2,插入数据
我们对上面的 Controller 稍作修改,这次插入是是通过实体类对象进行插入:
@RestController public class HelloController { @Autowired private InfluxDB influxDB; @GetMapping("/test1") public void test1() { System.out.println("---开始插入数据---"); Temperature temperature1 = new Temperature(System.currentTimeMillis(), "unit42", "assembly", 11F ,22F); influxDB.write(Point.measurementByPOJO(Temperature.class) .time(temperature1.getTime(), TimeUnit.MILLISECONDS) .addFieldsFromPOJO(temperature1) .build()); Temperature temperature2 = new Temperature(System.currentTimeMillis(), "unit43", "assembly", 33F ,44F); influxDB.write(Point.measurementByPOJO(Temperature.class) .time(temperature1.getTime(), TimeUnit.MILLISECONDS) .addFieldsFromPOJO(temperature2) .build()); } }
3,查询数据
目前无法自动将查询结果转成我们自定义的实体类对象,只能手动进行创建并赋值:@RestController public class HelloController { @Autowired private InfluxDB influxDB; @Value("${spring.influx.database}") private String database; @GetMapping("/test2") public List<Temperature> test2() { System.out.println("---开始查询数据---"); // InfluxDB支持分页查询,因此可以设置分页查询条件 int page = 1; // 起始页(从0开始) int pageSize = 10; // 每页条目数 String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize; String queryCondition = ""; //查询条件暂且为空 // 此处查询所有内容,如果 String queryCmd = "SELECT time, machine, type, external, internal FROM " // 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加; // + 策略name + "." + measurement + "temperature" // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度) + queryCondition // 查询结果需要按照时间排序 + " ORDER BY time DESC" // 添加分页查询条件 + pageQuery; // 开始查询 QueryResult queryResult = influxDB.query(new Query(queryCmd, database), TimeUnit.MILLISECONDS); System.out.println("原始结果为:" + queryResult); // 获取查询结果 List<QueryResult.Result> results = queryResult.getResults(); if (results == null) { return null; } // 多个sql用分号隔开,因本次查询只有一个sql,所以取第一个就行 QueryResult.Result result = results.get(0); List<QueryResult.Series> seriesList = result.getSeries(); List<Temperature> temperatureList = new LinkedList<>(); for (QueryResult.Series series : seriesList) { if (series == null) { return null; } System.out.println("结果数量为:" + (series.getValues() == null ? 0 : series.getValues().size())); System.out.println("colums ==>> " + series.getColumns()); System.out.println("tags ==>> " + series.getTags()); System.out.println("name ==>> " + series.getName()); System.out.println("values ==>> " + series.getValues()); series.getValues().forEach(valueData -> { Temperature temperature = new Temperature(); // 直接查询出来的是科学计数法,需要转换为Long类型的数据 BigDecimal decimalTime = new BigDecimal(valueData.get(0).toString()); temperature.setTime(decimalTime.longValue()); temperature.setMachine(valueData.get(1).toString()); temperature.setType(valueData.get(2).toString()); temperature.setExternal(Float.valueOf(valueData.get(3).toString())); temperature.setInternal(Float.valueOf(valueData.get(4).toString())); temperatureList.add(temperature); }); } return temperatureList; } }
全部评论(0)