使用带有inproc传输的C#的ZeroMQ

使用带有inproc传输的C#的ZeroMQ,第1张

概述我正在尝试ZeroMQ,并试图找到一些工作.我的第一个想法是使用inproc传输设置一个REP / REQ,看看我是否可以在两个线程之间发送消息.以下代码大部分来自clzmq示例,但似乎不起作用. 服务器和客户端都绑定到传输,但是当客户端尝试执行发送它并且只是坐在那里.我没有ZeroMQ经验,所以我不知道在哪里先看,任何帮助将不胜感激.这是违规(冒犯性)代码: using System;usin 我正在尝试ZeroMQ,并试图找到一些工作.我的第一个想法是使用inproc传输设置一个REP / REQ,看看我是否可以在两个线程之间发送消息.以下代码大部分来自clzmq示例,但似乎不起作用.

服务器和客户端都绑定到传输,但是当客户端尝试执行发送它并且只是坐在那里.我没有ZeroMQ经验,所以我不知道在哪里先看,任何帮助将不胜感激.这是违规(冒犯性)代码:

using System;using System.Diagnostics;using System.Threading;using NUnit.Framework;using ZMQ;namespace PostBox{    [TestFixture]    public class Class1    {        private const string Address = "inproc://test";        private const uint MessageSize = 10;        private const int roundtripCount = 100;        [Test]        public voID Should()        {            var clIEntThread = new Thread(StartClIEnt);            clIEntThread.Start();            var serverThread = new Thread(StartServer);            serverThread.Start();            clIEntThread.Join();            serverThread.Join();            Console.Writeline("Done with life");        }        private voID StartServer()        {            //  Initialise 0MQ infrastructure            using (var ctx = new Context(1))            {                using (var skt = ctx.socket(SocketType.REP))                {                    skt.Bind(Address);                    Console.Writeline("Server has bound");                    //  Bounce the messages.                    for (var i = 0; i < roundtripCount; i++)                    {                        var msg = skt.Recv();                        DeBUG.Assert(msg.Length == MessageSize);                        skt.Send(msg);                    }                    Thread.Sleep(1000);                }            }            Console.Writeline("Done with server");        }        private voID StartClIEnt()        {            Thread.Sleep(2000);            //  Initialise 0MQ infrastructure            using (var ctx = new Context(1))            {                using (var skt = ctx.socket(SocketType.REQ))                {                    skt.Bind(Address);                    Console.Writeline("ClIEnt has bound");                    //  Create a message to send.                    var msg = new byte[MessageSize];                    //  Start measuring the time.                    var watch = new Stopwatch();                    watch.Start();                    //  Start sending messages.                    for (var i = 0; i < roundtripCount; i++)                    {                        skt.Send(msg);                        msg = skt.Recv();                        DeBUG.Assert(msg.Length == MessageSize);                        Console.Write(".");                    }                    //  Stop measuring the time.                    watch.Stop();                    var elapsedtime = watch.ElapsedTicks;                    //  Print out the test parameters.                    Console.Writeline("message size: " + MessageSize + " [B]");                    Console.Writeline("roundtrip count: " + roundtripCount);                    //  Compute and print out the latency.                    var latency = (double)(elapsedtime) / roundtripCount / 2 *                        1000000 / Stopwatch.Frequency;                    Console.Writeline("Your average latency is {0} [us]",latency.ToString("f2"));                }            }            Console.Writeline("Done with clIEnt");        }    }}

编辑:

我在下面的答案的帮助下得到了这个工作,但是它也需要我改变一个绑定到一个连接,这是有道理的,当你考虑到,因为我们有一个服务器绑定到本地传输和客户端连接到远程运输.以下是更新的代码:

using System;using System.Diagnostics;using System.Threading;using NUnit.Framework;using ZMQ;namespace PostBox{    [TestFixture]    public class Class1    {        private const string Address = "inproc://test";        private const uint MessageSize = 10;        private const int roundtripCount = 100;        private static Context ctx;        [Test]        public voID Should()        {            using (ctx = new Context(1))            {                var clIEntThread = new Thread(StartClIEnt);                clIEntThread.Start();                var serverThread = new Thread(StartServer);                serverThread.Start();                clIEntThread.Join();                serverThread.Join();                Console.Writeline("Done with life");            }        }        private voID StartServer()        {            try            {                using (var skt = ctx.socket(SocketType.REP))                {                    skt.Bind(Address);                    Console.Writeline("Server has bound");                    //  Bounce the messages.                    for (var i = 0; i < roundtripCount; i++)                    {                        var msg = skt.Recv();                        DeBUG.Assert(msg.Length == MessageSize);                        skt.Send(msg);                    }                    Thread.Sleep(1000);                }                Console.Writeline("Done with server");            }            catch (System.Exception e)            {                Console.Writeline(e.Message);            }        }        private voID StartClIEnt()        {            Thread.Sleep(2000);            try            {                //  Initialise 0MQ infrastructure                using (var skt = ctx.socket(SocketType.REQ))                {                    skt.Connect(Address);                    Console.Writeline("ClIEnt has bound");                    //  Create a message to send.                    var msg = new byte[MessageSize];                    //  Start measuring the time.                    var watch = new Stopwatch();                    watch.Start();                    //  Start sending messages.                    for (var i = 0; i < roundtripCount; i++)                    {                        skt.Send(msg);                        msg = skt.Recv();                        DeBUG.Assert(msg.Length == MessageSize);                        Console.Write(".");                    }                    //  Stop measuring the time.                    watch.Stop();                    var elapsedtime = watch.ElapsedTicks;                    //  Print out the test parameters.                    Console.Writeline("message size: " + MessageSize + " [B]");                    Console.Writeline("roundtrip count: " + roundtripCount);                    //  Compute and print out the latency.                    var latency = (double)(elapsedtime) / roundtripCount / 2 *                                  1000000 / Stopwatch.Frequency;                    Console.Writeline("Your average latency is {0} [us]",latency.ToString("f2"));                }                Console.Writeline("Done with clIEnt");            }            catch (System.Exception e)            {                Console.Writeline(e.Message);            }        }    }}
解决方法 我相信,两个线程都需要使用相同的上下文. Zeromq指南建议不要在一个进程中使用多个上下文.
创建上下文,在两个线程之间共享上下文.这应该工作.

从http://zguide.zeromq.org/chapter:all

You MUST create a ‘context’ object for your process,and pass that to all threads. The context collects ØMQ’s state. To create a connection across the inproc: transport,both server and clIEnt thread must share the same context object.

总结

以上是内存溢出为你收集整理的使用带有inproc传输的C#的ZeroMQ全部内容,希望文章能够帮你解决使用带有inproc传输的C#的ZeroMQ所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/langs/1237462.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-06-06
下一篇2022-06-06

发表评论

登录后才能评论

评论列表(0条)

    保存