返回 导航

大数据

hangge.com

Storm - 任务程序开发教程(Kafka数据清洗示例)

作者:hangge | 2025-02-12 08:47
    Storm 一个典型的实时数据计算场景:先读取 Kafka 中的数据,然后对数据进行清洗处理,最后将结果输出到第三方存储介质(例如 RedisKafkaMySQL 等)中。本文通过样例演示如何使用 Storm 进行实时数据计算。

1,需求说明

(1)本文通过 Storm 实现实时订单数据清洗功能,对 Kafka 中的原始订单数据(Topicorder_data)进行核心字段的提取,并将提取出来的核心字段组装起来并输出到 Kakfa 中(Topicorder_data_clean)。

(2)具体实现思路如下:
  • 首先需要有一个 Spout 组件(KafkaSpout)从 Kafka 中消费实时新增的订单数据。
  • 由于订单数据不是很复杂,所以只需要有一个 Bolt 组件(DataCleanBolt)负责数据清洗。
  • 最后还需要一个 Bolt 组件(KafkaBolt)负责将清洗之后的数据写出去。

2,创建项目

这里我们创建一个 Maven 项目,然后在项目的 pom.xml 文件中添加相关的依赖:
<!-- Storm 依赖 -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>2.4.0</version>
</dependency>
<!-- Kafka 依赖 -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
<!-- fastjson 依赖 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.68</version>
</dependency>

3,编写代码

(1)首先编写一个 DataProcessTopology.java 核心代码类,其作用是从 Kafka 集群中消费数据,并通过配置的 Bolt(处理器)执行数据清洗操作,然后将清洗后的数据发送到另一个 Kafka Topic 中。
注意:其中的 KafkaSpoutKafkaBolt 都是使用 storm-kafka-client 这个依赖中封装好的类,不需要我们实现底层的代码。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.Properties;

public class DataProcessTopology {
  public static void main(String[] args) throws Exception {
    //将消费到的 Kafka 数据转换为 Storm 中的 Tuple
    ByTopicRecordTranslator<String, String> brt =
            new ByTopicRecordTranslator<String, String>((r) -> new
                    Values(r.value(), r.topic()), new Fields("values", "topic"));

    //配置 KafkaSpout
    KafkaSpoutConfig<String, String> ksc = KafkaSpoutConfig
    //指定 Kafka 集群机制和 Kafka 的输入 Topic
    .builder("node1:9092,node2:9092,node3:9092", "order_data")
    //设置 group.id
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "g001")
    //设置消费的起始位置
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
    //设置提交消费 Offset 的时长间隔
    .setOffsetCommitPeriodMs(10_000)
    //配置 Translator
    .setRecordTranslator(brt)
    .build();

    //配置 KafkaBolt
    Properties props = new Properties();
    props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    @SuppressWarnings({"unchecked", "rawtypes"})
    KafkaBolt kafkaBolt = new KafkaBolt().withProducerProperties(props)
    //指定输出目的地 Topic
    .withTopicSelector(new DefaultTopicSelector("order_data_clean"));

    //组装 Topology
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafkaspout", new KafkaSpout<String, String>(ksc));
    builder.setBolt("dataclean_bolt", new DataCleanBolt()).shuffleGrouping("kafkaspout");
    builder.setBolt("kafkaBolt", kafkaBolt).shuffleGrouping("dataclean_bolt");

    //提交 Topology
    Config config = new Config();
    String topologyName = DataProcessTopology.class.getSimpleName();
    StormTopology stormTopology = builder.createTopology();
    if (args.length == 0) {
      //创建本地 Storm 集群
      LocalCluster cluster = new LocalCluster();
      //向本地 Storm 集群提交 Topology
      cluster.submitTopology(topologyName, config, stormTopology);
    } else {
     //向生产环境的 Storm 集群提交 Topology
      StormSubmitter.submitTopology(topologyName, config, stormTopology);
    }
  }
}

(2)而 DataCleanBolt 类需要手工实现,其逻辑是对订单数据进行清洗,具体代码如下:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;

public class DataCleanBolt extends BaseRichBolt {
  private OutputCollector outputCollector;

  @Override
  public void prepare(Map<String, Object> map, TopologyContext topologyContext,
                      OutputCollector outputCollector) {
    this.outputCollector = outputCollector;
  }

  @Override
  public void execute(Tuple input) {
    //获取原始订单数据
    String order_data = input.getString(0);
    //解析原始订单数据中的核心字段
    JSONObject jsonObject = JSON.parseObject(order_data);
    String order_id = jsonObject.getString("order_id");
    int price = jsonObject.getIntValue("price");
    //根据需求组装结果
    String res = order_id + "," + price;
    //将结果发送给下一个组件
    outputCollector.emit(new Values(res));
    //向 Spout 组件确认已成功处理订单数据
    outputCollector.ack(input);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    //注意:如果后面使用 KafkaBolt 组件接收数据,则此处的字段名称必须是 message
    outputFieldsDeclarer.declare(new Fields("message"));
  }
}

4,运行测试

(1)首先,我们在 Kafka 中创建两个 Topicorder_dataorder_data_clean
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 2 --topic order_data
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 2 --topic order_data_clean

(2)然后在 IDEA 中启动 DataProcessTopology 代码。
提示:我们在 DataProcessTopology 代码中做了兼容,支持在 IDEA 中直接执行,也支持提交到生产环境的 Storm 集群中执行。

(3)接着,打开 Kakfa 的控制台消费者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order_data_clean

(4)同时,打开 Kafka 的控制台生产者:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic order_data

(5)通过生产者发送一些模拟的生产订单数据:
{"order_id":"J_100001","order_time":"2024-01-01 10:10:10","state":0,"op_name":"pen","price":2}
{"order_id":"J_100004","order_time":"2024-01-15 12:10:15","state":0,"op_name":"apple","price":50}
{"order_id":"J_100005","order_time":"2024-01-16 08:33:10","state":0,"op_name":"cat","price":15}

(6)如果能在 Kakfa 的控制台消费者中看到清洗之后的数据,则说明整个流程是成功的。

附一:向 Storm 集群中提交任务

1,对 Storm 任务打 Jar 包

(1)在 IDEA 中调试通过之后,就可以将代码提交到生产环境的 Storm 集群中运行了。首先修改 pom.xml 文件中依赖的作用范围:
注意:此处在 storm-core 依赖中增加 scope 属性,值为 provided,表示只在编译时使用该依赖,在执行及打包时都不使用。因为 storm-core 依赖在 Storm 集群中已经存在了,所以在打 Jar 包时就不需要将其打包进去了。如果我们使用了集群中没有的第三方依赖包,则需要将其打进 Jar 包里。
<!-- Storm 依赖 -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>2.4.0</version>
    <scope>provided</scope>
</dependency>

(2)接着在 pom.xml 文件中添加 Maven 的编译打包插件配置:
<build>
    <plugins>
        <!-- compiler 插件,设定 JDK 版本 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <encoding>UTF-8</encoding>
                <source>1.8</source>
                <target>1.8</target>
                <showWarnings>true</showWarnings>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

(3)然后执行打 Jar 包的操作:

(4)打包完毕后,在在项目的 target 目录下看到生成的 XXX-jar-with-dependencies.jar 文件,这个就是我们需要的 jar 包。

2,提交任务

(1)将打包后的 jar 包上传到服务器,然后执行如下命令向 Storm 集群提交任务:
bin/storm jar MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar DataProcessTopology cluster

(2)然后到 Storm 集群的 Web 界面上查看提交的任务信息:

3,使用测试

(1)首先打开 Kakfa 的控制台消费者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order_data_clean

(2)然后打开 Kafka 的控制台生产者:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic order_data

(3)通过生产者发送一些模拟的生产订单数据:
{"order_id":"J_100001","order_time":"2024-01-01 10:10:10","state":0,"op_name":"pen","price":2}
{"order_id":"J_100004","order_time":"2024-01-15 12:10:15","state":0,"op_name":"apple","price":50}
{"order_id":"J_100005","order_time":"2024-01-16 08:33:10","state":0,"op_name":"cat","price":15}


(4)如果能在 Kakfa 的控制台消费者中看到清洗之后的数据,则说明任务是正常的。

附二:停止 Storm 集群中正在运行的任务

    Storm 实时任务在被提交到集群后,正常情况下是不会停止的,它会一直运行,除非因代码运行出错而停止。当然,有时是因为需要对代码进行升级的,所以需要手工停止正在运行的 Storm 任务。停止正在运行的 Storm 任务有两种方式。

1,在 Storm 集群的 Web 界面上停止任务

Storm 集群的 Web 界面中找到对应的任务,然后单击 TopologyName 进入该任务的详细页面,单击其中的“kill”按钮即可。

2,使用 storm 脚本停止任务

(1)首先,执行如下命令找出想要停止的 Storm 任务的 Topology_name
bin/storm list

(2)然后,使用 stormkill 命令停止该任务即可:
bin/storm kill DataProcessTopology
评论

全部评论(0)

回到顶部