
Redis和NSQ都有完善的订阅和发布实现,但参考它们的源码实现,做个实际例子,对两边的异同和这种机制会更有印象。
练习省掉复杂的,就实现简单的 订阅/取消订阅/发布信息 功能,足够了。
Server.go
Server结构中的Dict用map保存了Channel的相关信息,而Channel结构中则用一个map保存了订阅这个Channel的ClIEnt.
这个例子与Redis中不太一样,Redis中的Dict用Channel的名字作为map的key,value则是其对应的ClIEnt列表。而ClIEnt中则保
存了其所有订阅的Channel信息。
package pubsubimport ( "errors" "sync")type ClIEnt struct { ID int Ip string}type Server struct { Dict map[string]*Channel //map[Channel.name]*Channel sync.RWMutex}func NewServer() *Server { s := &Server{} s.Dict = make(map[string]*Channel) //所有channel return s}//订阅func (srv *Server) Subscribe(clIEnt *ClIEnt,channelname string) { // 客户是否在Channel的客户列表中 srv.RLock() ch,found := srv.Dict[channelname] srv.RUnlock() if !found { ch = NewChannel(channelname) ch.AddClIEnt(clIEnt) srv.Lock() srv.Dict[channelname] = ch srv.Unlock() } else { ch.AddClIEnt(clIEnt) }}//取消订阅func (srv *Server) Unsubscribe(clIEnt *ClIEnt,channelname string) { srv.RLock() ch,found := srv.Dict[channelname] srv.RUnlock() if found { if ch.DeleteClIEnt(clIEnt) == 0 { ch.Exit() srv.Lock() delete(srv.Dict,channelname) srv.Unlock() } }}//发布消息func (srv *Server) PublishMessage(channelname,message string) (bool,error) { srv.RLock() ch,found := srv.Dict[channelname] if !found { srv.RUnlock() return false,errors.New("channelname不存在!") } srv.RUnlock() ch.Notify(message) ch.Wait() return true,nil} Channel.go
每个Channel 负责将信息放入WaitGroup,发送到ClIEnt或队列,例子中是打印一条信息。 当clIEnts为空时,则exit().
import ( "fmt" "sync" "sync/atomic")type Channel struct { name string clIEnts map[int]*ClIEnt // exitChan chan int sync.RWMutex waitGroup WaitGroupWrapper messageCount uint64 exitFlag int32}func NewChannel(channelname string) *Channel { return &Channel{ name: channelname,// exitChan: make(chan int),clIEnts: make(map[int]*ClIEnt),}}func (ch *Channel) AddClIEnt(clIEnt *ClIEnt) bool { ch.RLock() _,found := ch.clIEnts[clIEnt.ID] ch.RUnlock() ch.Lock() if !found { ch.clIEnts[clIEnt.ID] = clIEnt } ch.Unlock() return found}func (ch *Channel) DeleteClIEnt(clIEnt *ClIEnt) int { var ret int ch.ReplyMsg( fmt.Sprintf("从channel:%s 中删除clIEnt:%d ",ch.name,clIEnt.ID)) ch.Lock() delete(ch.clIEnts,clIEnt.ID) ch.Unlock() ch.RLock() ret = len(ch.clIEnts) ch.RUnlock() return ret}func (ch *Channel) Notify(message string) bool { ch.RLock() defer ch.RUnlock() for cID,_ := range ch.clIEnts { ch.ReplyMsg( fmt.Sprintf("channel:%s clIEnt:%d message:%s",cID,message)) } return true}func (ch *Channel) ReplyMsg(message string) { ch.waitGroup.Wrap(func() { fmt.Println(message) })}func (ch *Channel) Wait() { ch.waitGroup.Wait()}func (ch *Channel) Exiting() bool { return atomic.LoadInt32(&ch.exitFlag) == 1}func (ch *Channel) Exit() { if !atomic.CompareAndSwAPInt32(&ch.exitFlag,1) { return } //close(ch.exitChan) ch.Wait()}func (ch *Channel) PutMessage(clIEntID int,message string) { ch.RLock() defer ch.RUnlock() if ch.Exiting() { return } //select { // case <-t.exitChan: // return //} fmt.Println(ch.name,":",message) atomic.AddUint64(&ch.messageCount,1) return}
主程序:
//订阅/发布 练习//author: Xiong Chuan liang //date: 2015-3-17package mainimport ( . "pubsub")func main(){ c1 := &ClIEnt{ID:100,Ip:"172.18.1.1"} c3:= &ClIEnt{ID:300,Ip:"172.18.1.3"} srv := NewServer() srv.Subscribe(c1,"topic") srv.Subscribe(c3,"topic") srv.PublishMessage("topic","测试信息1") srv.Unsubscribe(c3,"topic") srv.PublishMessage("topic","测试信息2222") srv.Subscribe(c1,"topic2") srv.Subscribe(c3,"topic2") srv.PublishMessage("topic2"," topic2的测试信息") }/*运行结果:channel:topic clIEnt:100 message:测试信息1channel:topic clIEnt:300 message:测试信息1从channel:topic 中删除clIEnt:300channel:topic clIEnt:100 message:测试信息2222channel:topic2 clIEnt:100 message: topic2的测试信息channel:topic2 clIEnt:300 message: topic2的测试信息*/ 没做太复杂的测试,粗略看好像没有问题。
MAIL: xcl_168@aliyun.com
BLOG: http://blog.csdn.net/xcl168
总结以上是内存溢出为你收集整理的简单的订阅发布机制实现(Golang)全部内容,希望文章能够帮你解决简单的订阅发布机制实现(Golang)所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)