c# – Rx groupby直到条件改变

c# – Rx groupby直到条件改变,第1张

概述我坚持使用rx和特定的查询. 问题: Many single update operations are produced by continuous stream. The operations can be insert or delete. I want to buffer those streams and perform few operations at the time, but i 我坚持使用rx和特定的查询.
问题:

Many single update operations are produced by continuous stream. The operations can be insert or delete. I want to buffer those streams and perform few operations at the time,but it is really important to preserve the order. Additionally,operations should be buffered and done in sequences every X seconds

例:

在:

insert-insert-insert-delete-delete-insert-delete-delete-delete-delete

日期:

insert(3)-delete(2)-insert(1)-delete(4)

我写了一个简单的应用程序来测试它,它或多或少地工作,但它不尊重传入插入/删除的顺序

namespace RxTests{using System;using System.Collections.Generic;using System.Globalization;using System.linq;using System.Reactive.Concurrency;using System.Reactive.linq;using System.Reactive.Subjects;using System.Text;using System.Threading;internal class Program{    private static Readonly Random Random = new Random();    private static Readonly CancellationTokenSource ProducerStopped = new CancellationTokenSource();    private static Readonly ISubject<UpdateOperation> operations = new Subject<UpdateOperation>();    private static voID Main(string[] args)    {        Console.Writeline("Starting production");        var producerScheduler = new EventLoopScheduler();        var consumerScheduler = new EventLoopScheduler();        var producer =            Observable.Interval(TimeSpan.FromSeconds(2))                      .SubscribeOn(producerScheduler)                      .Subscribe(Produce,WriteProductionCompleted);        var consumer =            operations.ObserveOn(producerScheduler)                      .GroupBy(operation => operation.Delete)                      .SelectMany(observable => observable.Buffer(TimeSpan.FromSeconds(8),50))                      .SubscribeOn(consumerScheduler)                      .Subscribe(WriteUpdateOperations);        Console.Writeline("Type any key to stop");        Console.ReadKey();        consumer.dispose();        producer.dispose();    }    private static voID Produce(long time)    {        var delete = Random.NextDouble() < 0.5;        Console.Writeline("Produce {0},{1} at {2}",time + 1,delete,time);        var IDString = (time + 1).ToString(CultureInfo.InvariantCulture);        var ID = time + 1;        operations.OnNext(            new UpdateOperation(ID,IDString,time.ToString(CultureInfo.InvariantCulture)));    }    private static voID WriteProductionCompleted()    {        Console.Writeline("Production completed");        ProducerStopped.Cancel();    }    private static voID WriteUpdateOperation(UpdateOperation updateOperation)    {        Console.Writeline("Consuming {0}",updateOperation);    }    private static voID WriteUpdateOperations(IList<UpdateOperation> updateOperation)    {        foreach (var operation in updateOperation)        {            WriteUpdateOperation(operation);        }    }    private class UpdateOperation    {        public UpdateOperation(long ID,bool delete,params string[] changes)        {            this.ID = ID;            this.Delete = delete;            this.Changes = new List<string>(changes ?? Enumerable.Empty<string>());        }        public bool Delete { get; set; }        public long ID { get; private set; }        public IList<string> Changes { get; private set; }        public overrIDe string ToString()        {            var stringBuilder = new StringBuilder("{UpdateOperation ");            stringBuilder.AppendFormat("ID: {0},Delete: {1},Changes: [",this.ID,this.Delete);            if (this.Changes.Count > 0)            {                stringBuilder.Append(this.Changes.First());                foreach (var change in this.Changes.Skip(1))                {                    stringBuilder.AppendFormat(",{0}",change);                }            }            stringBuilder.Append("]}");            return stringBuilder.ToString();        }    }}

}

任何人都可以帮助我正确的查询?

谢谢

更新08.03.13(JerKimball的建议)

以下几行是对JerKimball代码的小改动/补充,用于打印结果:

using(query.Subscribe(Print)){    Console.Readline();    producer.dispose();        }

使用以下打印方法:

private static voID Print(IObservable<IList<Operation>> operations){    operations.Subscribe(Print);}private static voID Print(IList<Operation> operations){    var stringBuilder = new StringBuilder("[");    if (operations.Count > 0)    {        stringBuilder.Append(operations.First());        foreach (var item in operations.Skip(1))        {            stringBuilder.AppendFormat(",item);        }    }    stringBuilder.Append("]");    Console.Writeline(stringBuilder); }

以及要 *** 作的字符串:

public overrIDe string ToString(){    return string.Format("{0}:{1}",this.Type,this.Seq);}

订单保留,但是:

>我不确定在另一个订阅中订阅:它是否正确(这是我很久以前的一个问题,我从来都不清楚)?
>我在每个列表上总是不超过两个元素(即使流生成两个以上具有相同类型的连续值)

解决方法 让我们尝试一种新方法(因此新答案):

首先,让我们定义一个扩展方法,该方法将根据键“折叠”项目列表,同时保留顺序:

public static class Ext{    public static IEnumerable<List<T>> ToRuns<T,TKey>(            this IEnumerable<T> source,Func<T,TKey> keySelector)     {        using (var enumerator = source.GetEnumerator())         {            if (!enumerator.MoveNext())                yIEld break;            var currentSet = new List<T>();            // inspect the first item            var lastKey = keySelector(enumerator.Current);            currentSet.Add(enumerator.Current);            while (enumerator.MoveNext())             {                var newKey = keySelector(enumerator.Current);                if (!Equals(newKey,lastKey))                 {                    // A difference == new run; return what we've got thus far                    yIEld return currentSet;                    lastKey = newKey;                    currentSet = new List<T>();                }                currentSet.Add(enumerator.Current);            }            // Return the last run.            yIEld return currentSet;            // and clean up            currentSet = new List<T>();            lastKey = default(TKey);        }    }}

相当简单 – 给定IEnumerable< T>,将返回List< List< T>>每个子列表将具有相同的密钥.

现在,喂它并使用它:

var rnd = new Random();var fakeSource = new Subject<Operation>();var producer = Observable    .Interval(TimeSpan.FromMilliseconds(1000))    .Subscribe(i =>         {            var op = new Operation();            op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete";            fakeSource.OnNext(op);        });    var singleSource = fakeSource    .Publish().RefCount();var query = singleSource    // change this value to alter your "look at" time window    .Buffer(TimeSpan.FromSeconds(5))        .Select(buff => buff.ToRuns(op => op.Type).Where(run => run.Count > 0));using(query.Subscribe(batch => {    foreach(var item in batch)    {        Console.Writeline("{0}({1})",item.First().Type,item.Count);    }})){    Console.Readline();    producer.dispose();     }

给它一个旋转 – 这是我在典型的运行中看到的:

insert(4)delete(2)insert(1)delete(1)insert(1)insert(1)delete(1)insert(1)delete(2)delete(2)insert(2)delete(1)insert(1)delete(2)insert(2)
总结

以上是内存溢出为你收集整理的c# – Rx groupby直到条件改变全部内容,希望文章能够帮你解决c# – Rx groupby直到条件改变所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/langs/1245282.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-06-07
下一篇2022-06-07

发表评论

登录后才能评论

评论列表(0条)

    保存