返回 导航

SpringBoot / Cloud

hangge.com

SpringBoot - InfluxDB数据库的集成与使用详解

作者:hangge | 2020-10-09 08:10
  InfluxDB 是一个开源分布式时序、时间和指标数据库,使用 Go 语言编写,无需外部依赖。该数据库现在主要用于存储涉及大量的时间戳数据,如 DevOps 监控数据、APP metricsIoT 传感器数据和实时分析数据。关于 InfluxDB 更详细的介绍可以查看我之前写的文章(点击查看)。本文演示如何在 Spring Boot 中操作 InfluxDB 数据库。

一、准备工作

1,环境准备

关于 InfluxDB 的安装,可以参考我之前写的文章:

2,项目配置

(1)首先编辑项目的 pom.xml 文件,添加 hbase-client 依赖:
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.15</version>
</dependency>

(2)然后在 application.properties 中添加 InfluxDB 相关配置:
spring.influx.url=http://192.168.60.133:8086
#spring.influx.user=
#spring.influx.password=
spring.influx.database=mydb

(3)最后我们要创建一个配置类 InfluxDbConfig 来读取配置文件,并生成 InfluxDB 实例:
@Configuration
public class InfluxDbConfig {

	@Value("${spring.influx.url:''}")
	private String influxDBUrl;

	@Value("${spring.influx.user:''}")
	private String userName;

	@Value("${spring.influx.password:''}")
	private String password;

	@Value("${spring.influx.database:''}")
	private String database;

	// 数据保存策略
	public static String retentionPolicy = "autogen";

	@Bean
	public InfluxDB influxDB(){
		InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password);
		try {
			/**
			 * 异步插入:
			 * enableBatch这里第一个是point的个数,第二个是时间,单位毫秒
			 * point的个数和时间是联合使用的,如果满100条或者1000毫秒
			 * 满足任何一个条件就会发送一次写的请求。
			 * 如果不需要异步插入则将后面 .enableBatch 部分注释掉即可
			 */
			influxDB.setDatabase(database)
					.enableBatch(100,1000, TimeUnit.MILLISECONDS);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			influxDB.setRetentionPolicy(retentionPolicy);
		}
		influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
		return influxDB;
	}
}

二、基本用法

1,插入数据

(1)我们创建一个 Controller 通过调用 InfluxDB 对象来插入两条数据:
提示:数据库和对应的表可以不需要提前创建好,插入时会自动创建。
@RestController
public class HelloController {

	@Autowired
	private InfluxDB influxDB;

	@Value("${spring.influx.database}")
	private String database;

	@GetMapping("/test1")
	public void test1() {
		System.out.println("---开始插入数据---");
		influxDB.write(Point.measurement("temperature")
				.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
				.tag("machine", "unit42")
				.tag("type", "assembly")
				.addField("external", 11)
				.addField("internal", 22)
				.build());

		influxDB.write(Point.measurement("temperature")
				.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
				.tag("machine", "unit43")
				.tag("type", "assembly")
				.addField("external", 33)
				.addField("internal", 44)
				.build());
	}
}

(2)使用浏览器访问 /test1 接口,可以看到控制台输出如下:

(3)查询下数据库可以看到数据已成功插入:

2,查询数据

(1)我们对上面的 Controller 稍作修改,增加一个查询的测试方法:
@RestController
public class HelloController {

	@Autowired
	private InfluxDB influxDB;

	@Value("${spring.influx.database}")
	private String database;

	@GetMapping("/test2")
	public void test2() {
		System.out.println("---开始查询数据---");
		// InfluxDB支持分页查询,因此可以设置分页查询条件
		int page = 1; // 起始页(从0开始)
		int pageSize = 10;  // 每页条目数
		String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
		String queryCondition = "";  //查询条件暂且为空
		// 此处查询所有内容,如果
		String queryCmd = "SELECT * FROM "
				// 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;
				// + 策略name + "." + measurement
				+ "temperature"
				// 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
				+ queryCondition
				// 查询结果需要按照时间排序
				+ " ORDER BY time DESC"
				// 添加分页查询条件
				+ pageQuery;

		// 开始查询
		QueryResult queryResult = influxDB.query(new Query(queryCmd, database));
		System.out.println("原始结果为:" + queryResult);

		// 获取查询结果
		List<QueryResult.Result> results = queryResult.getResults();
		if (results == null) {
			return;
		}
		// 多个sql用分号隔开,因本次查询只有一个sql,所以取第一个就行
		QueryResult.Result result = results.get(0);
		List<QueryResult.Series> seriesList = result.getSeries();

		for (QueryResult.Series series : seriesList) {
			if (series == null) {
				return;
			}
			System.out.println("结果数量为:" + (series.getValues() == null ?
					0 : series.getValues().size()));
			System.out.println("colums ==>> " + series.getColumns());
			System.out.println("tags ==>> " + series.getTags());
			System.out.println("name ==>> " + series.getName());
			System.out.println("values ==>> " + series.getValues());

		}
	}
}

(2)使用浏览器访问 /test2 接口,可以看到控制台输出如下:

三、通过实体类进行数据插入

1,构建实体类

首先我们创建 temperature 这个表对应的实体类,代码如下:
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
@Measurement(name = "temperature")
public class Temperature {
	// Column中的name为measurement中的列名
	// 此外,需要注意InfluxDB中时间戳均是以UTC时保存,在保存以及提取过程中需要注意时区转换
	@Column(name = "time")
	private Long time;
	// 注解中添加tag = true,表示当前字段内容为tag内容
	@Column(name = "machine", tag = true)
	private String machine;
	@Column(name = "type", tag = true)
	private String type;
	@Column(name = "external")
	private Float external;
	@Column(name = "internal")
	private Float internal;
}

2,插入数据

我们对上面的 Controller 稍作修改,这次插入是是通过实体类对象进行插入:
@RestController
public class HelloController {

	@Autowired
	private InfluxDB influxDB;

	@GetMapping("/test1")
	public void test1() {
		System.out.println("---开始插入数据---");
		Temperature temperature1 = new Temperature(System.currentTimeMillis(),
				"unit42", "assembly", 11F ,22F);
		influxDB.write(Point.measurementByPOJO(Temperature.class)
				.time(temperature1.getTime(), TimeUnit.MILLISECONDS)
				.addFieldsFromPOJO(temperature1)
				.build());

		Temperature temperature2 = new Temperature(System.currentTimeMillis(),
				"unit43", "assembly", 33F ,44F);
		influxDB.write(Point.measurementByPOJO(Temperature.class)
				.time(temperature1.getTime(), TimeUnit.MILLISECONDS)
				.addFieldsFromPOJO(temperature2)
				.build());
	}
}

3,查询数据

目前无法自动将查询结果转成我们自定义的实体类对象,只能手动进行创建并赋值:
@RestController
public class HelloController {

	@Autowired
	private InfluxDB influxDB;

	@Value("${spring.influx.database}")
	private String database;

	@GetMapping("/test2")
	public List<Temperature> test2() {
		System.out.println("---开始查询数据---");
		// InfluxDB支持分页查询,因此可以设置分页查询条件
		int page = 1; // 起始页(从0开始)
		int pageSize = 10;  // 每页条目数
		String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
		String queryCondition = "";  //查询条件暂且为空
		// 此处查询所有内容,如果
		String queryCmd = "SELECT time, machine, type, external, internal FROM "
				// 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;
				// + 策略name + "." + measurement
				+ "temperature"
				// 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
				+ queryCondition
				// 查询结果需要按照时间排序
				+ " ORDER BY time DESC"
				// 添加分页查询条件
				+ pageQuery;

		// 开始查询
		QueryResult queryResult = influxDB.query(new Query(queryCmd, database), 
				TimeUnit.MILLISECONDS);
		System.out.println("原始结果为:" + queryResult);

		// 获取查询结果
		List<QueryResult.Result> results = queryResult.getResults();
		if (results == null) {
			return null;
		}
		// 多个sql用分号隔开,因本次查询只有一个sql,所以取第一个就行
		QueryResult.Result result = results.get(0);
		List<QueryResult.Series> seriesList = result.getSeries();
		List<Temperature> temperatureList = new LinkedList<>();

		for (QueryResult.Series series : seriesList) {
			if (series == null) {
				return null;
			}
			System.out.println("结果数量为:" + (series.getValues() == null ?
					0 : series.getValues().size()));
			System.out.println("colums ==>> " + series.getColumns());
			System.out.println("tags ==>> " + series.getTags());
			System.out.println("name ==>> " + series.getName());
			System.out.println("values ==>> " + series.getValues());

			series.getValues().forEach(valueData -> {
				Temperature temperature = new Temperature();
				// 直接查询出来的是科学计数法,需要转换为Long类型的数据
				BigDecimal decimalTime = new BigDecimal(valueData.get(0).toString());
				temperature.setTime(decimalTime.longValue());
				temperature.setMachine(valueData.get(1).toString());
				temperature.setType(valueData.get(2).toString());
				temperature.setExternal(Float.valueOf(valueData.get(3).toString()));
				temperature.setInternal(Float.valueOf(valueData.get(4).toString()));
				temperatureList.add(temperature);
			});
		}
		return temperatureList;
	}
}
评论

全部评论(0)

回到顶部