您现在的位置是: 首页 > Storm简单编程之WordCount实现


Storm简单编程之WordCount实现

纯真年代 2018-11-13 10:06:47 0 406人围观

我的码云直达车

官网案例直达车

1)编写Spout,让它产生数据

/**
 * Spout acting as the data source: each call to {@link #nextTuple()} emits one
 * randomly chosen word together with the count {@code 1}.
 */
public static class RandomWordSpout extends BaseRichSpout {

    /** Candidate words. Kept as a constant so the array is not rebuilt on every emit. */
    private static final String[] WORDS =
            {"hadoop", "spark", "kafka", "flume", "storm", "hbase", "hive", "zookeeper"};

    SpoutOutputCollector collector;
    Random random;

    /**
     * Initializes the spout by storing the collector and creating the random source.
     *
     * @param conf      configuration parameters (unused here)
     * @param context   topology context (unused here)
     * @param collector emitter used to send tuples downstream
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    /**
     * Emits a single {@code (word, 1)} tuple after sleeping for one second.
     *
     * <p>This method does not loop by itself; per the Storm spout contract it is
     * invoked repeatedly by the framework, so it must return after each emit.
     */
    @Override
    public void nextTuple() {
        // Throttle to roughly one tuple per second.
        Utils.sleep(1000);
        String word = WORDS[random.nextInt(WORDS.length)];

        // Send the tuple downstream; no message id is supplied.
        this.collector.emit(new Values(word, 1));
    }

    /**
     * Declares the output fields, in the same order as the emitted values.
     *
     * @param declarer receives the field names {@code "word"} and {@code "count"}
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

2)Bolt处理数据,业务逻辑

/**
 * Bolt that receives words, keeps a running count per word in memory, and
 * prints the complete count table to standard output after every tuple.
 */
public static class WordCountBolt extends BaseRichBolt {

    OutputCollector collector;
    Map<String, Integer> countsMap;

    /**
     * Stores the collector and creates the empty count table.
     *
     * @param stormConf topology configuration (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used to acknowledge processed tuples
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        countsMap = new HashMap<>();
    }

    /**
     * Increments the count of the incoming word, prints all counts, and acks the tuple.
     *
     * @param tuple input tuple; must carry a String field named {@code "word"}
     */
    @Override
    public void execute(Tuple tuple) {
        // Read by declared field name; the same value is available positionally
        // via tuple.getString(0), since "word" is the first declared field.
        String word = tuple.getStringByField("word");

        // Update the running total for this word (first occurrence starts at 1).
        Integer previous = countsMap.get(word);
        countsMap.put(word, previous == null ? 1 : previous + 1);

        // Print the full count table.
        System.out.println("~~~~~~~~~~~~~~~~~~~~~~");
        for (Map.Entry<String, Integer> entry : countsMap.entrySet()) {
            System.out.println(entry);
        }

        // BaseRichBolt does not ack automatically; acknowledge so the tuple is
        // reported as processed if tuple tracking is enabled upstream.
        collector.ack(tuple);
    }

    /**
     * Declares no output fields: this bolt emits nothing downstream.
     *
     * @param declarer unused
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

3)构建topo,这里在本地测试

/**
 * Builds the word-count topology and runs it on an in-process local cluster.
 *
 * <p>See http://storm.apache.org/releases/1.2.2/Local-mode.html and
 * http://storm.apache.org/releases/1.2.2/Running-topologies-on-a-production-cluster.html
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {

    // Define the topology: the spout feeds the counting bolt.
    // The bolt keeps its counts in a per-task map, so tuples are routed with
    // fieldsGrouping on "word": every occurrence of a word reaches the same task,
    // and totals stay correct even if a parallelism hint is added later.
    // (shuffleGrouping would spread a word's occurrences across tasks.)
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("RandomWordSpout", new RandomWordSpout());
    builder.setBolt("WordCountBolt", new WordCountBolt())
            .fieldsGrouping("RandomWordSpout", new Fields("word"));

    StormTopology topology = builder.createTopology();

    // Local mode simulates a Storm cluster inside this process via LocalCluster.
    LocalCluster cluster = new LocalCluster();

    // submitTopology takes a topology name, a configuration, and the topology object.
    // NOTE(review): the name below does not mention word counting and may have been
    // carried over from another sample; confirm before renaming.
    cluster.submitTopology("LocalDoubleAndTripleBoltApp", new Config(), topology);

    // The cluster is never shut down in this method; call cluster.shutdown()
    // to stop the local cluster.
}

4)运行如下




全部评论