c# – 在调用OnMessage()后自动处理BrokeredMessage

c# – 在调用OnMessage()后自动处理BrokeredMessage,第1张

概述我正在尝试从Azure Service Bus排队项目,以便我可以批量处理它们.我知道Azure Service Bus有一个ReceiveBatch()但由于以下原因似乎有问题: >我一次最多只能获得256条消息,甚至可以根据消息大小随机获取. >即使我查看有多少消息在等待,我也不知道有多少RequestBatch调用,因为我不知道每次调用会给我多少消息.由于消息将继续进入,我不能继续发出请求, 我正在尝试从Azure Service Bus排队项目,以便我可以批量处理它们.我知道Azure Service Bus有一个ReceiveBatch()但由于以下原因似乎有问题:

>我一次最多只能获得256条消息,甚至可以根据消息大小随机获取.
>即使我查看有多少消息在等待,我也不知道有多少RequestBatch调用,因为我不知道每次调用会给我多少消息.由于消息将继续进入,我不能继续发出请求,直到它为空,因为它永远不会是空的.

我决定只使用比浪费偷看更便宜的消息监听器,并且会给我更多的控制权.

Basically I am trying to let a set number of messages build up and
then process them at once. I use a timer to force a delay but I need
to be able to queue my items as they come in.

基于我的计时器要求,似乎阻塞集合不是一个好选项,所以我试图使用ConcurrentBag.

var batchingQueue = new ConcurrentBag<brokeredMessage>();myQueueClIEnt.OnMessage((m) =>{    Console.Writeline("@R_827_4403@ message");    batchingQueue.Add(m);});while (true){    var sw = WaitableStopwatch.StartNew();    brokeredMessage msg;    while (batchingQueue.TryTake(out msg)) // <== Object is already disposed    {        ...do this until I have a thousand ready to be written to DB in batch        Console.Writeline("Completing message");        msg.Complete(); // <== ERRORS HERE    }    sw.Wait(MINIMUM_DELAY);}

However as soon as I access the message outsIDe of the OnMessage
pipeline it shows the brokeredMessage as already being disposed.

我认为这必须是OnMessage的一些自动行为,我没有看到任何方式对消息做任何事情,除了立即处理它我不想做.

解决方法 这对BlockingCollection来说非常容易.

var batchingQueue = new BlockingCollection<brokeredMessage>();myQueueClIEnt.OnMessage((m) =>{    Console.Writeline("@R_827_4403@ message");    batchingQueue.Add(m);});

和你的消费者线程:

foreach (var msg in batchingQueue.GetConsumingEnumerable()){    Console.Writeline("Completing message");    msg.Complete();}

GetConsumingEnumerable返回一个迭代器,该迭代器使用队列中的项目,直到设置了IsCompleted属性并且队列为空.如果队列为空但IsCompleted为False,则会执行非忙等待下一个项目.

要取消使用者线程(即关闭程序),您将停止向队列添加内容并让主线程调用batchingQueue.CompleteAdding.使用者将清空队列,看到IsCompleted属性为True,然后退出.

在这里使用BlockingCollection比ConcurrentBag或ConcurrentQueue更好,因为BlockingCollection接口更容易使用.特别是,使用GetConsumingEnumerable可以使您不必担心检查计数或进行忙等待(轮询循环).它只是有效.

另请注意,ConcurrentBag有一些相当奇怪的删除行为.特别是,删除项目的顺序根据哪个线程删除项目而不同.创建包的线程以与其他线程不同的顺序删除项目.有关详细信息,请参阅Using the ConcurrentBag Collection.

您还没有说明为什么要在输入中批量处理项目.除非有一个压倒一切的性能原因,否则使用该批处理逻辑使代码复杂化似乎并不是一个特别好的主意.

如果你想对数据库进行批量写入,那么我建议使用一个简单的List< T>缓冲项目.如果必须在将项目写入数据库之前对其进行处理,请使用上面显示的技术来处理它们.然后,而是直接写入数据库,将项目添加到列表中.当列表获得1,000个项目或经过一定时间后,分配新列表并启动任务以将旧列表写入数据库.像这样:

// at class scope// Flush every 5 minutes.private @R_404_4609@ TimeSpan FlushDelay = TimeSpan.FromMinutes(5);private const int MaxBufferItems = 1000;// Create a timer for the buffer flush.System.Threading.Timer _flushTimer = new System.Threading.Timer(TimedFlush,FlushDelay.TotalMilliseconds,Timeout.Infinite);  // A lock for the List. Unless you're getting hundreds of thousands// of items per second,this will not be a performance problem.object _ListLock = new Object();List<brokeredMessage> _recordBuffer = new List<brokeredMessage>();

然后,在您的消费者中:

foreach (var msg in batchingQueue.GetConsumingEnumerable()){    // process the message    Console.Writeline("Completing message");    msg.Complete();    lock (_ListLock)    {        _recordBuffer.Add(msg);        if (_recordBuffer.Count >= MaxBufferItems)        {            // Stop the timer            _flushTimer.Change(Timeout.Infinite,Timeout.Infinite);            // Save the old List and allocate a new one            var myList = _recordBuffer;            _recordBuffer = new List<brokeredMessage>();            // Start a task to write to the database            Task.Factory.StartNew(() => FlushBuffer(myList));            // Restart the timer            _flushTimer.Change(FlushDelay.TotalMilliseconds,Timeout.Infinite);        }    }}private voID TimedFlush(){    bool lockTaken = false;    List<brokeredMessage> myList = null;    try    {        if (Monitor.TryEnter(_ListLock,out lockTaken))        {            // Save the old List and allocate a new one            myList = _recordBuffer;            _recordBuffer = new List<brokeredMessage>();        }    }    finally    {        if (lockTaken)        {            Monitor.Exit(_ListLock);        }    }    if (myList != null)    {        FlushBuffer(myList);    }    // Restart the timer    _flushTimer.Change(FlushDelay.TotalMilliseconds,Timeout.Infinite);}

这里的想法是,您将旧列表排除在外,分配新列表以便继续处理,然后将旧列表的项目写入数据库.锁是为了防止计时器和记录计数器相互踩踏.没有锁定,事情可能会在一段时间内正常工作,然后你会在不可预测的时间发生奇怪的崩溃.

我喜欢这种设计,因为它消除了消费者的轮询.我唯一不喜欢的是消费者必须知道计时器(即它必须停止然后重新启动计时器).稍微考虑一下,我可以消除这个要求.但它的编写方式很好.

总结

以上是内存溢出为你收集整理的c# – 在调用OnMessage()后自动处理BrokeredMessage全部内容,希望文章能够帮你解决c# – 在调用OnMessage()后自动处理BrokeredMessage所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/langs/1219611.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-06-05
下一篇2022-06-05

发表评论

登录后才能评论

评论列表(0条)

    保存