如何在eclipse调试storm程序

如何在eclipse调试storm程序,第1张

一、介绍

storm提供了两种运行模式:本地模式和分布式模式。本地模式对于开发、调试storm topologies非常有用。

Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies.

多数程序开发者都是使用windows系统进行程序开发,那么如何在本机不安装storm环境的情况下开发、调试storm程序呢?如果你正在为此问题而烦恼,请使用本文提供的方法。

二、实施步骤

如何基于eclipse+maven调试storm程序,步骤如下:

1.搭建好开发环境(eclipse+maven,本人使用的是eclipse Kepler 与maven3.1.1)

2.创建maven项目,并修改pom.xml,内容见下文的"pom.xml文件"一节(机器需要联网,以便下载所需的依赖jar)

Github上的pom.xml引入的依赖太多,有些并不需要,因此本文的pom.xml只保留了用到的依赖。

3. 编写storm程序,指定为本地模式运行。本文提供的程序是wordcount

重要的是LocalCluster cluster = new LocalCluster()这一句

Config conf = new Config()

conf.setDebug(true)

conf.setNumWorkers(2)

LocalCluster cluster = new LocalCluster()

cluster.submitTopology("test", conf, builder.createTopology())

Utils.sleep(10000)

cluster.killTopology("test")

cluster.shutdown()

pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>storm.starter</groupId>
  <artifactId>storm-starter</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <!-- Repositories are addressed over https: Maven 3.8.1 and later block
       plain-http repositories by default. -->
  <repositories>
    <repository>
      <id>github-releases</id>
      <url>https://oss.sonatype.org/content/repositories/github-releases/</url>
    </repository>
    <repository>
      <id>clojars.org</id>
      <url>https://clojars.org/repo</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>storm</groupId>
      <artifactId>storm</artifactId>
      <version>0.9.0.1</version>
      <!-- keep storm out of the jar-with-dependencies -->
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
      <version>3.2.1</version>
    </dependency>
  </dependencies>
</project>

storm程序

package storm.starter

import java.util.HashMap

import java.util.Map

import storm.starter.spout.RandomSentenceSpout

import backtype.storm.Config

import backtype.storm.LocalCluster

import backtype.storm.StormSubmitter

import backtype.storm.topology.BasicOutputCollector

import backtype.storm.topology.OutputFieldsDeclarer

import backtype.storm.topology.TopologyBuilder

import backtype.storm.topology.base.BaseBasicBolt

import backtype.storm.tuple.Fields

import backtype.storm.tuple.Tuple

import backtype.storm.tuple.Values

/**

* This topology demonstrates Storm's stream groupings and multilang

* capabilities.

*/

public class WordCountTopology {

public static class SplitSentence extends BaseBasicBolt {

@Override

public void execute(Tuple input, BasicOutputCollector collector) {

try {

String msg = input.getString(0)

System.out.println(msg + "-------------------")

if (msg != null) {

String[] s = msg.split(" ")

for (String string : s) {

collector.emit(new Values(string))

}

}

} catch (Exception e) {

e.printStackTrace()

}

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word"))

}

}

public static class WordCount extends BaseBasicBolt {

Map<String, Integer>counts = new HashMap<String, Integer>()

@Override

public void execute(Tuple tuple, BasicOutputCollector collector) {

String word = tuple.getString(0)

Integer count = counts.get(word)

if (count == null)

count = 0

count++

counts.put(word, count)

collector.emit(new Values(word, count))

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word", "count"))

}

}

public static void main(String[] args) throws Exception {

TopologyBuilder builder = new TopologyBuilder()

builder.setSpout("spout", new RandomSentenceSpout(), 5)

builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping(

"spout")

builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",

new Fields("word"))

Config conf = new Config()

conf.setDebug(true)

if (args != null &&args.length >0) {

conf.setNumWorkers(3)

StormSubmitter.submitTopology(args[0], conf,

builder.createTopology())

} else {

conf.setMaxTaskParallelism(3)

LocalCluster cluster = new LocalCluster()

cluster.submitTopology("word-count", conf, builder.createTopology())

Thread.sleep(10000)

cluster.shutdown()

}

}

}

package storm.starter.spout

import backtype.storm.spout.SpoutOutputCollector

import backtype.storm.task.TopologyContext

import backtype.storm.topology.OutputFieldsDeclarer

import backtype.storm.topology.base.BaseRichSpout

import backtype.storm.tuple.Fields

import backtype.storm.tuple.Values

import backtype.storm.utils.Utils

import java.util.Map

import java.util.Random

public class RandomSentenceSpout extends BaseRichSpout {

SpoutOutputCollector _collector

Random _rand

@Override

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

_collector = collector

_rand = new Random()

}

@Override

public void nextTuple() {

Utils.sleep(100)

String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",

"four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }

String sentence = sentences[_rand.nextInt(sentences.length)]

_collector.emit(new Values(sentence))

}

@Override

public void ack(Object id) {

}

@Override

public void fail(Object id) {

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

declarer.declare(new Fields("word"))

}

}

安装Storm软件分五步: 安装Zookeeper。 安装Storm的依赖环境:Java和Python。 下载并解压Storm安装包。 修改必要的Storm配置文件。 启动Storm程序。


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/bake/11518264.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-05-16
下一篇2023-05-16

发表评论

登录后才能评论

评论列表(0条)

    保存