N21_第x周_Storm_01_单机实践篇

   这2周没有按马哥安排的课程走,因公司需要,大家一直在试尝大数据这块。作业不能不做,也不知道马哥哪周的作业会有storm,只好先将这段时间的实验慢慢记录下来(其它flume、kafka、spark等本周会慢慢补充),等知道具体的作业题目后,再完善。

实验目的

  了解storm的原理,并用storm单机版实验加深理解,为后面的大数据做准备。

  了解Topology、spout、bolt、Nimbus、Suppervisor,怎么用。

  本篇不涉及原理及相关解释,可以度娘。   

实验题目

  1473770496158165.jpg

  1473770824893767.jpg

    RandomSpout类:读取外部数据并封装为tuple发送出去,模拟从goods数据中随机取一个商品名称封装到tuple中发送出去;

    UpperBolt类:将收到的原始商品名称,转换成大写再发送出去;

    SuffixBolt类:给商品名称添加后缀,然后将数据写入文件中;

    TopoMain类:描述topology的结构,以及创建topology并提交给集群;

RandomSpout.java

// 读取外部数据并封装为tuple发送出去
public class RandomSpout extends BaseRichSpout{
SpoutOutputCollector collector = null;
String [] goods = {"iphone","xiaomi","meizu","zhongxing","huawei","moto","sumsung","simens"};
/**
 *  获取消息并发送给下一个组件的方法,会被storm不断地调用(最重要的一个方法)
 * 
 *  从goods数据中随机取一个商品名称封装到tuple中发送出去
 * */
@Override
public void nextTuple() {
// 随机取到一个商品名称
Random random = new Random();
String good = goods[random.nextInt(goods.length)];
//封装到tuple中发送出去
collector.emit(new Values(good));
//休眠500毫秒
Utils.sleep(500);
}
//进行初始化,只在开始的时候调用一次
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
/**
 *  定义tuple的scheme
 * */
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("src_word"));   //第一个商品名称
}
}

UpperBolt.java

/**
 *  将收到的原始商品名称,转换成大写再发送出去
 * */
public class UpperBolt extends BaseBasicBolt{
/**
 *  execute:每来一次消息,就会被执行一次
 * */
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
// 从tuple中拿到我们的数据----原始商品名
String src_word = tuple.getString(0);
//转换成大写
String upper_word = src_word.toUpperCase();
//发送出去
collector.emit(new Values(upper_word));
}
//声明bolt组件要发送tuple的字段定义
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("upper_word"));
}
}

SuffixBolt.java

/**
 *  给商品名称添加后缀,然后将数据写入文件中
 *  @author zhouyong
 * */
public class SuffixBolt extends BaseBasicBolt {
FileWriter fileWriter = null;
//初始化方法,会被调用一次
@Override
public void prepare(Map stormConf, TopologyContext context) {
try{
fileWriter = new FileWriter("/home/hadoop/" + UUID.randomUUID());
}catch(Exception ex){
ex.printStackTrace();
}
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
// 从消息元组tuple中拿到上一个组件发送过来的数据
String upper_word = tuple.getString(0);
//给商品名称添加后缀
String result = upper_word + "_suffix";
try{
fileWriter.append(result);
fileWriter.append("\n");
fileWriter.flush();
}catch(Exception ex){
ex.printStackTrace();
}
}
//声明该组件要发送出去的tuple的字段定义
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
// TODO Auto-generated method stub
}
}

主类TopoMain.java

/**
 *  描述topology的结构,以及创建topology并提交给集群
 *  @author zhouyong
 * */
public class TopoMain {
public static void main(String [] args) throws Exception{
 TopologyBuilder topologyBuilder = new TopologyBuilder();
 
 //设置消息源组件为RandomSpout
 //唯为标识,spout实例,并发数
 topologyBuilder.setSpout("randomspout", new RandomSpout(), 4);
 
 //设置逻辑处理组件UpperBolt,并指定接收randomspout的消息
 topologyBuilder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
 
 //设置逻辑处理组件SuffixBolt,并指定接收upperbolt的消息
 topologyBuilder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
 
 //创建一个topology
 StormTopology topo = topologyBuilder.createTopology();
 
 //创建一个storm的配置参数对象
 Config conf = new Config();
 //设置storm集群为这个topo启动的进程数
 conf.setNumWorkers(4);
 conf.setDebug(true);
 conf.setNumAckers(0);
 
 //提交topo到storm集群中
 StormSubmitter.submitTopology("demotopo", conf, topo);
 
}
}

将这4个java类打包成jar包,jar包名称为demotopo.jar。

1473771658505019.jpg

环境部署

1,安装zookeeper;

2,安装storm;

CentOS6.5,我们统一将zookeeper和storm安装到/opt/hadoop/下。

安装zookeeper

zookeeper版本:zookeeper-3.4.8.tar.gz

单机版部署zookeeper,只要解压就可以了,可以不做配置上的修改。

启动zookeeper

1>, 启动zookeeper

# ./zkServer.sh start

2>, 检查zookeeper是否成功

# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config:
/opt/hadoop/zookeeper/zookeeper/bin/../conf/zoo.cfg
Mode: standalone

 1473772366993930.jpg

安装storm

1>,版本:apache-storm-0.9.6.tar.gz

2>,安装配置

   解压到/opt/hadoop/storm/后,配置storm.yaml,

文件在/usr/local/storm/conf/storm.yaml,内容:
 storm.zookeeper.servers:
     - 127.0.0.1
 storm.zookeeper.port: 2181
 nimbus.host: "127.0.0.1"
 storm.local.dir: "/tmp/storm"
 supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

注意: 这里要特别注意前后空格问题,否则在启动时会不通过。


3>, 启动主节点:nimbus

# ./storm nimbus\\前台启动,窗口不能关闭
## bin/storm nimbus 1>/dev/null 2>&1 &\\后台启动
检查是否启动
# jps
10790 nimbus
11030 worker
10870 supervisor
8457 QuorumPeerMain
11366 Jps
11023 worker
11255 core
11018 worker
11019 worker

前台窗口启动主节点nimbus截图:

1473772984288521.jpg


4>,启动一个前端UI

# bin/storm ui\\前端启动,窗口不能关
## bin/storm ui 1>/dev/null 2>&1 &\\后台启动
# jps
11255 core

http://172.31.3.148:8080/index.html  

       1473773190267950.jpg


5>,启动从节点: supervisor

# ./storm supervisor\\前端启动
## ./storm supervisor\\后台启动
# jps
10790 nimbus
11030 worker
10870 supervisor

1473773344524546.jpg

用jps检查下所有服务是否都正常:

[root@localhost apache-storm-0.9.6]# jps
10790 nimbus
11030 worker
10870 supervisor
8457 QuorumPeerMain
11366 Jps
11023 worker
11255 core
11018 worker
11019 worker

——-

提交Topologies

1>,上传jar包

将demotopo.jar包上传到storm的安装目录/opt/hadoop/storm/下:

1473773781461054.jpg

2>,将jar包发送给storm去执行

命令格式:storm jar 【jar路径】 【拓扑包名.拓扑类名】【stormIP地址】【storm端口】【拓扑名称】【参数】

# ./storm jar ../../demotopo.jar cn.itcast.storm.TopoMain

# ./storm jar ../../demotopo.jar cn.itcast.storm.TopoMain
Running: /usr/local/java/bin/java -client -Dstorm.options= -Dstorm.home=/opt/hadoop/storm/apache-storm-0.9.6 -Dstorm.log.dir=/opt/hadoop/storm/apache-storm-0.9.6/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/hadoop/storm/apache-storm-0.9.6/lib/carbonite-1.4.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jetty-util-6.1.26.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/compojure-1.1.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jgrapht-core-0.9.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.logging-0.2.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jetty-6.1.26.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clj-time-0.4.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-codec-1.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/servlet-api-2.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/storm-core-0.9.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/hiccup-0.3.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clj-stacktrace-0.2.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-lang-2.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/chill-java-0.3.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/kryo-2.21.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/logback-classic-1.0.13.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/minlog-1.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/objenesis-1.2.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clout-1.0.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.cli-0.2.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-devel-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-jetty-adapter-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-servlet-0.3.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/snakeyaml-1.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/log4j-over-slf4j-1.6.6.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/core.incubator-0.1.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/ring-core-1.1.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/asm-4.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/math.numeric-tower-0.0.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/disruptor-2.10.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/joda-time-2.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-exec-1.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/jline-2.11.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/slf4j-api-1.7.5.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/tools.macro-0.1.0.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/clojure-1.5.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-fileupload-1.2.1.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/reflectasm-1.07-shaded.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/logback-core-1.0.13.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-io-2.4.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/commons-logging-1.1.3.jar:/opt/hadoop/storm/apache-storm-0.9.6/lib/json-simple-1.1.jar:../../demotopo.jar:/opt/hadoop/storm/apache-storm-0.9.6/conf:/opt/hadoop/storm/apache-storm-0.9.6/bin -Dstorm.jar=../../demotopo.jar cn.itcast.storm.TopoMain
654  [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
708  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar ../../demotopo.jar to assigned location: /opt/hadoop/storm/apache-storm-0.9.6/nimbus/inbox/stormjar-c392c9d8-ebaf-47e8-be98-6d98526e82a8.jar
752  [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /opt/hadoop/storm/apache-storm-0.9.6/nimbus/inbox/stormjar-c392c9d8-ebaf-47e8-be98-6d98526e82a8.jar
752  [main] INFO  backtype.storm.StormSubmitter - Submitting topology demotopo in distributed mode with conf {"topology.workers":4,"topology.acker.executors":0,"topology.debug":true}
973  [main] INFO  backtype.storm.StormSubmitter - Finished submitting topology: demotopo

3>,检查

检查Topology(名为demotopo)是否运行

      1473774107376858.jpg

检查/home/hadoop/下是否生成了4个文件

  1473774207108853.jpg

  这里的4个worker是在配置文件中配置的。

检查文件大小是否在不停变化

   1473774307357070.jpg

检查文件中单词是否随机,每个单词后是否以suffix结尾

  1473774437982428.jpg

注:附件是实验jar包,请将.rar改为.jar。

N21_第x周_Storm_01_单机实践篇demotopo.rar

原创文章,作者:365,如若转载,请注明出处:http://www.178linux.com/46231

(0)
365365
上一篇 2016-09-15
下一篇 2016-09-15

相关推荐

  • 任务计划管理

    一:单一工作调度:at命令       列出在指定的时间和日期在计算机上运行的已计划命令或计划命令和程序。必须正在运行“计划”服务才能使用 at 命令。 示例: [root@CentOS 6 ~]#/etc/init.d/atd restart   启动服务 …

    Linux干货 2016-09-12
  • iptables简单概念..

    iptables: 包过滤型的防火墙 Firewall:防火墙,隔离工具;工作于主机或网络边缘,对于进出本主机或本网络的报文根据事先定义的检查规则作匹配检测,对于能够被规则匹配到的报文作出相应处理的组件;    主机防火墙     网络防火墙   &…

    Linux干货 2017-06-19
  • 11.网络解析和网络加密

    1、详细描述一次加密通讯的过程,结合图示最佳。 加密过程 1.使用单向加密算法,提取A的文件的特征码。 2.使用A的私钥对提取出来的特征码进行加密,把加密后的特征码附加在A的文件的后面。 3.使用对称加密对刚刚的A的文件和加密后的特征码进行加密,生成对称加密密钥 4.使用B的公钥对第3步骤的对称加密的密钥进行加密,加密后附加在文件的后面。 解密过程 1.使用…

    2017-09-20
  • 架构师第一天之:Nginx

    nginx: 诞生背景: prefork机制不能支持过大的并发请求, C10K问题的解决 官方站点: http://nginx.org 二次开发版: tengine,openresty 特性: 模块化设计,较好的拓展性 高可靠性:master/worker架构 支持热部署:不停机更新配置文件,更换日至文件,更新服务器版本 低内存消耗:10000个keep-a…

    Linux干货 2016-10-29
  • 第六周作业-Vim总结

                    Vim总结 vim功能比较多,这里我不总结vim多窗口,只对基本常用的介绍. 一.模式说明 vim和记事本或WORD不一样,不是一打开后就可以输入文字,此时它处于正常模…

    Linux干货 2017-01-16
  • Hello word!

    初识linux

    2018-03-26