
Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process, simulating worker nodes with threads, which makes it very useful for testing, developing, and debugging Storm topologies.
Since most developers write their code on Windows, being able to develop and debug Storm programs without installing a Storm environment on the local machine is very convenient. If this is the problem you are facing, use the approach described in this article.
II. Implementation Steps
The steps for debugging a Storm program with Eclipse + Maven are as follows:
1. Set up the development environment (Eclipse + Maven; I use Eclipse Kepler and Maven 3.1.1).
2. Create a Maven project and edit its pom.xml as shown in the pom.xml listed below (the machine must be online so the required dependency JARs can be downloaded). The pom.xml on GitHub pulls in too many dependencies, some of which are not needed, so a trimmed-down version is used here.
3. Write the Storm program and specify that it run in local mode. The program used in this article is a word count.
The important line is LocalCluster cluster = new LocalCluster(). The relevant snippet is:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
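If you are stepping through the bolts in the Eclipse debugger, a fixed Utils.sleep(10000) may shut the topology down before you are done. The variation below is my own sketch, not from the original article; it assumes the same conf and builder objects and simply keeps the local cluster running until Enter is pressed in the console:
// Sketch: block on console input instead of a fixed sleep, so the topology
// keeps running while you step through the bolts in the debugger.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
System.out.println("Topology running; press Enter to stop ...");
System.in.read();                 // requires the enclosing main() to declare throws Exception
cluster.killTopology("test");
cluster.shutdown();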
The pom.xml file:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>storm.starter</groupId>
  <artifactId>storm-starter</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <repositories>
    <repository>
      <id>github-releases</id>
      <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
    </repository>
    <repository>
      <id>clojars.org</id>
      <url>http://clojars.org/repo</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>storm</groupId>
      <artifactId>storm</artifactId>
      <version>0.9.0.1</version>
      <!-- keep storm out of the jar-with-dependencies -->
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
      <version>3.2.1</version>
    </dependency>
  </dependencies>
</project>
The Storm program:
package storm.starter;

import java.util.HashMap;
import java.util.Map;

import storm.starter.spout.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * This topology demonstrates Storm's stream groupings and multilang
 * capabilities.
 */
public class WordCountTopology {

    /** Splits each incoming sentence into individual words. */
    public static class SplitSentence extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                String msg = input.getString(0);
                System.out.println(msg + "-------------------");
                if (msg != null) {
                    String[] s = msg.split(" ");
                    for (String string : s) {
                        collector.emit(new Values(string));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    /** Keeps a running count per word and emits (word, count) pairs. */
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 5);
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // A topology name on the command line means: submit to a real cluster.
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Otherwise run in local mode inside the current JVM.
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}
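With conf.setDebug(true) every emitted tuple is logged, but the output can be noisy. A small sink bolt makes the running counts easier to read in the Eclipse console. The class below is a hypothetical addition of mine, not part of the original storm-starter code; it would sit inside WordCountTopology next to the other bolts:
    // Hypothetical helper bolt (not in the original program): prints each
    // (word, count) tuple produced by the "count" bolt.
    public static class PrintCount extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println(tuple.getString(0) + " -> " + tuple.getInteger(1));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // sink bolt: emits nothing downstream
        }
    }
It would be wired into the topology in main() with an extra line such as builder.setBolt("print", new PrintCount(), 1).shuffleGrouping("count");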
package storm.starter.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

/** Emits a randomly chosen sentence roughly every 100 ms. */
public class RandomSentenceSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    Random _rand;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
                "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        String sentence = sentences[_rand.nextInt(sentences.length)];
        _collector.emit(new Values(sentence));
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
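Because the pom.xml above already declares JUnit, the local-mode run can also be wrapped in a unit test and launched from Eclipse with Run As > JUnit Test. The test class below is a sketch of mine (the class name, parallelism values, and timings are my own choices, not from the original article); it simply reuses the LocalCluster calls shown earlier:
package storm.starter;

import org.junit.Test;

import storm.starter.spout.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

// Sketch of a JUnit test that runs the word-count topology in local mode
// for about ten seconds, then shuts the in-process cluster down.
public class WordCountTopologyLocalTest {

    @Test
    public void runsInLocalMode() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);
        builder.setBolt("split", new WordCountTopology.SplitSentence(), 2).shuffleGrouping("spout");
        builder.setBolt("count", new WordCountTopology.WordCount(), 2).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("word-count-test", conf, builder.createTopology());
            Utils.sleep(10000);                      // let it process tuples for ~10 s
            cluster.killTopology("word-count-test");
        } finally {
            cluster.shutdown();
        }
    }
}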
For reference, installing Storm itself takes five steps: install ZooKeeper; install Storm's dependencies, Java and Python; download and unpack the Storm release; edit the necessary Storm configuration files; and start the Storm daemons.