
一、介绍
storm提供了两种运行模式:本地模式和分布式模式。本地模式针对开发调试storm topologies非常有用。
Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies
因为多数程序开发者都是使用windows系统进行程序开发,如果在本机不安装storm环境的情况下,开发、调试storm程序。如果你正在为此问题而烦恼,请使用本文提供的方法。
二、实施步骤
如何基于eclipse+maven调试storm程序,步骤如下:
1.搭建好开发环境(eclipse+maven,本人使用的是eclipse Kepler 与maven3.1.1)
2.创建maven项目,并修改pom.xml,内容如pom.xml(机器联网,下载所需的依赖jar)
Github上的pom.xml,引入的依赖太多,有些不需要,
3. 编写storm程序,指定为本地模式运行。本文提供的程序是wordcount
重要的是LocalCluster cluster = new LocalCluster()这一句
Config conf = new Config()
conf.setDebug(true)
conf.setNumWorkers(2)
LocalCluster cluster = new LocalCluster()
cluster.submitTopology("test", conf, builder.createTopology())
Utils.sleep(10000)
cluster.killTopology("test")
cluster.shutdown()
pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>storm.starter</groupId>
<artifactId>storm-starter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<repositories>
<repository>
<id>github-releases</id>
<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
</repository>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>storm</groupId>
<artifactId>storm</artifactId>
<version>0.9.0.1</version>
<!-- keep storm out of the jar-with-dependencies -->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
</project> storm程序
package storm.starter
import java.util.HashMap
import java.util.Map
import storm.starter.spout.RandomSentenceSpout
import backtype.storm.Config
import backtype.storm.LocalCluster
import backtype.storm.StormSubmitter
import backtype.storm.topology.BasicOutputCollector
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.topology.TopologyBuilder
import backtype.storm.topology.base.BaseBasicBolt
import backtype.storm.tuple.Fields
import backtype.storm.tuple.Tuple
import backtype.storm.tuple.Values
/**
* This topology demonstrates Storm's stream groupings and multilang
* capabilities.
*/
public class WordCountTopology {
public static class SplitSentence extends BaseBasicBolt {
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
try {
String msg = input.getString(0)
System.out.println(msg + "-------------------")
if (msg != null) {
String[] s = msg.split(" ")
for (String string : s) {
collector.emit(new Values(string))
}
}
} catch (Exception e) {
e.printStackTrace()
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"))
}
}
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>()
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0)
Integer count = counts.get(word)
if (count == null)
count = 0
count++
counts.put(word, count)
collector.emit(new Values(word, count))
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"))
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder()
builder.setSpout("spout", new RandomSentenceSpout(), 5)
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping(
"spout")
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",
new Fields("word"))
Config conf = new Config()
conf.setDebug(true)
if (args != null && args.length > 0) {
conf.setNumWorkers(3)
StormSubmitter.submitTopology(args[0], conf,
builder.createTopology())
} else {
conf.setMaxTaskParallelism(3)
LocalCluster cluster = new LocalCluster()
cluster.submitTopology("word-count", conf, builder.createTopology())
Thread.sleep(10000)
cluster.shutdown()
}
}
}
package storm.starter.spout
import backtype.storm.spout.SpoutOutputCollector
import backtype.storm.task.TopologyContext
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.topology.base.BaseRichSpout
import backtype.storm.tuple.Fields
import backtype.storm.tuple.Values
import backtype.storm.utils.Utils
import java.util.Map
import java.util.Random
public class RandomSentenceSpout extends BaseRichSpout {
SpoutOutputCollector _collector
Random _rand
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector
_rand = new Random()
}
@Override
public void nextTuple() {
Utils.sleep(100)
String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
"four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }
String sentence = sentences[_rand.nextInt(sentences.length)]
_collector.emit(new Values(sentence))
}
@Override
public void ack(Object id) {
}
@Override
public void fail(Object id) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"))
}
}
最近公司有个需求,需要在后端应用服务器上实时获取STORM集群的运行信息和topology相关的提交和控制,经过几天对STORM UI和CMD源码的分析,得出可以通过其thrift接口调用实现这些功能。先下载一个thrift库进行编码和安装。关于thrift可以参见这个地方。安装完成后,从STORM源码中将storm.thrift拷贝到thrift目录下。输入:hrift -gen cpp storm.thrift
会得到一个gen-cpp目录,里面就是thrift先关脚本的C++实现。我们先看storm.thrift文件接口:
view sourceprint?
01.service Nimbus
02.{
03.//TOPOLOGY上传接口
04.void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf,4: StormTopology topology)
05.void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options)
06.void killTopology(1: string name)
07.void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e)
08.void activate(1: string name)
09.void deactivate(1: string name)
10.void rebalance(1: string name, 2: RebalanceOptions options)
11.
12.//TOPOLOGY JAR包上传接口
13.string beginFileUpload()
14.void uploadChunk(1: string location, 2: binary chunk)
15.void finishFileUpload(1: string location)
16.string beginFileDownload(1: string file)
17.binary downloadChunk(1: string id)
18.
19.//获取NIMBUS的配置信息
20.string getNimbusConf()
21.//获取STORM集群运行信息
22.ClusterSummary getClusterInfo()
23.//获取TOPOLOGY的运行状态信息
24.TopologyInfo getTopologyInfo(1: string id)
25.//获取TOPOLOGY对象信息
26.string getTopologyConf(1: string id)
27.StormTopology getTopology(1: string id)
28.StormTopology getUserTopology(1: string id)
29.}
生成C++文件后,我们就可以对其接口进行调用,由于thrift c++框架是使用boost库实现的,必须安装boost库依赖。实现的代码如下:
view sourceprint?
01.#define HAVE_NETDB_H //使用网络模块的宏必须打开
02.#include "Nimbus.h"
03.#include "storm_types.h"
04.
05.#include <string>
06.#include <iostream>
07.#include <set>
08.
09.#include <transport/TSocket.h>
10.#include <transport/TBufferTransports.h>
11.#include <protocol/TBinaryProtocol.h>
12.int test_storm_thrift()
13.{
14.boost::shared_ptr<TSocket>tsocket(new TSocket("storm-nimbus-server", 6627))
15.boost::shared_ptr<TTransport>ttransport(new TFramedTransport(tsocket, 1024 * 512))//此处必须使用TFramedTransport
16.boost::shared_ptr<TProtocol>tprotocol(new TBinaryProtocol(ttransport))
17.try{
18.//创建一个nimbus客户端对象
19.NimbusClient client(tprotocol)
20.//打开通道
21.ttransport->open()
22.
23.ClusterSummary summ
24.std::string conf
25.//对STORM的RPC调用,直接获取信息,同步进行的。
26.client.getNimbusConf(conf)
27.client.getClusterInfo(summ)
28.//关闭通道
29.ttransport->close()
30.}catch(TException &tx){
31.printf("InvalidOperation: %s
32.", tx.what())
33.}
34.}
以上代码就可以直接获取nimbus的配置和集群信息,其他接口以此类推。值得注意的是storm.thrift to C++生成的storm_types.h文件里其中operator <函数都未实现,所以必须手动进行添加实现,否则编译会有问题。
不仅仅C++可以实现STORM的控制,PHP和其他的语言也可以实现,只要thrift支持就OK。有兴趣可以实现一下试试看。
转载
流式计算中,各个中间件产品对计算过程中的角色的抽象都不尽相同,实现方式也是千差万别。本文针对storm中间件在进行流式计算中的几个概念做个概括总结。storm分布式计算结构称为topology(拓扑)由stream,spout,bolt组成。
spout代表一个storm拓扑中的数据入口,连接到数据源,将数据转化为一个个tuple,并发射tuple
stream是由无限制个tuple组成的序列。tuple为storm的核心数据结构,是包含了一个或多个键值对的列表。
bolt可以理解为计算程序中的运算或者函数,bolt的上游是输入流,经过bolt实施运算后,可输出一个或者多个输出流。
bolt可以订阅多个由spout或者其他bolt发射的数据流,用以构建复杂的数据流转换网络。
上述即为storm最基本的组成元素,无论storm如何运行,都是以stream,spout,bolt做为最基本的运行单元。而这三者则是共同构成了一个storm拓扑topology。
首先需要明确一个概念,bolt,spout实例,都属于任务,spout产生数据流,并发射,bolt消费数据流,进行计算,并进行落地或再发射,他们的存在以及运行过程都需要消耗资源,而storm集群是一个提供了资源的集群,我们要做的就是将spout/boult实例合理分配到storm集群提供的计算资源上,这样就可以让spout/bolt得以执行。
worker为JVM进程,一个topology会分配到一个或者多个worker上运行。
executor是worker内的java线程,是具体执行bolt/spout实例用的。下篇文章在介绍如何提供storm并行计算能力时会介绍worker以及executor的配置。
在storm中,worker是由supervisor进程创建,并进行监控的。storm集群遵循主从模式,主为nimbus,从为supervisor,storm集群由一个主节点(确实有单点问题),和多个工作节点(supervisor)组成,并使用zookeeper来协调集群中的状态信息,比如任务分配情况,worker状态,supervisor的拓扑度量。
通过配置可指定supervisor上可运行多少worker。一个worker代表一个slot。
nimbus守护进程的主要职责是管理,协调和监控在集群上运行的topology.包括topology的发布,任务指派,事件处理失败时重新指派任务。
supervisor守护进程等待nimbus分配任务后生成并监控workers执行任务。supervosior和worker都是运行在不同的JVM进程上。
了解了集群模式下,storm大致的分布概念,下面结合笔者做的一个实例,了解一下如何发布计算资源到storm集群上。
笔者定义了一个spout,两个bolt 运算过程如下:
其中streamMaking是一个不断生成随机数(5~30)的spout实例,Step1Bolt会过滤掉15以下的随机数(过滤),15以上的随机数会乘以16(计算),再将结果向后发射。Step2Bolt订阅Step1Bolt发射的数据,接收数据后,打印输出。流程结束。
笔者在定义spout/bolt实例时,配置了spout,bolt的并行执行数。其中
streamMaking:4 Step1Bolt:2 Step2Bolt 1
这样,发布成功后,storm会根据我的配置,分配足够的计算资源给予spout/bolt进行执行。
发布:
发布时,spout和bolt都是在一起以jar的形式发布到nimbus上的,分配后,内部定义的spout和bolt将以组件的形式被nimbus分配至worker进程中执行。
其中worker都是由supervisor创建的,创建出来的worker进程与supervisor是分开的不同进程。一个supervisor可创建多少worker可通过修改storm安装目录下的storm.yaml进行配置。
task是执行的最小单元。spout/bolt实例在定义中指定了,要起多少task,以及多少executor。也即一个topology发布之前已经定义了task总量,和需要多少资源来执行我的task总量。nimbus将根据已有的计算资源进行分配。
下图中: nimbus左边代表着计算任务量,和所需计算配置
nimbus右边代表着计算资源
nimbus将根据计算资源信息,合理的分发计算任务量。
发布成功后,通过storm自带的UI功能,可以查看你发布的topology运行以及其中每个组件的分布执行情况。
监控图像中清晰的显示了,目前部署的topology,以及topology中每个组件所分配的计算资源所在host,以及每个组件发射了多少tuple,接收了多少tuple,以及有多少个executor在并行执行。
本文讲述了storm内的基本元素以及基本概念,后续将讲述storm的重点配置信息,以及如何提高并发计算能力,窗口概念等高级特性,后续会进行源码分析,以及与其他实时计算中间件的比较。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)