简述

为拓扑中的每个 Bolt 的确定输入数据流是定义一个拓扑的重要环节。数据流分组定义了在 Bolt 的不同任务(tasks)中划分数据流的方式。

在 Storm 中有八种内置的数据流分组方式,而且还可以通过 CustomStreamGrouping接口实现自定义的数据流分组模型。(所以总共可以算是九种分组方式)

具体分组

这八种分组分时分别为:


  • Shuffle grouping:随机分组。这种方式下元组会被尽可能随机地分配到Bolt的不同任务(tasks)中,使得每个任务所处理元组数量能够保持基本一致,以确保集群的负载均衡。
  • Fields grouping:按字段分组。这种方式下数据流根据定义的Field来进行分组。比如,如果某个数据流是基于一个名为“user-id”的字段进行分组的,那么所有包含相同的“user-id”的tuple都会被分配到同一个任务中,这样就可以确保消息处理的一致性。
  • Partial Key grouping:部分关键字分组。这种方式与Fields grouping很相似,根据定义的域来对数据流进行分组,不同的是,这种方式会考虑下游Bolt数据处理的均衡性问题,在输入数据源关键字不平衡时会有更好的性能。
  • All grouping:完全分组。这种方式下数据流会被同时发送到Bolt的所有任务中(也就是说同一个元组会被复制多份然后被所有的任务处理),使用这种分组方式要特别小心。
  • Global grouping:全局分组。这种方式下所有的数据流都会被发送到 Bolt 的同一个任务中,也就是id最小的那个任务。
  • None grouping:无分组。使用这种方式说明你不关心数据流如何分组。目前这种方式的结果与随机分组完全等效,不过未来Storm社区可能会考虑通过非分组方式来让 Bolt 和它所订阅的 Spout 或 Bolt 在同一个线程中执行。
  • Direct grouping:直接分组。这是一种特殊的分组方式。使用这种方式意味着元组的发送者可以指定下游的哪个任务可以接收这个元组。只有在数据流被声明为直接数据流时才能够使用直接分组方式。使用直接数据流发送元组需要使用OutputCollector的其中一个emitDirect方法。Bolt可以通过TopologyContext来获取它的下游消费者的任务id,也可以通过跟踪OutputCollector的emit方法(该方法会返回它所发送元组的目标任务的id)的数据来获取任务 id。
  • Local or shuffle grouping:本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致。

其中Shuffle grouping、Fields grouping、All grouping、Global grouping四种策略用得较多。

实例

拿最简单的WordCount来做例子:

新建wordcount项目

新建RandomSentenceSpout类来产生数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.topo;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Random;

public class RandomSentenceSpout extends BaseRichSpout {

private static final long serialVersionUID = 6102239192526611945L;

private static final Logger LOGGER = LoggerFactory.getLogger(RandomSentenceSpout.class);

private SpoutOutputCollector collector;
private Random random;

/**
* 当一个Task被初始化的时候会调用此open方法,
* 一般都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
this.random = new Random();
}

/**
* 这个spout类,之前说过,最终会运行在task中,某个worker进程的某个executor线程内部的某个task中
* 那个task会负责去不断的无限循环调用nextTuple()方法
* 只要的话呢,无限循环调用,可以不断发射最新的数据出去,形成一个数据流
*/
public void nextTuple() {
String[] sentences = new String[]{
"I used to watch her from my kitchen widow"
, "she seemed so small as she muscled her way through the crowd of boys on the playground"
, "The school was across the street from our home and I would often watch the kids as they played during recess"
, "A sea of children, and yet tome"
, "she stood out from them all"};
String sentence = sentences[random.nextInt(sentences.length)];
LOGGER.info(" --- 发射 sentence 数据 ---> {}", sentence);
// 这个values,你可以认为就是构建一个tuple,tuple是最小的数据单位,无限个tuple组成的流就是一个stream,通过 emit 发送数据到下游bolt tuple
this.collector.emit(new Values(sentence));
}

/**
* 用于声明当前Spout的Tuple发送流的域名字。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的
* 通俗点说法:就是这个方法是定义一个你发射出去的每个tuple中的每个field的名称是什么,作为下游
bolt 中 execute 接收数据 key
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}

新建SplitSentenceBolt类用来切割单词

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.topo;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {

private static final long serialVersionUID = -4758047349803579486L;

private OutputCollector collector;

/**
* 当一个Task被初始化的时候会调用此prepare方法,对于bolt来说,第一个方法,就是prepare方法
* OutputCollector,这个也是Bolt的这个tuple的发射器,一般都会在此方法中对发送Tuple的对象OutputCollector初始化
*/
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

/**
* 这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的
* 就是说,每次接收到一条数据后,就会交给这个executor方法来执行
* 切分单词
*/
public void execute(Tuple input) {
// 接收上游数据
String sentence = input.getStringByField("sentence");
String[] words = sentence.split(" ");
for(String word : words){
//发射数据
this.collector.emit(new Values(word));
}
}

/**
* 用于声明当前bolt的Tuple发送流的域名字。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的
* 通俗点说法:就是这个方法是定义一个你发射出去的每个tuple中的每个field的名称是什么,作为下游 bolt 中 execute 接收数据 key
* 定义发射出去的tuple,每个field的名称
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

新建WordCountBolt类用来单词计数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.topo;

import org.apache.storm.shade.com.google.common.collect.Maps;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class WordCountBolt extends BaseRichBolt {

private static final Logger LOGGER = LoggerFactory.getLogger(WordCountBolt.class);

private static final long serialVersionUID = -7114915627898482737L;

private OutputCollector collector;

Map<String,Long> countMap = Maps.newConcurrentMap();

/**
* 当一个Task被初始化的时候会调用此prepare方法,对于bolt来说,第一个方法,就是prepare方法
* OutputCollector,这个也是Bolt的这个tuple的发射器,一般都会在此方法中对发送Tuple的对象OutputCollector初始化
*/
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

/**
* 这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的
* 就是说,每次接收到一条数据后,就会交给这个executor方法来执行
* 统计单词
*/
public void execute(Tuple input) {
// 接收上游数据
String word = input.getStringByField("word");
Long count = countMap.get(word);
if(null == count){
count = 0L;
}
count ++;
countMap.put(word, count);
LOGGER.info(" --- 单词计数[{}] ---> 出现的次数:{}", word, count);
//发射数据
this.collector.emit(new Values(word,count));
}

/**
* 用于声明当前bolt的Tuple发送流的域名字。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的
* 通俗点说法:就是这个方法是定义一个你发射出去的每个tuple中的每个field的名称是什么,作为下游 bolt 中 execute 接收数据 key
* 定义发射出去的tuple,每个field的名称
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
}

新建WordCountTopology类用来链接Spout和Bolt,执行主程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.topo;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class WordCountTopology {

public static void main(String[] args) {
//去将spout和bolts组合起来,构建成一个拓扑
TopologyBuilder builder = new TopologyBuilder();

// 第一个参数的意思,就是给这个spout设置一个名字
// 第二个参数的意思,就是创建一个spout的对象
// 第三个参数的意思,就是设置spout的executor有几个
builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
builder.setBolt("SplitSentence", new SplitSentenceBolt(), 5)
//为bolt 设置 几个task
.setNumTasks(10)
//设置流分组策略
.shuffleGrouping("RandomSentence");

// fieldsGrouping 这个很重要,就是说,相同的单词,从SplitSentenceSpout发射出来时,一定会进入到下游的指定的同一个task中
// 只有这样子,才能准确的统计出每个单词的数量
// 比如你有个单词,hello,下游task1接收到3个hello,task2接收到2个hello
// 通过fieldsGrouping 可以将 5个hello,全都进入一个task
builder.setBolt("wordCount", new WordCountBolt(), 10)
//为bolt 设置 几个task
.setNumTasks(20)
//设置流分组策略
.shuffleGrouping("SplitSentence");
//.globalGrouping("SplitSentence");
//.allGrouping("SplitSentence");
//.fieldsGrouping("SplitSentence", new Fields("word"));

// 运行配置项
Config config = new Config();

//说明是在命令行执行,打算提交到storm集群上去
if(args != null && args.length > 0){
/**
* 要想提高storm的并行度可以从三个方面来改造
* worker(进程)>executor(线程)>task(实例)
* 增加work进程,增加executor线程,增加task实例
* 对应 supervisor.slots.port 中配置个数
* 这里可以动态设置使用个数
* 最好一台机器上的一个topology只使用一个worker,主要原因时减少了worker之间的数据传输
*
* 注意:如果worker使用完的话再提交topology就不会执行,因为没有可用的worker,只能处于等待状态,把之前运行的topology停止一个之后这个就会继续执行了
*/
config.setNumWorkers(3);
try {
// 将Topolog提交集群
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
}else{
// 用本地模式运行1个拓扑时,用来限制生成的线程的数量
config.setMaxTaskParallelism(20);

// 将Topolog提交本地集群
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCountTopology", config, builder.createTopology());

// 为了测试模拟等待
Utils.sleep(60000);
// 执行完毕,关闭cluster
cluster.shutdown();
}
}

}

运行结果

shuffleGrouping运行结果

shuffleGrouping运行结果
shuffleGrouping运行结果

随机分组,不自觉间做到了负载均衡。

globalGrouping运行结果

globalGrouping运行结果
globalGrouping运行结果

只往一个里面发,发送到id最小的那个任务。

allGrouping运行结果

allGrouping运行结果
allGrouping运行结果

两个spot并行 所有都分发。

fieldsGrouping运行结果

fieldsGrouping运行结果
fieldsGrouping运行结果

相同的名称的fields分发到一个bolt里面。