Stream  –  data stream
    The channels formed as tuples flow continuously from a Spout to a Bolt, and from one Bolt to the next, are called Streams.
    A Stream is assigned an id when it is declared (the default id is "default").
    In practice most topologies use a single stream, in which case there is no need to specify a stream id explicitly.
Tuple  –  tuple
    The smallest unit of data within a Stream.
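    To make this concrete, here is a minimal standalone sketch (the class name TupleSketch is illustrative, not from the original notes): a Values object carries a tuple's payload, and a Fields object names each position, roughly like column names.

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.List;

public class TupleSketch {
    public static void main(String[] args) {
        // Values is the payload of a tuple: literally a List<Object>
        List<Object> payload = new Values("hadoop", 1);
        // Fields names each position of the tuple
        Fields schema = new Fields("word", "num");
        // Storm pairs the two, so getStringByField("word") on a received
        // tuple is equivalent to this positional lookup:
        System.out.println(payload.get(schema.fieldIndex("word"))); // prints hadoop
    }
}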

Topology  –  the implementation of a DAG (directed acyclic graph)
    Encapsulates Storm's real-time computation logic.
            A topology is the structure formed by a set of Spouts and Bolts connected to each other by data streams.
    Lifecycle
            Once submitted, a topology keeps running in the cluster until it is killed manually; it never terminates on its own.
            (This differs from a Job in MapReduce, which terminates as soon as its computation completes.)
Spout  –  data source
    The source of the data streams in a topology.
            A Spout usually reads tuples from an external data source and emits them into the topology.
    One Spout can emit multiple streams.
            Declare the separate streams first via the declareStream method of OutputFieldsDeclarer, then pass the stream id to the emit method of SpoutOutputCollector when sending data; a sketch follows below.
    The core method of a Spout is nextTuple.
            Storm calls this method in a continuous loop; it actively pulls data from the source and emits it, as tuples, to the downstream Bolts via emit.
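    A minimal sketch of such a multi-stream spout (the class name, stream ids, and field names here are illustrative, not from the original examples):

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;

public class MultiStreamSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declareStream (rather than declare) assigns each stream its own id
        declarer.declareStream("orders", new Fields("orderId"));
        declarer.declareStream("clicks", new Fields("url"));
    }

    @Override
    public void nextTuple() {
        // the first argument of emit selects the target stream by its id
        collector.emit("orders", new Values(42L));
        collector.emit("clicks", new Values("/index.html"));
    }
}

    A downstream bolt then subscribes to one stream explicitly, e.g. shuffleGrouping("multiSpout", "orders") instead of shuffleGrouping("multiSpout").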
Bolt  –  stream processing component
    All data processing in a topology is done by Bolts.
            A simple task or stream transformation can be handled by a single Bolt; more complex scenarios usually need several Bolts working in multiple steps.
    One Bolt can likewise emit multiple streams.
            Declare the separate streams first via the declareStream method of OutputFieldsDeclarer, then pass the stream id to the emit method of OutputCollector (not SpoutOutputCollector, which belongs to spouts) when sending data; see the sketch below.
    The core method of a Bolt is execute.
            It receives one tuple at a time and implements the actual business logic.
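    A sketch of a bolt that routes tuples to one of two named streams (the class name, stream ids, field names, and threshold are illustrative):

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class RouterBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // one schema per named stream
        declarer.declareStream("big",   new Fields("num"));
        declarer.declareStream("small", new Fields("num"));
    }

    @Override
    public void execute(Tuple input) {
        int num = input.getInteger(0);
        // the first argument of emit selects the target stream
        collector.emit(num >= 100 ? "big" : "small", new Values(num));
    }
}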

Stream Grouping  –  stream grouping
    The data distribution strategy.
            Each bolt can run as multiple task instances, so you must specify which task of the next bolt the data from a spout or bolt is delivered to; the grouping defines that delivery rule (a wiring sketch follows this list).
    1. Shuffle Grouping
            Random grouping: tuples in the stream are distributed randomly so that each bolt task receives roughly the same number of tuples.
            Effectively a round-robin, even distribution.
    2. Fields Grouping
            Grouping by field. For example, grouping by the "user-id" field sends tuples with the same "user-id" to the same task of the bolt, while different "user-id" values may go to different tasks.
    3. All Grouping
            Broadcast: every bolt task receives a copy of every tuple.
    4. Global Grouping
            Global grouping: all tuples are routed to the single task with the lowest task id.
    5. None Grouping
            No grouping: the stream does not care how its tuples are distributed. Currently this behaves the same as Shuffle Grouping, with one planned difference: Storm may eventually execute a none-grouped bolt in the same thread as the bolt or spout it subscribes to.
    6. Direct Grouping
            Direct grouping: a special grouping in which the sender of a tuple decides which task of the receiver processes it. It can only be used on streams declared as Direct Streams, and such tuples must be emitted with the emitDirect method. The consumer can obtain the task ids of the tasks processing its messages via TopologyContext (the OutputCollector.emit method also returns the task ids a tuple was sent to).
    7. Local or Shuffle Grouping
            Local-or-shuffle grouping: if the target bolt has one or more tasks in the same worker process as the source task, tuples are shuffled only among those in-process tasks; otherwise it behaves like an ordinary Shuffle Grouping.
    8. Custom Grouping
            A user-defined grouping, analogous to implementing your own Partitioner in MapReduce.
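    A sketch of how several of these groupings are wired in a TopologyBuilder, reusing the WordCount classes from example 2 below (the parallelism hints and the extra "audit" and "total" bolts are illustrative):

package wc;  // placed alongside the WordCount classes from example 2

import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class GroupingSketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new WcSpout());

        // Shuffle Grouping: lines spread evenly over 4 splitter tasks
        builder.setBolt("split", new WcBolt_One(), 4).shuffleGrouping("spout");

        // Fields Grouping: the same "word" always reaches the same counter task
        builder.setBolt("count", new WcBolt_Two(), 4)
               .fieldsGrouping("split", new Fields("word"));

        // All Grouping: every task of "audit" receives every (word, 1) tuple
        builder.setBolt("audit", new WcBolt_Two(), 2).allGrouping("split");

        // Global Grouping: all counts funnel into the single lowest task id
        builder.setBolt("total", new WcBolt_Two()).globalGrouping("count");
    }
}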



Examples
1. Accumulating a sum
    Topology
package accu;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;

/**
 * Created by Jeffrey.Deng on 2017/9/6.
 */
public class TopologyDemo {

    public static void main(String[] args) throws InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new SpoutDemo());
        BoltDeclarer bolt = builder.setBolt("bolt", new BoltDemo());
        // specify which spout this bolt consumes from, and how spout and bolt are connected
        bolt.shuffleGrouping("spout");

        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("accu num", config, builder.createTopology());

        Thread.sleep(10000);
        cluster.shutdown();
    }

}

    spout

package accu;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.List;
import java.util.Map;

/**
 * Created by Jeffrey.Deng on 2017/9/6.
 */
public class SpoutDemo extends BaseRichSpout{

    private Map conf;
    private TopologyContext context;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        List<Object> tuple = new Values(1);
        collector.emit(tuple);

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // name each position of the emitted tuple
        // roughly analogous to column names
        declarer.declare(new Fields("num"));
        // declarer.declare(new Fields("word", "num"));
    }
}

    bolt

package accu;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.List;
import java.util.Map;

/**
 * Created by Jeffrey.Deng on 2017/9/6.
 */
public class BoltDemo extends BaseRichBolt{

    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.conf = stormConf;
        this.context = context;
        this.collector = collector;
    }

    int sum = 0;

    @Override
    public void execute(Tuple input) {
        int i = input.getInteger(0);
        sum += i;
        List<Object> tuple = new Values(sum);
        System.out.println("num:" + i + "; sum:" + sum);
        collector.emit(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sum"));
    }
}

2. WordCount

    Topology

package wc;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * Created by Jeffrey.Deng on 2017/9/6.
 */
public class WcTopology {

    public static void main(String[] args) throws InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new WcSpout());
        builder.setBolt("bolt1", new WcBolt_One()).shuffleGrouping("spout");
        builder.setBolt("bolt2", new WcBolt_Two()).fieldsGrouping("bolt1", new Fields("word"));

        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wordcount", config, builder.createTopology());

        Thread.sleep(10000);
        cluster.shutdown();

    }

}

    spout  

package wc;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Created by Jeffrey.Deng on 2017/9/6.
 */
public class WcSpout extends BaseRichSpout{

    private Map conf;
    private TopologyContext context;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.conf = conf;
        this.context = context;
        this.collector = collector;
    }

    private List<String> list = new ArrayList<String>();

    @Override
    public void nextTuple() {
        // populate the sample lines only once; adding them on every call
        // would grow the list unboundedly and re-emit old lines
        if (list.isEmpty()) {
            list.add("hadoop hive hive zookeeper hbase hive kafka hadoop devin jeffery");
            list.add("hadoop hive hive zookeeper hadoop devin jeffery");
            list.add("hive kafka hadoop devin jeffery");
        }

        for (String s : list) {
            collector.emit(new Values(s));
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

    bolt1

package wc;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

/**
 * Created by Jeffrey.Deng on 2017/9/6.
 */
public class WcBolt_One extends BaseRichBolt{

    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.conf = stormConf;
        this.context = context;
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String[] arr = input.getString(0).split(" ");
        for (String s : arr) {
            collector.emit(new Values(s, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "num"));
    }
}

    bolt2

package wc;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by Jeffrey.Deng on 2017/9/6.
 */
public class WcBolt_Two extends BaseRichBolt {

    private Map conf;
    private TopologyContext context;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.conf = stormConf;
        this.context = context;
        this.collector = collector;
    }

    private Map<String, Integer> map = new HashMap<>();

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        int num = input.getInteger(1);
        // start from the incoming count rather than a hard-coded 1
        int count = num;
        if (map.containsKey(word)) {
            count = map.get(word) + num;
        }
        map.put(word, count);
        System.out.println("word:" + word + "; count:" + count);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

