![rabbmitmq连接池[已过生产],第1张 rabbmitmq连接池[已过生产],第1张](/aiimages/rabbmitmq%E8%BF%9E%E6%8E%A5%E6%B1%A0%5B%E5%B7%B2%E8%BF%87%E7%94%9F%E4%BA%A7%5D.png)
源码地址https://gitee.com/tym_hmm/rabbitmq-pool-go
开发语言 golang 依赖库
go get -u gitee.com/tym_hmm/rabbitmq-pool-go
go get -u gitee.com/tym_hmm/rabbitmq-pool-go
功能说明 自定义连接池大小及最大处理channel数消费者底层断线自动重连底层使用轮循方式复用tcp生产者每个tcp对应一个channel,防止channel写入阻塞造成内存使用过量支持rabbitmq exchangeType默认值已在线上生产环镜运行, 5200W请求 qbs 3000 时, 连接池显示无压力
rabbitmq部署为线上集群
| 名称 | 说明 |
|---|---|
| tcp最大连接数 | 5 |
| 生产者消费发送失败最大重试次数 | 5 |
| 消费者最大channel信道数(每个连接自动平分) | 100(每个tcp10个) |
var oncePool sync.Once
var instanceRPool *kelleyRabbimqPool.RabbitPool
func initrabbitmq() *kelleyRabbimqPool.RabbitPool {
oncePool.Do(func() {
//初始化生产者
instanceRPool = kelleyRabbimqPool.NewProductPool()
//初始化消费者
instanceConsumePool = kelleyRabbimqPool.NewConsumePool()
err := instanceRPool.Connect("192.168.1.202", 5672, "guest", "guest")
if err != nil {
fmt.Println(err)
}
})
return instanceRPool
}
生产者
var wg sync.WaitGroup
for i:=0;i<100000; i++ {
wg.Add(1)
go func(num int) {
defer wg.Done()
data:=kelleyRabbimqPool.GetRabbitMqDataFormat(
"testChange5",
kelleyRabbimqPool.EXCHANGE_TYPE_TOPIC,
"textQueue5",
"/",
fmt.Sprintf("这里是数据%d", num)
)
_=instanceRPool.Push(data)
}(i)
}
wg.Wait()
消费者
可定义多个消息者事件, 不通交换机, 队列, 路由
每个事件独立
nomrl := &rabbitmq.ConsumeReceive{
#定义消费者事件
ExchangeName: "testChange31",//队列名称
ExchangeType: kelleyRabbimqPool.EXCHANGE_TYPE_DIRECT,
Route: "",
QueueName: "testQueue31",
IsTry:true,//是否重试
MaxReTry: 5,//最大重试次数
EventFail: func(code int, e error, data []byte) {
fmt.Printf("error:%s", e)
},
/***
* 参数说明
* @param data []byte 接收的rabbitmq数据
* @param header map[string]interface{} 原rabbitmq header
* @param retryClient RabbitmqPool.RetryClientInterface 自定义重试数据接口,重试需return true 防止数据重复提交
***/
EventSuccess: func(data []byte, header map[string]interface{},retryClient kelleyRabbimqPool.RetryClientInterface)bool {//如果返回true 则无需重试
fmt.Printf("data:%s\n", string(data))
return true
},
}
instanceConsumePool.RegisterConsumeReceive(nomrl)
err := instanceConsumePool.RunConsume()
if err != nil {
fmt.Println(err)
}
错误码说明
错误码为
生产者push时返回的 *RabbitMqError消费者事件监听回返的 code
| 错误码 | 说明 |
|---|---|
| 501 | 生产者发送超过最大重试次数 |
| 502 | 获取信道失败, 一般为认道队列数用尽 |
| 503 | 交换机/队列/绑定失败 |
| 504 | 连接失败 |
| 506 | 信道创建失败 |
| 507 | 超过最大重试次数 |
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)