SpringBoot - InfluxDB数据库的集成与使用详解
作者:hangge | 2020-10-09 08:10
InfluxDB 是一个开源分布式时序、时间和指标数据库,使用 Go 语言编写,无需外部依赖。该数据库现在主要用于存储涉及大量的时间戳数据,如 DevOps 监控数据、APP metrics、IoT 传感器数据和实时分析数据。关于 InfluxDB 更详细的介绍可以查看我之前写的文章(点击查看)。本文演示如何在 Spring Boot 中操作 InfluxDB 数据库。
一、准备工作
1,环境准备
关于 InfluxDB 的安装,可以参考我之前写的文章:
2,项目配置
(1)首先编辑项目的 pom.xml 文件,添加 hbase-client 依赖:
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.15</version>
</dependency>
(2)然后在 application.properties 中添加 InfluxDB 相关配置:
spring.influx.url=http://192.168.60.133:8086 #spring.influx.user= #spring.influx.password= spring.influx.database=mydb
(3)最后我们要创建一个配置类 InfluxDbConfig 来读取配置文件,并生成 InfluxDB 实例:
@Configuration
public class InfluxDbConfig {
@Value("${spring.influx.url:''}")
private String influxDBUrl;
@Value("${spring.influx.user:''}")
private String userName;
@Value("${spring.influx.password:''}")
private String password;
@Value("${spring.influx.database:''}")
private String database;
// 数据保存策略
public static String retentionPolicy = "autogen";
@Bean
public InfluxDB influxDB(){
InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password);
try {
/**
* 异步插入:
* enableBatch这里第一个是point的个数,第二个是时间,单位毫秒
* point的个数和时间是联合使用的,如果满100条或者1000毫秒
* 满足任何一个条件就会发送一次写的请求。
* 如果不需要异步插入则将后面 .enableBatch 部分注释掉即可
*/
influxDB.setDatabase(database)
.enableBatch(100,1000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
} finally {
influxDB.setRetentionPolicy(retentionPolicy);
}
influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
return influxDB;
}
}
二、基本用法
1,插入数据
(1)我们创建一个 Controller 通过调用 InfluxDB 对象来插入两条数据:提示:数据库和对应的表可以不需要提前创建好,插入时会自动创建。
@RestController
public class HelloController {
@Autowired
private InfluxDB influxDB;
@Value("${spring.influx.database}")
private String database;
@GetMapping("/test1")
public void test1() {
System.out.println("---开始插入数据---");
influxDB.write(Point.measurement("temperature")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.tag("machine", "unit42")
.tag("type", "assembly")
.addField("external", 11)
.addField("internal", 22)
.build());
influxDB.write(Point.measurement("temperature")
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.tag("machine", "unit43")
.tag("type", "assembly")
.addField("external", 33)
.addField("internal", 44)
.build());
}
}
(2)使用浏览器访问 /test1 接口,可以看到控制台输出如下:
(3)查询下数据库可以看到数据已成功插入:

2,查询数据
(1)我们对上面的 Controller 稍作修改,增加一个查询的测试方法:
@RestController
public class HelloController {
@Autowired
private InfluxDB influxDB;
@Value("${spring.influx.database}")
private String database;
@GetMapping("/test2")
public void test2() {
System.out.println("---开始查询数据---");
// InfluxDB支持分页查询,因此可以设置分页查询条件
int page = 1; // 起始页(从0开始)
int pageSize = 10; // 每页条目数
String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
String queryCondition = ""; //查询条件暂且为空
// 此处查询所有内容,如果
String queryCmd = "SELECT * FROM "
// 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;
// + 策略name + "." + measurement
+ "temperature"
// 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
+ queryCondition
// 查询结果需要按照时间排序
+ " ORDER BY time DESC"
// 添加分页查询条件
+ pageQuery;
// 开始查询
QueryResult queryResult = influxDB.query(new Query(queryCmd, database));
System.out.println("原始结果为:" + queryResult);
// 获取查询结果
List<QueryResult.Result> results = queryResult.getResults();
if (results == null) {
return;
}
// 多个sql用分号隔开,因本次查询只有一个sql,所以取第一个就行
QueryResult.Result result = results.get(0);
List<QueryResult.Series> seriesList = result.getSeries();
for (QueryResult.Series series : seriesList) {
if (series == null) {
return;
}
System.out.println("结果数量为:" + (series.getValues() == null ?
0 : series.getValues().size()));
System.out.println("colums ==>> " + series.getColumns());
System.out.println("tags ==>> " + series.getTags());
System.out.println("name ==>> " + series.getName());
System.out.println("values ==>> " + series.getValues());
}
}
}
(2)使用浏览器访问 /test2 接口,可以看到控制台输出如下:

三、通过实体类进行数据插入
1,构建实体类
首先我们创建 temperature 这个表对应的实体类,代码如下:
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
@Measurement(name = "temperature")
public class Temperature {
// Column中的name为measurement中的列名
// 此外,需要注意InfluxDB中时间戳均是以UTC时保存,在保存以及提取过程中需要注意时区转换
@Column(name = "time")
private Long time;
// 注解中添加tag = true,表示当前字段内容为tag内容
@Column(name = "machine", tag = true)
private String machine;
@Column(name = "type", tag = true)
private String type;
@Column(name = "external")
private Float external;
@Column(name = "internal")
private Float internal;
}
2,插入数据
我们对上面的 Controller 稍作修改,这次插入是是通过实体类对象进行插入:
@RestController
public class HelloController {
@Autowired
private InfluxDB influxDB;
@GetMapping("/test1")
public void test1() {
System.out.println("---开始插入数据---");
Temperature temperature1 = new Temperature(System.currentTimeMillis(),
"unit42", "assembly", 11F ,22F);
influxDB.write(Point.measurementByPOJO(Temperature.class)
.time(temperature1.getTime(), TimeUnit.MILLISECONDS)
.addFieldsFromPOJO(temperature1)
.build());
Temperature temperature2 = new Temperature(System.currentTimeMillis(),
"unit43", "assembly", 33F ,44F);
influxDB.write(Point.measurementByPOJO(Temperature.class)
.time(temperature1.getTime(), TimeUnit.MILLISECONDS)
.addFieldsFromPOJO(temperature2)
.build());
}
}
3,查询数据
目前无法自动将查询结果转成我们自定义的实体类对象,只能手动进行创建并赋值:@RestController
public class HelloController {
@Autowired
private InfluxDB influxDB;
@Value("${spring.influx.database}")
private String database;
@GetMapping("/test2")
public List<Temperature> test2() {
System.out.println("---开始查询数据---");
// InfluxDB支持分页查询,因此可以设置分页查询条件
int page = 1; // 起始页(从0开始)
int pageSize = 10; // 每页条目数
String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
String queryCondition = ""; //查询条件暂且为空
// 此处查询所有内容,如果
String queryCmd = "SELECT time, machine, type, external, internal FROM "
// 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;
// + 策略name + "." + measurement
+ "temperature"
// 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
+ queryCondition
// 查询结果需要按照时间排序
+ " ORDER BY time DESC"
// 添加分页查询条件
+ pageQuery;
// 开始查询
QueryResult queryResult = influxDB.query(new Query(queryCmd, database),
TimeUnit.MILLISECONDS);
System.out.println("原始结果为:" + queryResult);
// 获取查询结果
List<QueryResult.Result> results = queryResult.getResults();
if (results == null) {
return null;
}
// 多个sql用分号隔开,因本次查询只有一个sql,所以取第一个就行
QueryResult.Result result = results.get(0);
List<QueryResult.Series> seriesList = result.getSeries();
List<Temperature> temperatureList = new LinkedList<>();
for (QueryResult.Series series : seriesList) {
if (series == null) {
return null;
}
System.out.println("结果数量为:" + (series.getValues() == null ?
0 : series.getValues().size()));
System.out.println("colums ==>> " + series.getColumns());
System.out.println("tags ==>> " + series.getTags());
System.out.println("name ==>> " + series.getName());
System.out.println("values ==>> " + series.getValues());
series.getValues().forEach(valueData -> {
Temperature temperature = new Temperature();
// 直接查询出来的是科学计数法,需要转换为Long类型的数据
BigDecimal decimalTime = new BigDecimal(valueData.get(0).toString());
temperature.setTime(decimalTime.longValue());
temperature.setMachine(valueData.get(1).toString());
temperature.setType(valueData.get(2).toString());
temperature.setExternal(Float.valueOf(valueData.get(3).toString()));
temperature.setInternal(Float.valueOf(valueData.get(4).toString()));
temperatureList.add(temperature);
});
}
return temperatureList;
}
}
全部评论(0)