
关键你给的分太少!如下是上线达1年,稳定运行的配置。
---------------------------------------------------------、
属性文件
c3p0driverClass=commysqljdbcDriver
c3p0user=root
c3p0password=edwin
c3p0jdbcUrl=jdbc\:mysql\://1921681123\:3306/edwinuseUnicode\=true&characterEncoding\=utf-8
c3p0minPoolSize = 1
c3p0maxPoolSize = 50
c3p0initialPoolSize = 1
c3p0maxIdleTime = 25000
c3p0acquireIncrement = 1
c3p0acquireRetryAttempts = 30
c3p0acquireRetryDelay = 1000
c3p0testConnectionOnCheckin = true
c3p0automaticTestTable = t_c3p0
c3p0idleConnectionTestPeriod = 18000
c3p0checkoutTimeout=5000
---------------------------------------------------------
spring配置
<xml version="10" encoding="UTF-8">
<beans xmlns=">
不过要注意一些注意事项,对于多个partition和多个consumer
1 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
2 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
3 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
4 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
5 High-level接口中获取不到数据的时候是会block的
简单版,
简单的坑,如果测试流程是,先produce一些数据,然后再用consumer读的话,记得加上第一句设置
因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前produce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset
Properties props = new Properties();
propsput("autooffsetreset", "smallest"); //必须要加,如果要读旧数据
propsput("zookeeperconnect", "localhost:2181");
propsput("groupid", "pv");
propsput("zookeepersessiontimeoutms", "400");
propsput("zookeepersynctimems", "200");
propsput("autocommitintervalms", "1000");
ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector consumer = kafkaconsumerConsumercreateJavaConsumerConnector(conf);
String topic = "page_visits";
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMapput(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumercreateMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMapget(topic);
KafkaStream<byte[], byte[]> stream = streamsget(0);
ConsumerIterator<byte[], byte[]> it = streamiterator();
while (ithasNext()){
Systemoutprintln("message: " + new String(itnext()message()));
}
if (consumer != null) consumershutdown(); //其实执行不到,因为上面的hasNext会block
在用high-level的consumer时,两个给力的工具,
1 bin/kafka-run-classsh kafkatoolsConsumerOffsetChecker --group pv
可以看到当前group offset的状况,比如这里看pv的状况,3个partition
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
关键就是offset,logSize和Lag
这里以前读完了,所以offset=logSize,并且Lag=0
2 bin/kafka-run-classsh kafkatoolsUpdateOffsetsInZK earliest config/consumerproperties page_visits
3个参数,
[earliest | latest],表示将offset置到哪里
consumerproperties ,这里是配置文件的路径
topic,topic名,这里是page_visits
我们对上面的pv group执行完这个 *** 作后,再去check group offset状况,结果如下,
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none
可以看到offset已经被清0,Lag=logSize
底下给出原文中多线程consumer的完整代码
import kafkaconsumerConsumerConfig;
import kafkaconsumerKafkaStream;
import kafkajavaapiconsumerConsumerConnector;
import javautilHashMap;
import javautilList;
import javautilMap;
import javautilProperties;
import javautilconcurrentExecutorService;
import javautilconcurrentExecutors;
public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafkaconsumerConsumercreateJavaConsumerConnector( // 创建Connector,注意下面对conf的配置
createConsumerConfig(a_zookeeper, a_groupId));
thistopic = a_topic;
}
public void shutdown() {
if (consumer != null) consumershutdown();
if (executor != null) executorshutdown();
}
public void run(int a_numThreads) { // 创建并发的consumers
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMapput(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumercreateMessageStreams(topicCountMap); // 创建Streams
List<KafkaStream<byte[], byte[]>> streams = consumerMapget(topic); // 每个线程对应于一个KafkaStream
// now launch all the threads
//
executor = ExecutorsnewFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executorsubmit(new ConsumerTest(stream, threadNumber)); // 启动consumer thread
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
propsput("zookeeperconnect", a_zookeeper);
propsput("groupid", a_groupId);
propsput("zookeepersessiontimeoutms", "400");
propsput("zookeepersynctimems", "200");
propsput("autocommitintervalms", "1000");
return new ConsumerConfig(props);
}
public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = IntegerparseInt(args[3]);
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
examplerun(threads);
try {
Threadsleep(10000);
} catch (InterruptedException ie) {
}
exampleshutdown();
}
}
SimpleConsumer
另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口
参考,
什么时候用这个接口
Read a message multiple times
Consume only a subset of the partitions in a topic in a process
Manage transactions to make sure a message is processed once and only once
当然用这个接口是有代价的,即partition,broker,offset对你不再透明,需要自己去管理这些,并且还要handle broker leader的切换,很麻烦
所以不是一定要用,最好别用
You must keep track of the offsets in your application to know where you left off consuming
You must figure out which Broker is the lead Broker for a topic and partition
You must handle Broker leader changes
使用SimpleConsumer的步骤:
Find an active Broker and find out which Broker is the leader for your topic and partition
Determine who the replica Brokers are for your topic and partition
Build the request defining what data you are interested in
Fetch the data
Identify and recover from leader changes
首先,你必须知道读哪个topic的哪个partition
然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker
再者,自己去写request并fetch数据
最终,还要注意需要识别和处理broker leader的改变
更改tomcat的配置文件
<Connector port="80" max>有两种方法:
1 就需要在环境变量中加上TOMCAT_OPTS, CATALINA_OPTS两个属性,
如 SET CATALINA_OPTS= -Xms256m -Xmx512m;
ms是最小的,mx是最大,64m, 512m分别是指内存的容量
2 修改Catalinabat文件
在166行“rem Execute Java with the applicable properties ”以下每行
%_EXECJAVA% %JAVA_OPTS% %CATALINA_OPTS% %DEBUG_OPTS% -Djavaendorseddirs="%JAVA_ENDORSED_DIRS%" -classpath "%CLASSPATH%" -Dcatalinabase="%CATALINA_BASE%" -Dcatalinahome="%CATALINA_HOME%" -Djavaiotmpdir="%CATALINA_TMPDIR%" %MAINCLASS% %CMD_LINE_ARGS% %ACTION% 中的%CATALINA_OPTS% 替换成-Xms256m -Xmx512m
注意加大的时候,要注意TOMCAT最大内存,是物理内存的80%为上限
以上调整表示初始化内存为256MB,可以使用的最大内存为512MB
tomcat中的几点配置说明
如何加大tomcat连接数
在tomcat配置文件serverxml中的<Connector />配置中,和连接数相关的参数有:
minProcessors:最小空闲连接线程数,用于提高系统处理性能,默认值为10
maxProcessors:最大连接线程数,即:并发处理的最大请求数,默认值为75
acceptCount:允许的最大连接数,应大于等于maxProcessors,默认值为100
enableLookups:是否反查域名,取值为:true或false。为了提高处理能力,应设置为false
connectionTimeout:网络连接超时,单位:毫秒。设置为0表示永不超时,这样设置有隐患的。通常可设置为30000毫秒。
其中和最大连接数相关的参数为maxProcessors和acceptCount。如果要加大并发连接数,应同时加大这两个参数。
web server允许的最大连接数还受制于 *** 作系统的内核参数设置,通常Windows是2000个左右,Linux是1000个左右。Unix中如何设置这些参数,请参阅Unix常用监控和管理命令
tomcat4中的配置示例:
<Connector className="orgapachecoyotetomcat4CoyoteConnector"
port="8080" minProcessors="10" maxProcessors="1024"
enableLookups="false" redirectPort="8443"
acceptCount="1024" debug="0" connectionTimeout="30000" />
对于其他端口的侦听配置,以此类推。
可以 生产者和消费者可以使用不同的传输协议来传输消息,ActiveMQ提供了广泛的连接模式,包括>
web server允许的最大线程连接数还受制于 *** 作系统的内核参数设置,通常Windows是2000个左右,Linux是1000个左右。
1编辑tomcat安装目录下的conf目录下的serverxml文件
在tomcat配置文件serverxml中的<Connector />配置中,和连接数相关的参数有:
maxThreads="150" 表示最多同时处理150个连接,Tomcat使用线程来处理接收的每个请求。这个值表示Tomcat可创建的最大的线程数。默认值200。
minSpareThreads="25" 表示即使没有人使用也开这么多空线程等待
maxSpareThreads="75" 表示如果最多可以空75个线程,例如某时刻有80人访问,之后没有人访问了,则tomcat不会保留80个空线程,而是关闭5个空的。 (一旦创建的线程超过这个值,Tomcat就会关闭不再需要的socket线程。默认值50。
)
acceptCount="100" 当同时连接的人数达到maxThreads时,还可以接收排队的连接数量,超过这个连接的则直接返回拒绝连接。(指定当任何能够使用的处理请求的线程数都 被使用时,能够放到处理队列中的请求数,超过这个数的请求将不予处理。默认值10。 )
其中和最大连接数相关的参数为maxThreads和acceptCount。如果要加大并发连接数,应同时加大这两个参数。
web server允许的最大连接数还受制于 *** 作系统的内核参数设置,通常Windows是2000个左右,Linux是1000个左右。tomcat5中的配置示例:
<Connector port="8080"
maxThreads="150" minSpareThreads="25" maxSpareThreads="75"
acceptCount="100"/>
对于其他端口的侦听配置,以此类推。
线程池一般有三个重要参数:
1 最大线程数。在程序运行的任何时候,线程数总数都不会超过这个数。如果请求数量超过最大数时,则会等待其他线程结束后再处理。
2 最大共享线程数,即最大空闲线程数。如果当前的空闲线程数超过该值,则多余的线程会被杀掉。
3 最小共享线程数,即最小空闲线程数。如果当前的空闲数小于该值,则一次性创建这个数量的空闲线程,所以它本身也是一个创建线程的步长。
线程池有两个概念:
1 Worker线程。工作线程主要是运行执行代码,有两种状态:空闲状态和运行状态。在空闲状态时,类似“休眠”,等待任务;处理运行状态时,表示正在运行任务(Runnable)。
2 辅助线程。主要负责监控线程池的状态:空闲线程是否超过最大空闲线程数或者小于最小空闲线程数等。如果不满足要求,就调整之。
来 看一下线程池究竟是怎么一回事?其实线程池的原理很简单,类似于 *** 作系统中的缓冲区的概念,它的流程如下:先启动若干数量的线程,并让这些线程都处于睡眠 状态,当客户端有一个新请求时,就会唤醒线程池中的某一个睡眠线程,让它来处理客户端的这个请求,当处理完这个请求后,线程又处于睡眠状态。可能你也许会 问:为什么要搞得这么麻烦,如果每当客户端有新的请求时,我就创建一个新的线程不就完了?这也许是个不错的方法,因为它能使得你编写代码相对容易一些,但 你却忽略了一个重要的问题性能!就拿我所在的单位来说,我的单位是一个省级数据大集中的银行网络中心,高峰期每秒的客户端请求并发数超过100,如果 为每个客户端请求创建一个新线程的话,那耗费的CPU时间和内存将是惊人的,如果采用一个拥有200个线程的线程池,那将会节约大量的的系统资源,使得更 多的CPU时间和内存用来处理实际的商业应用,而不是频繁的线程创建与销毁。
介绍了tomcat、jetty和resin三种Java Web容器的线程池后,按照惯例应该比较它们的优缺点。不过先总结线程池的特点。
线程池作为提高程序处理数据能力的一种方案,应用非常广泛。大量的服务器都或多或少的使用到了线程池技术,不管是用Java还是C++实现,线程池都有如下的特点:
线程池一般有三个重要参数:
1 最大线程数。在程序运行的任何时候,线程数总数都不会超过这个数。如果请求数量超过最大数时,则会等待其他线程结束后再处理。
2 最大共享线程数,即最大空闲线程数。如果当前的空闲线程数超过该值,则多余的线程会被杀掉。
3 最小共享线程数,即最小空闲线程数。如果当前的空闲数小于该值,则一次性创建这个数量的空闲线程,所以它本身也是一个创建线程的步长。
线程池有两个概念:
1 Worker线程。工作线程主要是运行执行代码,有两种状态:空闲状态和运行状态。在空闲状态时,类似“休眠”,等待任务;处理运行状态时,表示正在运行任务(Runnable)。
2 辅助线程。主要负责监控线程池的状态:空闲线程是否超过最大空闲线程数或者小于最小空闲线程数等。如果不满足要求,就调整之。
1、修改启动时内存参数、并指定JVM时区 (在windows server 2008 下时间少了8个小时):
在Tomcat上运行j2ee项目代码时,经常会出现内存溢出的情况,解决办法是在系统参数中增加系统参数:
window下, 在catalinabat最前面:
set JAVA_OPTS=-XX:PermSize=64M -XX:MaxPermSize=128m -Xms512m -Xmx1024m;-Dusertimezone=GMT+08;
一定加在catalinabat最前面。
linux下,在catalinash最前面增加:
JAVA_OPTS="-XX:PermSize=64M -XX:MaxPermSize=128m -Xms512m -Xmx1024m -Dusertimezone=Asia/Shanghai"
注意:前后二者区别,有无set,有无双引号。
2、线程池配置(Tomcat6下)
使用线程池,用较少的线程处理较多的访问,可以提高tomcat处理请求的能力。使用方式:
首先。打开/conf/serverxml,增加
<Executor name="tomcatThreadPool" namePrefix="catalina-exec-"
maxThreads="500" minSpareThreads="20" maxIdleTime="60000" />
最大线程500(一般服务器足以),最小空闲线程数20,线程最大空闲时间60秒。
然后,修改<Connector >节点,增加executor属性,如:
<Connector executor="tomcatThreadPool"
port="80"
protocol=">
这个不是用来限制登录数,而是限制并发socket线程数(大致可以理解为同时出来多少个 >
以上就是关于用ssh框架做了个小项目,用c3p0做连接池,用的是Mysql 5.1.x ,现问题如下:全部的内容,包括:用ssh框架做了个小项目,用c3p0做连接池,用的是Mysql 5.1.x ,现问题如下:、如何修改kettle slaveserver的最大连接数、kafka consumer重新连接后如何获取当前最新数据等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)