Storm - 任务程序开发教程(Kafka数据清洗示例)
作者:hangge | 2025-02-12 08:47
Storm 一个典型的实时数据计算场景:先读取 Kafka 中的数据,然后对数据进行清洗处理,最后将结果输出到第三方存储介质(例如 Redis、Kafka、MySQL 等)中。本文通过样例演示如何使用 Storm 进行实时数据计算。

(2)而 DataCleanBolt 类需要手工实现,其逻辑是对订单数据进行清洗,具体代码如下:
(2)然后在 IDEA 中启动 DataProcessTopology 代码。
(3)接着,打开 Kakfa 的控制台消费者:
(4)同时,打开 Kafka 的控制台生产者:
(5)通过生产者发送一些模拟的生产订单数据:
(2)接着在 pom.xml 文件中添加 Maven 的编译打包插件配置:
(3)然后执行打 Jar 包的操作:

(2)然后到 Storm 集群的 Web 界面上查看提交的任务信息:
(2)然后打开 Kafka 的控制台生产者:
(3)通过生产者发送一些模拟的生产订单数据:
(2)然后,使用 storm 的 kill 命令停止该任务即可:

1,需求说明
(1)本文通过 Storm 实现实时订单数据清洗功能,对 Kafka 中的原始订单数据(Topic:order_data)进行核心字段的提取,并将提取出来的核心字段组装起来并输出到 Kakfa 中(Topic :order_data_clean)。
(2)具体实现思路如下:
- 首先需要有一个 Spout 组件(KafkaSpout)从 Kafka 中消费实时新增的订单数据。
- 由于订单数据不是很复杂,所以只需要有一个 Bolt 组件(DataCleanBolt)负责数据清洗。
- 最后还需要一个 Bolt 组件(KafkaBolt)负责将清洗之后的数据写出去。

2,创建项目
这里我们创建一个 Maven 项目,然后在项目的 pom.xml 文件中添加相关的依赖:
<!-- Storm 依赖 --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>2.4.0</version> </dependency> <!-- Kafka 依赖 --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka-client</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> <!-- fastjson 依赖 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.68</version> </dependency>
3,编写代码
(1)首先编写一个 DataProcessTopology.java 核心代码类,其作用是从 Kafka 集群中消费数据,并通过配置的 Bolt(处理器)执行数据清洗操作,然后将清洗后的数据发送到另一个 Kafka Topic 中。
注意:其中的 KafkaSpout 和 KafkaBolt 都是使用 storm-kafka-client 这个依赖中封装好的类,不需要我们实现底层的代码。
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.kafka.bolt.KafkaBolt; import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector; import org.apache.storm.kafka.spout.ByTopicRecordTranslator; import org.apache.storm.kafka.spout.FirstPollOffsetStrategy; import org.apache.storm.kafka.spout.KafkaSpout; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.util.Properties; public class DataProcessTopology { public static void main(String[] args) throws Exception { //将消费到的 Kafka 数据转换为 Storm 中的 Tuple ByTopicRecordTranslator<String, String> brt = new ByTopicRecordTranslator<String, String>((r) -> new Values(r.value(), r.topic()), new Fields("values", "topic")); //配置 KafkaSpout KafkaSpoutConfig<String, String> ksc = KafkaSpoutConfig //指定 Kafka 集群机制和 Kafka 的输入 Topic .builder("node1:9092,node2:9092,node3:9092", "order_data") //设置 group.id .setProp(ConsumerConfig.GROUP_ID_CONFIG, "g001") //设置消费的起始位置 .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST) //设置提交消费 Offset 的时长间隔 .setOffsetCommitPeriodMs(10_000) //配置 Translator .setRecordTranslator(brt) .build(); //配置 KafkaBolt Properties props = new Properties(); props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); @SuppressWarnings({"unchecked", "rawtypes"}) KafkaBolt kafkaBolt = new KafkaBolt().withProducerProperties(props) //指定输出目的地 Topic .withTopicSelector(new DefaultTopicSelector("order_data_clean")); //组装 Topology TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafkaspout", new KafkaSpout<String, String>(ksc)); builder.setBolt("dataclean_bolt", new DataCleanBolt()).shuffleGrouping("kafkaspout"); builder.setBolt("kafkaBolt", kafkaBolt).shuffleGrouping("dataclean_bolt"); //提交 Topology Config config = new Config(); String topologyName = DataProcessTopology.class.getSimpleName(); StormTopology stormTopology = builder.createTopology(); if (args.length == 0) { //创建本地 Storm 集群 LocalCluster cluster = new LocalCluster(); //向本地 Storm 集群提交 Topology cluster.submitTopology(topologyName, config, stormTopology); } else { //向生产环境的 Storm 集群提交 Topology StormSubmitter.submitTopology(topologyName, config, stormTopology); } } }
(2)而 DataCleanBolt 类需要手工实现,其逻辑是对订单数据进行清洗,具体代码如下:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Map; public class DataCleanBolt extends BaseRichBolt { private OutputCollector outputCollector; @Override public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector = outputCollector; } @Override public void execute(Tuple input) { //获取原始订单数据 String order_data = input.getString(0); //解析原始订单数据中的核心字段 JSONObject jsonObject = JSON.parseObject(order_data); String order_id = jsonObject.getString("order_id"); int price = jsonObject.getIntValue("price"); //根据需求组装结果 String res = order_id + "," + price; //将结果发送给下一个组件 outputCollector.emit(new Values(res)); //向 Spout 组件确认已成功处理订单数据 outputCollector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //注意:如果后面使用 KafkaBolt 组件接收数据,则此处的字段名称必须是 message outputFieldsDeclarer.declare(new Fields("message")); } }
4,运行测试
(1)首先,我们在 Kafka 中创建两个 Topic:order_data 和 order_data_clean
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 2 --topic order_data bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 2 --topic order_data_clean
(2)然后在 IDEA 中启动 DataProcessTopology 代码。
提示:我们在 DataProcessTopology 代码中做了兼容,支持在 IDEA 中直接执行,也支持提交到生产环境的 Storm 集群中执行。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order_data_clean
(4)同时,打开 Kafka 的控制台生产者:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic order_data
(5)通过生产者发送一些模拟的生产订单数据:
{"order_id":"J_100001","order_time":"2024-01-01 10:10:10","state":0,"op_name":"pen","price":2} {"order_id":"J_100004","order_time":"2024-01-15 12:10:15","state":0,"op_name":"apple","price":50} {"order_id":"J_100005","order_time":"2024-01-16 08:33:10","state":0,"op_name":"cat","price":15}

(6)如果能在 Kakfa 的控制台消费者中看到清洗之后的数据,则说明整个流程是成功的。

附一:向 Storm 集群中提交任务
1,对 Storm 任务打 Jar 包
(1)在 IDEA 中调试通过之后,就可以将代码提交到生产环境的 Storm 集群中运行了。首先修改 pom.xml 文件中依赖的作用范围:
注意:此处在 storm-core 依赖中增加 scope 属性,值为 provided,表示只在编译时使用该依赖,在执行及打包时都不使用。因为 storm-core 依赖在 Storm 集群中已经存在了,所以在打 Jar 包时就不需要将其打包进去了。如果我们使用了集群中没有的第三方依赖包,则需要将其打进 Jar 包里。
<!-- Storm 依赖 --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>2.4.0</version> <scope>provided</scope> </dependency>
(2)接着在 pom.xml 文件中添加 Maven 的编译打包插件配置:
<build> <plugins> <!-- compiler 插件,设定 JDK 版本 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <encoding>UTF-8</encoding> <source>1.8</source> <target>1.8</target> <showWarnings>true</showWarnings> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
(3)然后执行打 Jar 包的操作:

(4)打包完毕后,在在项目的 target 目录下看到生成的 XXX-jar-with-dependencies.jar 文件,这个就是我们需要的 jar 包。

2,提交任务
(1)将打包后的 jar 包上传到服务器,然后执行如下命令向 Storm 集群提交任务:
bin/storm jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar DataProcessTopology cluster
(2)然后到 Storm 集群的 Web 界面上查看提交的任务信息:

3,使用测试
(1)首先打开 Kakfa 的控制台消费者:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order_data_clean
(2)然后打开 Kafka 的控制台生产者:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic order_data
(3)通过生产者发送一些模拟的生产订单数据:
{"order_id":"J_100001","order_time":"2024-01-01 10:10:10","state":0,"op_name":"pen","price":2} {"order_id":"J_100004","order_time":"2024-01-15 12:10:15","state":0,"op_name":"apple","price":50} {"order_id":"J_100005","order_time":"2024-01-16 08:33:10","state":0,"op_name":"cat","price":15}

(4)如果能在 Kakfa 的控制台消费者中看到清洗之后的数据,则说明任务是正常的。

附二:停止 Storm 集群中正在运行的任务
Storm 实时任务在被提交到集群后,正常情况下是不会停止的,它会一直运行,除非因代码运行出错而停止。当然,有时是因为需要对代码进行升级的,所以需要手工停止正在运行的 Storm 任务。停止正在运行的 Storm 任务有两种方式。
1,在 Storm 集群的 Web 界面上停止任务
在 Storm 集群的 Web 界面中找到对应的任务,然后单击 TopologyName 进入该任务的详细页面,单击其中的“kill”按钮即可。

2,使用 storm 脚本停止任务
(1)首先,执行如下命令找出想要停止的 Storm 任务的 Topology_name:
bin/storm list

(2)然后,使用 storm 的 kill 命令停止该任务即可:
bin/storm kill DataProcessTopology

全部评论(0)