简单的订阅发布机制实现(Golang)

简单的订阅发布机制实现(Golang),第1张

概述       Redis和NSQ都有完善的订阅发布实现,但参考它们的源码实现,做个实际例子,对两边的异同和这种机制会更有印象。 练习省掉复杂的,就实现简单的 订阅/取消订阅/发布信息 功能,足够了。 Server.go          Server结构中的Dict用map保存了Channel的相关信息,而Channel结构中则用一个map保存了订阅这个Channel的Client.  这个例子

Redis和NSQ都有完善的订阅和发布实现,但参考它们的源码实现,做个实际例子,对两边的异同和这种机制会更有印象。

练习省掉复杂的,就实现简单的 订阅/取消订阅/发布信息 功能,足够了。


Server.go

Server结构中的Dict用map保存了Channel的相关信息,而Channel结构中则用一个map保存了订阅这个Channel的ClIEnt.

这个例子与Redis中不太一样,Redis中的Dict用Channel的名字作为map的key,value则是其对应的ClIEnt列表。而ClIEnt中则保

存了其所有订阅的Channel信息。

package pubsubimport (	"errors"	"sync")type ClIEnt struct {	ID int	Ip string}type Server struct {	Dict map[string]*Channel //map[Channel.name]*Channel	sync.RWMutex}func NewServer() *Server {	s := &Server{}	s.Dict = make(map[string]*Channel) //所有channel	return s}//订阅func (srv *Server) Subscribe(clIEnt *ClIEnt,channelname string) {	// 客户是否在Channel的客户列表中	srv.RLock()	ch,found := srv.Dict[channelname]	srv.RUnlock()	if !found {		ch = NewChannel(channelname)		ch.AddClIEnt(clIEnt)		srv.Lock()		srv.Dict[channelname] = ch		srv.Unlock()	} else {		ch.AddClIEnt(clIEnt)	}}//取消订阅func (srv *Server) Unsubscribe(clIEnt *ClIEnt,channelname string) {	srv.RLock()	ch,found := srv.Dict[channelname]	srv.RUnlock()	if found {		if ch.DeleteClIEnt(clIEnt) == 0 {			ch.Exit()			srv.Lock()			delete(srv.Dict,channelname)			srv.Unlock()		}	}}//发布消息func (srv *Server) PublishMessage(channelname,message string) (bool,error) {	srv.RLock()	ch,found := srv.Dict[channelname]	if !found {		srv.RUnlock()		return false,errors.New("channelname不存在!")	}	srv.RUnlock()	ch.Notify(message)	ch.Wait()	return true,nil}

Channel.go

每个Channel 负责将信息放入WaitGroup,发送到ClIEnt或队列,例子中是打印一条信息。 当clIEnts为空时,则exit().

import (	"fmt"	"sync"	"sync/atomic")type Channel struct {	name    string	clIEnts map[int]*ClIEnt	//  exitChan   chan int	sync.RWMutex	waitGroup    WaitGroupWrapper	messageCount uint64	exitFlag     int32}func NewChannel(channelname string) *Channel {	return &Channel{		name: channelname,//  exitChan:       make(chan int),clIEnts: make(map[int]*ClIEnt),}}func (ch *Channel) AddClIEnt(clIEnt *ClIEnt) bool {	ch.RLock()	_,found := ch.clIEnts[clIEnt.ID]	ch.RUnlock()	ch.Lock()	if !found {		ch.clIEnts[clIEnt.ID] = clIEnt	}	ch.Unlock()	return found}func (ch *Channel) DeleteClIEnt(clIEnt *ClIEnt) int {	var ret int	ch.ReplyMsg(		fmt.Sprintf("从channel:%s 中删除clIEnt:%d ",ch.name,clIEnt.ID))	ch.Lock()	delete(ch.clIEnts,clIEnt.ID)	ch.Unlock()	ch.RLock()	ret = len(ch.clIEnts)	ch.RUnlock()	return ret}func (ch *Channel) Notify(message string) bool {	ch.RLock()	defer ch.RUnlock()	for cID,_ := range ch.clIEnts {		ch.ReplyMsg(			fmt.Sprintf("channel:%s clIEnt:%d message:%s",cID,message))	}	return true}func (ch *Channel) ReplyMsg(message string) {	ch.waitGroup.Wrap(func() { fmt.Println(message) })}func (ch *Channel) Wait() {	ch.waitGroup.Wait()}func (ch *Channel) Exiting() bool {	return atomic.LoadInt32(&ch.exitFlag) == 1}func (ch *Channel) Exit() {	if !atomic.CompareAndSwAPInt32(&ch.exitFlag,1) {		return	}	//close(ch.exitChan)	ch.Wait()}func (ch *Channel) PutMessage(clIEntID int,message string) {	ch.RLock()	defer ch.RUnlock()	if ch.Exiting() {		return	}	//select {	// case <-t.exitChan:	// return	//}	fmt.Println(ch.name,":",message)	atomic.AddUint64(&ch.messageCount,1)	return}


主程序:

//订阅/发布 练习//author: Xiong Chuan liang  //date: 2015-3-17package mainimport (  . "pubsub")func main(){  c1 := &ClIEnt{ID:100,Ip:"172.18.1.1"}  c3:=  &ClIEnt{ID:300,Ip:"172.18.1.3"}   srv := NewServer()   srv.Subscribe(c1,"topic")   srv.Subscribe(c3,"topic")   srv.PublishMessage("topic","测试信息1")   srv.Unsubscribe(c3,"topic")   srv.PublishMessage("topic","测试信息2222")    srv.Subscribe(c1,"topic2")    srv.Subscribe(c3,"topic2")    srv.PublishMessage("topic2"," topic2的测试信息")   }/*运行结果:channel:topic clIEnt:100 message:测试信息1channel:topic clIEnt:300 message:测试信息1从channel:topic 中删除clIEnt:300channel:topic clIEnt:100 message:测试信息2222channel:topic2 clIEnt:100 message: topic2的测试信息channel:topic2 clIEnt:300 message: topic2的测试信息*/

没做太复杂的测试,粗略看好像没有问题。


MAIL: xcl_168@aliyun.com

BLOG: http://blog.csdn.net/xcl168

总结

以上是内存溢出为你收集整理的简单的订阅发布机制实现(Golang)全部内容,希望文章能够帮你解决简单的订阅发布机制实现(Golang)所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/langs/1288438.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-06-09
下一篇2022-06-09

发表评论

登录后才能评论

评论列表(0条)

    保存