Storm - 任务程序开发教程(Kafka数据清洗示例)
作者:hangge | 2025-02-12 08:47
Storm 一个典型的实时数据计算场景:先读取 Kafka 中的数据,然后对数据进行清洗处理,最后将结果输出到第三方存储介质(例如 Redis、Kafka、MySQL 等)中。本文通过样例演示如何使用 Storm 进行实时数据计算。

(2)而 DataCleanBolt 类需要手工实现,其逻辑是对订单数据进行清洗,具体代码如下:
(2)然后在 IDEA 中启动 DataProcessTopology 代码。
(3)接着,打开 Kakfa 的控制台消费者:
(4)同时,打开 Kafka 的控制台生产者:
(5)通过生产者发送一些模拟的生产订单数据:
(2)接着在 pom.xml 文件中添加 Maven 的编译打包插件配置:
(3)然后执行打 Jar 包的操作:

(2)然后到 Storm 集群的 Web 界面上查看提交的任务信息:
(2)然后打开 Kafka 的控制台生产者:
(3)通过生产者发送一些模拟的生产订单数据:
(2)然后,使用 storm 的 kill 命令停止该任务即可:

1,需求说明
(1)本文通过 Storm 实现实时订单数据清洗功能,对 Kafka 中的原始订单数据(Topic:order_data)进行核心字段的提取,并将提取出来的核心字段组装起来并输出到 Kakfa 中(Topic :order_data_clean)。
(2)具体实现思路如下:
- 首先需要有一个 Spout 组件(KafkaSpout)从 Kafka 中消费实时新增的订单数据。
- 由于订单数据不是很复杂,所以只需要有一个 Bolt 组件(DataCleanBolt)负责数据清洗。
- 最后还需要一个 Bolt 组件(KafkaBolt)负责将清洗之后的数据写出去。

2,创建项目
这里我们创建一个 Maven 项目,然后在项目的 pom.xml 文件中添加相关的依赖:
<!-- Storm 依赖 -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.4.0</version>
</dependency>
<!-- Kafka 依赖 -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<!-- fastjson 依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
3,编写代码
(1)首先编写一个 DataProcessTopology.java 核心代码类,其作用是从 Kafka 集群中消费数据,并通过配置的 Bolt(处理器)执行数据清洗操作,然后将清洗后的数据发送到另一个 Kafka Topic 中。
注意:其中的 KafkaSpout 和 KafkaBolt 都是使用 storm-kafka-client 这个依赖中封装好的类,不需要我们实现底层的代码。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Properties;
public class DataProcessTopology {
public static void main(String[] args) throws Exception {
//将消费到的 Kafka 数据转换为 Storm 中的 Tuple
ByTopicRecordTranslator<String, String> brt =
new ByTopicRecordTranslator<String, String>((r) -> new
Values(r.value(), r.topic()), new Fields("values", "topic"));
//配置 KafkaSpout
KafkaSpoutConfig<String, String> ksc = KafkaSpoutConfig
//指定 Kafka 集群机制和 Kafka 的输入 Topic
.builder("node1:9092,node2:9092,node3:9092", "order_data")
//设置 group.id
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "g001")
//设置消费的起始位置
.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
//设置提交消费 Offset 的时长间隔
.setOffsetCommitPeriodMs(10_000)
//配置 Translator
.setRecordTranslator(brt)
.build();
//配置 KafkaBolt
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
@SuppressWarnings({"unchecked", "rawtypes"})
KafkaBolt kafkaBolt = new KafkaBolt().withProducerProperties(props)
//指定输出目的地 Topic
.withTopicSelector(new DefaultTopicSelector("order_data_clean"));
//组装 Topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaspout", new KafkaSpout<String, String>(ksc));
builder.setBolt("dataclean_bolt", new DataCleanBolt()).shuffleGrouping("kafkaspout");
builder.setBolt("kafkaBolt", kafkaBolt).shuffleGrouping("dataclean_bolt");
//提交 Topology
Config config = new Config();
String topologyName = DataProcessTopology.class.getSimpleName();
StormTopology stormTopology = builder.createTopology();
if (args.length == 0) {
//创建本地 Storm 集群
LocalCluster cluster = new LocalCluster();
//向本地 Storm 集群提交 Topology
cluster.submitTopology(topologyName, config, stormTopology);
} else {
//向生产环境的 Storm 集群提交 Topology
StormSubmitter.submitTopology(topologyName, config, stormTopology);
}
}
}
(2)而 DataCleanBolt 类需要手工实现,其逻辑是对订单数据进行清洗,具体代码如下:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
public class DataCleanBolt extends BaseRichBolt {
private OutputCollector outputCollector;
@Override
public void prepare(Map<String, Object> map, TopologyContext topologyContext,
OutputCollector outputCollector) {
this.outputCollector = outputCollector;
}
@Override
public void execute(Tuple input) {
//获取原始订单数据
String order_data = input.getString(0);
//解析原始订单数据中的核心字段
JSONObject jsonObject = JSON.parseObject(order_data);
String order_id = jsonObject.getString("order_id");
int price = jsonObject.getIntValue("price");
//根据需求组装结果
String res = order_id + "," + price;
//将结果发送给下一个组件
outputCollector.emit(new Values(res));
//向 Spout 组件确认已成功处理订单数据
outputCollector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//注意:如果后面使用 KafkaBolt 组件接收数据,则此处的字段名称必须是 message
outputFieldsDeclarer.declare(new Fields("message"));
}
}
4,运行测试
(1)首先,我们在 Kafka 中创建两个 Topic:order_data 和 order_data_clean
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 2 --topic order_data bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 2 --topic order_data_clean
(2)然后在 IDEA 中启动 DataProcessTopology 代码。
提示:我们在 DataProcessTopology 代码中做了兼容,支持在 IDEA 中直接执行,也支持提交到生产环境的 Storm 集群中执行。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order_data_clean
(4)同时,打开 Kafka 的控制台生产者:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic order_data
(5)通过生产者发送一些模拟的生产订单数据:
{"order_id":"J_100001","order_time":"2024-01-01 10:10:10","state":0,"op_name":"pen","price":2}
{"order_id":"J_100004","order_time":"2024-01-15 12:10:15","state":0,"op_name":"apple","price":50}
{"order_id":"J_100005","order_time":"2024-01-16 08:33:10","state":0,"op_name":"cat","price":15}
(6)如果能在 Kakfa 的控制台消费者中看到清洗之后的数据,则说明整个流程是成功的。

附一:向 Storm 集群中提交任务
1,对 Storm 任务打 Jar 包
(1)在 IDEA 中调试通过之后,就可以将代码提交到生产环境的 Storm 集群中运行了。首先修改 pom.xml 文件中依赖的作用范围:
注意:此处在 storm-core 依赖中增加 scope 属性,值为 provided,表示只在编译时使用该依赖,在执行及打包时都不使用。因为 storm-core 依赖在 Storm 集群中已经存在了,所以在打 Jar 包时就不需要将其打包进去了。如果我们使用了集群中没有的第三方依赖包,则需要将其打进 Jar 包里。
<!-- Storm 依赖 -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
(2)接着在 pom.xml 文件中添加 Maven 的编译打包插件配置:
<build>
<plugins>
<!-- compiler 插件,设定 JDK 版本 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
(3)然后执行打 Jar 包的操作:

(4)打包完毕后,在在项目的 target 目录下看到生成的 XXX-jar-with-dependencies.jar 文件,这个就是我们需要的 jar 包。

2,提交任务
(1)将打包后的 jar 包上传到服务器,然后执行如下命令向 Storm 集群提交任务:
bin/storm jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar DataProcessTopology cluster
(2)然后到 Storm 集群的 Web 界面上查看提交的任务信息:

3,使用测试
(1)首先打开 Kakfa 的控制台消费者:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order_data_clean
(2)然后打开 Kafka 的控制台生产者:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic order_data
(3)通过生产者发送一些模拟的生产订单数据:
{"order_id":"J_100001","order_time":"2024-01-01 10:10:10","state":0,"op_name":"pen","price":2}
{"order_id":"J_100004","order_time":"2024-01-15 12:10:15","state":0,"op_name":"apple","price":50}
{"order_id":"J_100005","order_time":"2024-01-16 08:33:10","state":0,"op_name":"cat","price":15}
(4)如果能在 Kakfa 的控制台消费者中看到清洗之后的数据,则说明任务是正常的。

附二:停止 Storm 集群中正在运行的任务
Storm 实时任务在被提交到集群后,正常情况下是不会停止的,它会一直运行,除非因代码运行出错而停止。当然,有时是因为需要对代码进行升级的,所以需要手工停止正在运行的 Storm 任务。停止正在运行的 Storm 任务有两种方式。
1,在 Storm 集群的 Web 界面上停止任务
在 Storm 集群的 Web 界面中找到对应的任务,然后单击 TopologyName 进入该任务的详细页面,单击其中的“kill”按钮即可。

2,使用 storm 脚本停止任务
(1)首先,执行如下命令找出想要停止的 Storm 任务的 Topology_name:
bin/storm list
(2)然后,使用 storm 的 kill 命令停止该任务即可:
bin/storm kill DataProcessTopology
全部评论(0)