
Question:
A continuous stream produces many single update operations, each either an insert or a delete. I want to buffer this stream and perform several operations at a time, but it is really important to preserve their order. Additionally, the buffered operations should be executed in sequences every X seconds.
Example:
IN:
insert-insert-insert-delete-delete-insert-delete-delete-delete-delete
OUT:
insert(3)-delete(2)-insert(1)-delete(4)
I wrote a simple application to test this. It more or less works, but it does not respect the order of the incoming inserts and deletes:
    namespace RxTests
    {
        using System;
        using System.Collections.Generic;
        using System.Globalization;
        using System.Linq;
        using System.Reactive.Concurrency;
        using System.Reactive.Linq;
        using System.Reactive.Subjects;
        using System.Text;
        using System.Threading;

        internal class Program
        {
            private static readonly Random Random = new Random();
            private static readonly CancellationTokenSource ProducerStopped = new CancellationTokenSource();
            private static readonly ISubject<UpdateOperation> operations = new Subject<UpdateOperation>();

            private static void Main(string[] args)
            {
                Console.WriteLine("Starting production");
                var producerScheduler = new EventLoopScheduler();
                var consumerScheduler = new EventLoopScheduler();

                // Emit one random operation every 2 seconds.
                var producer = Observable.Interval(TimeSpan.FromSeconds(2))
                    .SubscribeOn(producerScheduler)
                    .Subscribe(Produce, WriteProductionCompleted);

                // GroupBy splits the stream into two groups (insert / delete) that are
                // buffered independently, so the original interleaving is lost.
                var consumer = operations.ObserveOn(producerScheduler)
                    .GroupBy(operation => operation.Delete)
                    .SelectMany(observable => observable.Buffer(TimeSpan.FromSeconds(8), 50))
                    .SubscribeOn(consumerScheduler)
                    .Subscribe(WriteUpdateOperations);

                Console.WriteLine("Type any key to stop");
                Console.ReadKey();
                consumer.Dispose();
                producer.Dispose();
            }

            private static void Produce(long time)
            {
                var delete = Random.NextDouble() < 0.5;
                Console.WriteLine("Produce {0},{1} at {2}", time + 1, delete, time);
                var idString = (time + 1).ToString(CultureInfo.InvariantCulture);
                var id = time + 1;
                operations.OnNext(
                    new UpdateOperation(id, delete, idString, time.ToString(CultureInfo.InvariantCulture)));
            }

            private static void WriteProductionCompleted()
            {
                Console.WriteLine("Production completed");
                ProducerStopped.Cancel();
            }

            private static void WriteUpdateOperation(UpdateOperation updateOperation)
            {
                Console.WriteLine("Consuming {0}", updateOperation);
            }

            private static void WriteUpdateOperations(IList<UpdateOperation> updateOperation)
            {
                foreach (var operation in updateOperation)
                {
                    WriteUpdateOperation(operation);
                }
            }

            private class UpdateOperation
            {
                public UpdateOperation(long id, bool delete, params string[] changes)
                {
                    this.Id = id;
                    this.Delete = delete;
                    this.Changes = new List<string>(changes ?? Enumerable.Empty<string>());
                }

                public bool Delete { get; set; }

                public long Id { get; private set; }

                public IList<string> Changes { get; private set; }

                public override string ToString()
                {
                    var stringBuilder = new StringBuilder("{UpdateOperation ");
                    stringBuilder.AppendFormat("ID: {0},Delete: {1},Changes: [", this.Id, this.Delete);
                    if (this.Changes.Count > 0)
                    {
                        stringBuilder.Append(this.Changes.First());
                        foreach (var change in this.Changes.Skip(1))
                        {
                            stringBuilder.AppendFormat(",{0}", change);
                        }
                    }

                    stringBuilder.Append("]}");
                    return stringBuilder.ToString();
                }
            }
        }
    }
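Can anyone help me with the right query?
Thanks
UPDATE 08.03.13 (suggestions by JerKimball)
The following lines are small changes/additions to JerKimball's code, used to print the results: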
    using (query.Subscribe(Print))
    {
        Console.ReadLine();
        producer.Dispose();
    }

with the following Print methods:
    private static void Print(IObservable<IList<Operation>> operations)
    {
        operations.Subscribe(Print);
    }

    private static void Print(IList<Operation> operations)
    {
        var stringBuilder = new StringBuilder("[");
        if (operations.Count > 0)
        {
            stringBuilder.Append(operations.First());
            foreach (var item in operations.Skip(1))
            {
                stringBuilder.AppendFormat(",{0}", item);
            }
        }

        stringBuilder.Append("]");
        Console.WriteLine(stringBuilder);
    }

and the following ToString override for Operation:
    public override string ToString()
    {
        return string.Format("{0}:{1}", this.Type, this.Seq);
    }

The order is preserved, but:
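- I am not sure about subscribing inside another subscription: is that correct? (This is a question I have had for a long time, and it has never been clear to me.)
- I never get more than two elements in each list, even when the stream produces more than two consecutive values of the same type.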
Answer:
First, let's define an extension method that "collapses" a list of items into runs based on a key, while preserving order:
    public static class Ext
    {
        public static IEnumerable<List<T>> ToRuns<T, TKey>(
            this IEnumerable<T> source, Func<T, TKey> keySelector)
        {
            using (var enumerator = source.GetEnumerator())
            {
                if (!enumerator.MoveNext())
                    yield break;

                var currentSet = new List<T>();

                // inspect the first item
                var lastKey = keySelector(enumerator.Current);
                currentSet.Add(enumerator.Current);

                while (enumerator.MoveNext())
                {
                    var newKey = keySelector(enumerator.Current);
                    if (!Equals(newKey, lastKey))
                    {
                        // A difference == new run; return what we've got thus far
                        yield return currentSet;
                        lastKey = newKey;
                        currentSet = new List<T>();
                    }
                    currentSet.Add(enumerator.Current);
                }

                // Return the last run.
                yield return currentSet;

                // and clean up
                currentSet = new List<T>();
                lastKey = default(TKey);
            }
        }
    }

Fairly simple: given an IEnumerable<T>, it returns a sequence of List<T> in which all items of a given sublist share the same key.
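As a quick check against the example in the question, the sketch below applies the same run-splitting idea to the IN sequence. The names RunsDemo, SplitIntoRuns and Summarize are made up for this illustration, and SplitIntoRuns is only a compact restatement of ToRuns so that the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RunsDemo
{
    // Hypothetical compact restatement of ToRuns: start a new list whenever the key changes.
    public static IEnumerable<List<T>> SplitIntoRuns<T, TKey>(
        this IEnumerable<T> source, Func<T, TKey> keySelector)
    {
        List<T> current = null;
        var lastKey = default(TKey);
        foreach (var item in source)
        {
            var key = keySelector(item);
            if (current != null && !Equals(key, lastKey))
            {
                // Key changed: hand out the finished run and start over.
                yield return current;
                current = null;
            }
            if (current == null)
            {
                current = new List<T>();
            }
            current.Add(item);
            lastKey = key;
        }
        if (current != null)
        {
            yield return current;
        }
    }

    // Formats each run as "type(count)" and joins the runs with '-'.
    public static string Summarize(IEnumerable<string> operations)
    {
        return string.Join("-",
            operations.SplitIntoRuns(op => op).Select(run => run[0] + "(" + run.Count + ")"));
    }

    public static void Main()
    {
        var input = "insert-insert-insert-delete-delete-insert-delete-delete-delete-delete".Split('-');
        // prints insert(3)-delete(2)-insert(1)-delete(4)
        Console.WriteLine(Summarize(input));
    }
}
```

Plain strings stand in for the operation objects here; with a real operation type, the key selector would pick the property that distinguishes inserts from deletes.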
Now, to feed it and use it:
    var rnd = new Random();
    var fakeSource = new Subject<Operation>();

    // Emit one random operation per second.
    var producer = Observable
        .Interval(TimeSpan.FromMilliseconds(1000))
        .Subscribe(i =>
        {
            var op = new Operation();
            op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete";
            fakeSource.OnNext(op);
        });

    var singleSource = fakeSource
        .Publish().RefCount();

    var query = singleSource
        // change this value to alter your "look at" time window
        .Buffer(TimeSpan.FromSeconds(5))
        .Select(buff => buff.ToRuns(op => op.Type).Where(run => run.Count > 0));

    using (query.Subscribe(batch =>
    {
        foreach (var item in batch)
        {
            Console.WriteLine("{0}({1})", item.First().Type, item.Count);
        }
    }))
    {
        Console.ReadLine();
        producer.Dispose();
    }

Give it a whirl. Here is what I see in a typical run:
    insert(4)
    delete(2)
    insert(1)
    delete(1)
    insert(1)
    insert(1)
    delete(1)
    insert(1)
    delete(2)
    delete(2)
    insert(2)
    delete(1)
    insert(1)
    delete(2)
    insert(2)
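Note that the run above contains adjacent entries of the same type, such as insert(1) followed by insert(1). That is expected with this query, because each 5-second buffer is split into runs on its own, so a run that straddles a buffer boundary shows up as two entries. The sketch below imitates this without Rx; BufferBoundaryDemo, CountRuns and the two sample buffers are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BufferBoundaryDemo
{
    // Collapses consecutive equal items into "type(count)" entries.
    public static List<string> CountRuns(IEnumerable<string> operations)
    {
        var runs = new List<KeyValuePair<string, int>>();
        foreach (var op in operations)
        {
            var last = runs.Count - 1;
            if (last >= 0 && runs[last].Key == op)
            {
                // Same type as the previous item: extend the current run.
                runs[last] = new KeyValuePair<string, int>(op, runs[last].Value + 1);
            }
            else
            {
                // Type changed (or first item): start a new run.
                runs.Add(new KeyValuePair<string, int>(op, 1));
            }
        }
        return runs.Select(run => run.Key + "(" + run.Value + ")").ToList();
    }

    public static void Main()
    {
        // Two hypothetical consecutive buffers; the delete run straddles the boundary.
        var first = new[] { "insert", "insert", "delete" };
        var second = new[] { "delete", "insert" };

        // Each buffer handled on its own, as in the query above.
        Console.WriteLine(string.Join(" ", CountRuns(first)));   // insert(2) delete(1)
        Console.WriteLine(string.Join(" ", CountRuns(second)));  // delete(1) insert(1)

        // The same items handled as one sequence.
        Console.WriteLine(string.Join(" ", CountRuns(first.Concat(second))));  // insert(2) delete(2) insert(1)
    }
}
```

Whether the split matters depends on the consumer: if each batch is applied in order, two consecutive delete batches have the same effect as one larger batch.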