在《在滴滴云 DC2 云服务器上搭建 ZooKeeper 集群实战(一)》中介绍了 ZooKeeper 集群的搭建,本文在此基础上介绍 ZooKeeper 的实际应用–JStorm。
JStrom 简介
JStorm 是一个类似 Hadoop MapReduce 的系统, 用户按照指定的接口实现一个任务,然后将这个任务递交给 JStorm 系统,JStorm 将这个任务跑起来。
JStorm 是一套基于流水线的消息处理机制。
JStorm 处理数据的方式是基于消息的流水线处理, 因此特别适合无状态计算,也就是计算单元的依赖的数据全部在接受的消息中可以找到, 并且最好一个数据流不依赖另外一个数据流。因此,常常用于:
- 日志分析,从日志中分析出特定的数据,并将分析的结果存入外部存储器如数据库。目前,主流日志分析技术就使用 JStorm 或 Storm。
- 管道系统, 将一个数据从一个系统传输到另外一个系统, 比如将数据库同步到 Hadoop。
- 消息转化器, 将接受到的消息按照某种格式进行转化,存储到另外一个系统如消息中间件。
- 统计分析器, 从日志或消息中,提炼出某个字段,然后做 count 或 sum 计算,最后将统计值存入外部存储器。中间处理过程可能更复杂。
- 实时推荐系统, 将推荐算法运行在 JStorm 中,达到秒级的推荐效果。
基本概念
首先,JStorm 有点类似于 Hadoop 的 MR(Map-Reduce),但是区别在于,Hadoop 的 MR,提交到 Hadoop 的 MR job,执行完就结束了,进程就退出了。而一个 JStorm 任务(JStorm 中称为 topology),是 7* 24 小时永远在运行的,除非用户主动 kill。
在 JStorm 的 topology 中,有两种组件:spout 和 bolt。
- spout 代表输入的数据源,这个数据源可以是任意的,比如说 Kafka,DB,HBase,甚至是 HDFS 等,JStorm 从这个数据源中不断地读取数据,然后发送到下游的 bolt 中进行处理。
- bolt 代表处理逻辑,bolt 收到消息之后,对消息做处理(即执行用户的业务逻辑),处理完以后,既可以将处理后的消息继续发送到下游的 bolt,这样会形成一个处理流水线(pipeline,不过更精确的应该是个有向图);也可以直接结束。
通常一个流水线的最后一个 bolt,会做一些数据的存储工作,比如将实时计算出来的数据写入 DB、HBase 等,以供前台业务进行查询和展现。
详细信息请参见官方文档。
安装部署
安装 JStorm 分为 2 个步骤。
- 安装 JStorm 引擎
- Standalone, JStorm 单独部署,不依赖外部系统,比如 Yarn 或 Docker Swarm。
- Yarn, 将 JStorm 运行在 Yarn 上, 会在 Yarn 提交一个 App, 这个 App 就是一个 JStorm 逻辑集群。
- Docker, 在集群中创建一批 Docker, 这批 Docker 组成一个 JStorm 的逻辑集群。
- 安装 JStorm UI
Standalone 部署方式是最简单、 最轻量、最稳定、也是最常用的方式。当整体规模不超过 300 台时,Standalone 是最简单的方式。
本文选择 Standalone 方式部署。
1. 安装 JStorm 引擎
首先确保机器已经安装好 Java 环境,本文选取的是 1.8 版本。
获取 JStrom 文件,本文选取 2.1.1 版本:
1 2 |
wget https://github.com/alibaba/jstorm/releases/download/2.1.1/jstorm-2.1.1.zip |
将 JStorm 解压至指定目录:
1 2 |
unzip jstorm-2.1.1.zip > /usr/local/jstorm-2.1.1 |
修改配置信息:
1 2 3 4 5 6 7 8 |
vi /etc/profile export JSTORM_HOME=/usr/local/jstorm-2.1.1/ export PATH=$PATH:$JSTORM_HOME/bin source /etc/profile |
执行 JStorm 命令则会看到以下提示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
/bin/java jstorm command [--config client_storm.yaml] [--exclude-jars exclude1.jar,exclude2.jar] [-c key1=value1,key2=value2][command parameter] Commands: activate classpath deactivate drpc help jar kill list localconfvalue metricsMonitor nimbus rebalance remoteconfvalue restart supervisor update_topology zktool [--config client_storm.yaml] optional, setting client's storm.yaml [--exclude-jars exclude1.jar,exclude2.jar] optional, exclude jars, avoid jar conflict [-c key1=value1,key2=value2] optional, add key=value pair to configuration Help: help help <command> Documentation for the jstorm client can be found at https://github.com/alibaba/jstorm/wiki/JStorm-Chinese-Documentation |
进入目录 /usr/local/jstorm-2.1.1/conf,编辑 storm.yaml 文件:
1 2 |
vi storm.yaml |
1 2 3 4 5 6 7 8 9 10 11 |
storm.zookeeper.servers: #表示zookeeper 的地址, 这里我们使用上文建立好的zk集群 - "*.*.*.*" #zk0 ip地址 - "*.*.*.*" #zk1 ip地址 - "*.*.*.*" #zk2 ip地址 storm.zookeeper.root: "/hehe" #表示JStorm在zookeeper中的根目录,当多个JStorm共享一个 zookeeper时,需要设置该选项,默认即为“/jstorm” nimbus.host: "*" #表示nimbus的地址, 填写ip storm.local.dir: "/usr/local/jstorm-2.1.1/data" #表示JStorm临时数据存放目录,需要保证JStorm程序对该目录有写权限 |
至此, JStorm 引擎安装完毕,稍后启动引擎。
2. 安装 JStorm WebUI 安装部署
WebUI 的安装部署, 和 JStorm 是完全独立的。而且,并不要求 WebUI 的机器必须是在 JStorm 机器中。
由于资源有限,本文将放在同一个机器中演示。
拷贝配置文件 storm.yaml 至当前用户的工作目录:
1 2 3 |
mkdir ~/.jstorm cp -f $JSTORM_HOME/conf/storm.yaml ~/.jstorm |
安装 Tomcat,本文选择 Tomcat8:
1 2 3 4 |
cd / wget http://mirrors.shu.edu.cn/apache/tomcat/tomcat-8/v8.5.37/bin/apache-tomcat-8.5.37.tar.gz tar -xzf apache-tomcat-8.5.37.tar.gz |
将 JStorm 目录中的 jstorm-ui-2.1.1.war 拷贝到 Tomcat 的工作目录,并启动 Tomcat:
1 2 3 4 5 6 7 8 |
cd apache-tomcat-8.5.37 cd webapps cp $JSTORM_HOME/jstorm-ui-2.1.1.war ./ mv ROOT ROOT.old ln -s jstorm-ui-2.1.1 ROOT cd ../bin ./startup.sh |
等待 Tomcat 启动完毕,在浏览器中打开http://{jstorm-ui的地址}:8080/ ,可以看到如下图所示,此时 JStorm的信息为空:
在 Nimbus 节点上执行命令,查看 $JSTORM_HOME/logs/nimbus.log 检查有无错误:
1 2 |
nohup jstorm nimbus & |
在 Supervisor 节点上执行,查看 $JSTORM_HOME/logs/supervisor.log 检查有无错误:
1 2 |
nohup jstorm supervisor & |
本文 Nimbus 与 Supervisor 在同一节点上。等待执行成功,我们可以再 UI 中看到 JStorm 运行信息,如图所示:
Cluster Summary 为当前环境的总览信息,Cluster Name 为我们在 ZK 上设置的主节点。
Nimbus Summary、Supervisor Summary 为我们刚刚启动的 Nimbus、Supervisor 信息。因为我们还没有提交任何任务,所以 Topology Summary 信息为空。Zookeeper Summary为我们配置的 ZK 信息。
3. 编写 DEMO
创建 Maven 工程,pom 文件入下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>jstorm-hehe</groupId> <artifactId>hehe</artifactId> <version>1.0-SNAPSHOT</version> <name>hehe</name> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <profiles.src>src/main/java</profiles.src> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <resources.dir>src/main/resources</resources.dir> <profiles.dir>src/main/profiles</profiles.dir> <java-version>1.8</java-version> <org.slf4j.vresion>1.7.5</org.slf4j.vresion> <prod.dir>${profiles.dir}/prod</prod.dir> <test.skip>true</test.skip> <jstorm.version>2.1.1</jstorm.version> </properties> <dependencies> <dependency> <groupId>com.alibaba.jstorm</groupId> <artifactId>jstorm-core</artifactId> <version>2.1.1</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-jdk14</artifactId> </exclusion> </exclusions> <scope>provided</scope><!--在提交任务是jstrom引擎包含了运行的所有依赖包,所以在打包时不将jstorm包引入--> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.7.1</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project> |
工程目录如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
/hehe /src /main /java /com.hehe /bolt DemoBolt.java /spout DemoSpout.java Main.java /resources logback.xml pom.xml |
Main.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
package com.hehe; import backtype.storm.Config; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; import backtype.storm.utils.Utils; import com.hehe.bolt.DemoBolt; import com.hehe.spout.DemoSpout; public class Main { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("demoSpout", new DemoSpout()).setNumTasks(1).setMaxTaskParallelism(1); topologyBuilder.setBolt("firstBolt", new DemoBolt()).setNumTasks(1).setMaxTaskParallelism(1) .localOrShuffleGrouping("demoSpout"); StormTopology topology = topologyBuilder.createTopology(); Config conf = new Config(); conf.setMaxSpoutPending(10); conf.setNumAckers(1); conf.setNumWorkers(1); conf.setDebug(true); conf.setMessageTimeoutSecs(5); conf.put(Config.STORM_CLUSTER_MODE, "distributed"); StormSubmitter.submitTopology("hehe", conf, topology); Utils.sleep(10); } } |
DemoSpout.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
package com.hehe.spout; import java.text.SimpleDateFormat; import java.util.*; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.MessageId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DemoSpout extends BaseRichSpout { private SpoutOutputCollector collector; private final static Logger logger = LoggerFactory.getLogger(DemoSpout.class); public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.collector = spoutOutputCollector; } public void nextTuple() { List<Object> nextDouble = new ArrayList<Object>(); SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); nextDouble.add(df.format(new Date())); logger.info("spout nextTuple {}", df.format(new Date())); collector.emit(nextDouble, MessageId.generateId(new Random())); } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("demo")); } @Override public void ack(Object msgId) { super.ack(msgId); } @Override public void fail(Object msgId) { super.fail(msgId); } } |
DemoBolt.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
package com.hehe.bolt; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DemoBolt extends BaseRichBolt { private OutputCollector collector; private final static Logger logger = LoggerFactory.getLogger(DemoBolt.class); public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } public void execute(Tuple tuple) { logger.info("bolt execute {}", tuple.getValue(0)); collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } } |
4. 提交任务
构建工程:
1 2 |
mvn clean package |
将构建好的 hehe-1.0-SNAPSHOT.jar 文件上传到 JStorm 节点。并提交任务:
1 2 |
jstorm jar hehe-1.0-SNAPSHOT.jar com.hehe.Main |
此时我们可以在 UI 中看到我们刚刚提交的任务:hehe。
进入 Topology 详情,我们可以查看当前任务的运行信息及拓扑图:
日志为我们在此任务中的输出:
5. 关闭任务
执行以下命令,将当前运行的任务关闭:
1 2 |
jstorm kill hehe |
本文作者:徐赫阳