这2周没有按马哥安排的课程走,因公司需要,大家一直在试尝大数据这块。作业不能不做,也不知道马哥哪周的作业会有storm,只好先将这段时间的实验慢慢记录下来(其它flume、kafka、spark等本周会慢慢补充),等知道具体的作业题目后,再完善。
实验目的
了解storm的原理,并用storm单机版实验加深理解,为后面的大数据做准备。
了解Topology、spout、bolt、Nimbus、Suppervisor,怎么用。
本篇不涉及原理及相关解释,可以度娘。
实验题目
RandomSpout类:读取外部数据并封装为tuple发送出去,模拟从goods数据中随机取一个商品名称封装到tuple中发送出去;
UpperBolt类:将收到的原始商品名称,转换成大写再发送出去;
SuffixBolt类:给商品名称添加后缀,然后将数据写入文件中;
TopoMain类:描述topology的结构,以及创建topology并提交给集群;
RandomSpout.java
// 读取外部数据并封装为tuple发送出去 public class RandomSpout extends BaseRichSpout{ SpoutOutputCollector collector = null; String [] goods = {"iphone","xiaomi","meizu","zhongxing","huawei","moto","sumsung","simens"}; /** * 获取消息并发送给下一个组件的方法,会被storm不断地调用(最重要的一个方法) * * 从goods数据中随机取一个商品名称封装到tuple中发送出去 * */ @Override public void nextTuple() { // 随机取到一个商品名称 Random random = new Random(); String good = goods[random.nextInt(goods.length)]; //封装到tuple中发送出去 collector.emit(new Values(good)); //休眠500毫秒 Utils.sleep(500); } //进行初始化,只在开始的时候调用一次 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } /** * 定义tuple的scheme * */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("src_word")); //第一个商品名称 } }
UpperBolt.java
/** * 将收到的原始商品名称,转换成大写再发送出去 * */ public class UpperBolt extends BaseBasicBolt{ /** * execute:每来一次消息,就会被执行一次 * */ @Override public void execute(Tuple tuple, BasicOutputCollector collector) { // 从tuple中拿到我们的数据----原始商品名 String src_word = tuple.getString(0); //转换成大写 String upper_word = src_word.toUpperCase(); //发送出去 collector.emit(new Values(upper_word)); } //声明bolt组件要发送tuple的字段定义 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("upper_word")); } }
SuffixBolt.java
/** * 给商品名称添加后缀,然后将数据写入文件中 * @author zhouyong * */ public class SuffixBolt extends BaseBasicBolt { FileWriter fileWriter = null; //初始化方法,会被调用一次 @Override public void prepare(Map stormConf, TopologyContext context) { try{ fileWriter = new FileWriter("/home/hadoop/" + UUID.randomUUID()); }catch(Exception ex){ ex.printStackTrace(); } } @Override public void execute(Tuple tuple, BasicOutputCollector collector) { // 从消息元组tuple中拿到上一个组件发送过来的数据 String upper_word = tuple.getString(0); //给商品名称添加后缀 String result = upper_word + "_suffix"; try{ fileWriter.append(result); fileWriter.append("\n"); fileWriter.flush(); }catch(Exception ex){ ex.printStackTrace(); } } //声明该组件要发送出去的tuple的字段定义 @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub } }
主类TopoMain.java
/** * 描述topology的结构,以及创建topology并提交给集群 * @author zhouyong * */ public class TopoMain { public static void main(String [] args) throws Exception{ TopologyBuilder topologyBuilder = new TopologyBuilder(); //设置消息源组件为RandomSpout //唯为标识,spout实例,并发数 topologyBuilder.setSpout("randomspout", new RandomSpout(), 4); //设置逻辑处理组件UpperBolt,并指定接收randomspout的消息 topologyBuilder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout"); //设置逻辑处理组件SuffixBolt,并指定接收upperbolt的消息 topologyBuilder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt"); //创建一个topology StormTopology topo = topologyBuilder.createTopology(); //创建一个storm的配置参数对象 Config conf = new Config(); //设置storm集群为这个topo启动的进程数 conf.setNumWorkers(4); conf.setDebug(true); conf.setNumAckers(0); //提交topo到storm集群中 StormSubmitter.submitTopology("demotopo", conf, topo); } }
将这4个java类打包成jar包,jar包名称为demotopo.jar。
环境部署
1,安装zookeeper;
2,安装storm;
CentOS6.5,我们统一将zookeeper和storm安装到/opt/hadoop/下。
安装zookeeper
zookeeper版本:zookeeper-3.4.8.tar.gz
单机版部署zookeeper,只要解压就可以了,可以不做配置上的修改。
启动zookeeper
1>, 启动zookeeper
# ./zkServer.sh start
2>, 检查zookeeper是否成功
# ./zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/hadoop/zookeeper/zookeeper/bin/../conf/zoo.cfg Mode: standalone
安装storm
1>,版本:apache-storm-0.9.6.tar.gz
2>,安装配置
解压到/opt/hadoop/storm/后,配置storm.yaml,
文件在/usr/local/storm/conf/storm.yaml,内容: storm.zookeeper.servers: - 127.0.0.1 storm.zookeeper.port: 2181 nimbus.host: "127.0.0.1" storm.local.dir: "/tmp/storm" supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703
注意: 这里要特别注意前后空格问题,否则在启动时会不通过。
3>, 启动主节点:nimbus
# ./storm nimbus\\前台启动,窗口不能关闭 ## bin/storm nimbus 1>/dev/null 2>&1 &\\后台启动
检查是否启动 # jps 10790 nimbus 11030 worker 10870 supervisor 8457 QuorumPeerMain 11366 Jps 11023 worker 11255 core 11018 worker 11019 worker
前台窗口启动主节点nimbus截图:
4>,启动一个前端UI
# bin/storm ui\\前端启动,窗口不能关 ## bin/storm ui 1>/dev/null 2>&1 &\\后台启动 # jps 11255 core
http://172.31.3.148:8080/index.html
5>,启动从节点: supervisor
# ./storm supervisor\\前端启动 ## ./storm supervisor\\后台启动 # jps 10790 nimbus 11030 worker 10870 supervisor
用jps检查下所有服务是否都正常:
[root@localhost apache-storm-0.9.6]# jps 10790 nimbus 11030 worker 10870 supervisor 8457 QuorumPeerMain 11366 Jps 11023 worker 11255 core 11018 worker 11019 worker
——-
提交Topologies
1>,上传jar包
将demotopo.jar包上传到storm的安装目录/opt/hadoop/storm/下:
2>,将jar包发送给storm去执行
命令格式:storm jar 【jar路径】 【拓扑包名.拓扑类名】【stormIP地址】【storm端口】【拓扑名称】【参数】
# ./storm jar ../../demotopo.jar cn.itcast.storm.TopoMain
# ./storm jar ../../demotopo.jar cn.itcast.storm.TopoMain Running: /usr/local/java/bin/java -client -Dstorm.options= -Dstorm.home=/opt/hadoop/storm/apache-storm-0.9.6 -Dstorm.log.dir=/opt/hadoop/storm/apache-storm-0.9.6/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/hadoop/storm/apache-storm-0.9.6/lib/carbonite-1.4.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jetty-util-6.1.26.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/compojure-1.1.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jgrapht-core-0.9.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.logging-0.2.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jetty-6.1.26.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clj-time-0.4.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-codec-1.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/servlet-api-2.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/storm-core-0.9.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/hiccup-0.3.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clj-stacktrace-0.2.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-lang-2.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/chill-java-0.3.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/kryo-2.21.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/logback-classic-1.0.13.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/minlog-1.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/objenesis-1.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clout-1.0.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.cli-0.2.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-devel-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-jetty-adapter-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-servlet-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/snakeyaml-1.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/log4j-over-slf4j-1.6.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/core.incubator-0.1.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-core-1.1.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/asm-4.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/math.numeric-tower-0.0.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/disruptor-2.10.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/joda-time-2.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-exec-1.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jline-2.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/slf4j-api-1.7.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.macro-0.1.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clojure-1.5.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-fileupload-1.2.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/reflectasm-1.07-shaded.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/logback-core-1.0.13.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-io-2.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-logging-1.1.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/json-simple-1.1.jar:../../demotopo.jar:/opt/hadoop/storm/apache-storm-0.9.6/conf:/opt/hadoop/storm/apache-storm-0.9.6/bin -Dstorm.jar=../../demotopo.jar cn.itcast.storm.TopoMain 654 [main] INFO backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar... 708 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar ../../demotopo.jar to assigned location: /opt/hadoop/storm/apache-storm-0.9.6/nimbus/inbox/stormjar-c392c9d8-ebaf-47e8-be98-6d98526e82a8.jar 752 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /opt/hadoop/storm/apache-storm-0.9.6/nimbus/inbox/stormjar-c392c9d8-ebaf-47e8-be98-6d98526e82a8.jar 752 [main] INFO backtype.storm.StormSubmitter - Submitting topology demotopo in distributed mode with conf {"topology.workers":4,"topology.acker.executors":0,"topology.debug":true} 973 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology: demotopo
3>,检查
检查Topology(名为demotopo)是否运行
检查/home/hadoop/下是否生成了4个文件
这里的4个worker是在配置文件中配置的。
检查文件大小是否在不停变化
检查文件中单词是否随机,每个单词后是否以suffix结尾
注:附件是实验jar包,请将.rar改为.jar。
原创文章,作者:365,如若转载,请注明出处:http://www.178linux.com/46231