multithreading - Rx .NET: Filter observable until task is done
I'm learning Rx .NET, and a colleague sent me a simple example to start with, but there is something ugly in it that I don't like.
The code:
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace WindowsFormsApplication1
{
    public partial class Form1 : Form
    {
        public IObservable<Content> contentStream;
        public static bool isRunning = false;

        public Form1()
        {
            InitializeComponent();

            contentStream = Observable
                .FromEventPattern<ScrollEventArgs>(dataGridView1, "Scroll") // create the scroll event observable
                // discard the event if not scrolled down far enough,
                // or if items are already being retrieved (isRunning)
                .Where(e => dataGridView1.Rows.Count - e.EventArgs.NewValue < 50 && !isRunning)
                // transform to a 100--100--100--> stream; discard the next events until finished
                .Select(e => { isRunning = true; return 100; })
                .Scan((x, y) => x + y) // get the item index by accumulating the stream items
                .StartWith(0) // start with 0 before any event gets triggered
                // create a stream from the result of each async call and merge them into one stream
                .SelectMany(i => GetContent(i).ToObservable());

            // update the control every time an item arrives in contentStream
            contentStream.Subscribe(c => InvokeUpdateList(c));
        }

        private async Task<Content> GetContent(int index)
        {
            await Task.Delay(1000); // request to the web API...
            return new Content(index); // mock response
        }

        private void InvokeUpdateList(Content c)
        {
            dataGridView1.Invoke((MethodInvoker)delegate { UpdateList(c); });
        }

        private void UpdateList(Content c)
        {
            foreach (var item in c.pageContent)
            {
                dataGridView1.Rows.Add(item);
            }
            isRunning = false; // unlocks the event filter
        }
    }

    public class Content
    {
        public List<string> pageContent = new List<string>();
        public const string CONTENT_TEMPLATE = "This is item {0}.";

        public Content() { }

        public Content(int index)
        {
            for (int i = index; i < index + 100; i++)
            {
                pageContent.Add(string.Format(CONTENT_TEMPLATE, i));
            }
        }
    }
}
What I don't like is the isRunning filter. Is there a better way to discard events in the stream until the control has been updated?
Although @Shlomo's approach seems right, it does not start populating on load:
var index = new BehaviorSubject<int>(0);

var source = Observable
    .FromEventPattern<ScrollEventArgs>(dataGridView2, "Scroll")
    .Where(e => dataGridView2.Rows.Count - e.EventArgs.NewValue < 50)
    .Select(_ => Unit.Default)
    .StartWith(Unit.Default)
    .Do(i => Console.WriteLine("Event triggered"));

var fetchStream = source
    .WithLatestFrom(index, (u, i) => new { unit = u, index = i })
    .Do(o => Console.WriteLine("Merge result" + o.unit + o.index))
    .DistinctUntilChanged()
    .Do(o => Console.WriteLine("Merge changed" + o.unit + o.index))
    .SelectMany(i => GetContent(i.index).ToObservable());

var contentStream = fetchStream
    .WithLatestFrom(index, (c, i) => new { content = c, index = i })
    .ObserveOn(dataGridView2)
    .Subscribe(a =>
    {
        UpdateGrid(a.content);
        index.OnNext(a.index + 100);
    });
I can see "Event triggered" in the output log, but it seems the first source element (StartWith(Unit.Default)) is lost once it reaches WithLatestFrom.
This looks like some sort of pagination with auto-scroll implementation? Conceptually, it could help to split up your observable:
// Unit needs "using System.Reactive;" and BehaviorSubject needs "using System.Reactive.Subjects;"
var index = new BehaviorSubject<int>(0);

var source = Observable
    .FromEventPattern<ScrollEventArgs>(dataGridView1, "Scroll")
    .Where(e => dataGridView1.Rows.Count - e.EventArgs.NewValue < 50)
    .Select(_ => Unit.Default)
    .StartWith(Unit.Default);

var fetchStream = source
    .WithLatestFrom(index, (_, i) => i)
    .DistinctUntilChanged()
    .SelectMany(i => GetContent(i).ToObservable());
So source is a series of units, basically empty notifications that the user wants to initiate a list update. index represents the next index to be downloaded. fetchStream merges source with index to make sure there is only one request for a given index, then initiates the fetch.
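To see the gating effect without a form, here is a minimal console sketch. It is only an illustration under stated assumptions: the triggers subject is a hypothetical stand-in for the scroll events, the requested list stands in for the fetch, and the "fetch finished" step is simulated by pushing the next index by hand. It needs the System.Reactive NuGet package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class GateDemo
{
    static void Main()
    {
        // Hypothetical stand-in for the scroll event stream.
        var triggers = new Subject<Unit>();
        // Next index to download, as in the answer.
        var index = new BehaviorSubject<int>(0);
        // Records which indexes would have been fetched.
        var requested = new List<int>();

        triggers
            .WithLatestFrom(index, (_, i) => i) // pair each trigger with the current index
            .DistinctUntilChanged()             // drop triggers while the index is unchanged
            .Subscribe(i => requested.Add(i));

        triggers.OnNext(Unit.Default); // requests index 0
        triggers.OnNext(Unit.Default); // ignored: index is still 0
        index.OnNext(100);             // simulate the fetch finishing and the index advancing
        triggers.OnNext(Unit.Default); // requests index 100

        Console.WriteLine(string.Join(", ", requested)); // prints "0, 100"
    }
}
```

The second trigger is dropped because the index has not moved yet, which is the job the isRunning flag did in the original code.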
Now that we have a stream of requests that are distinct, we need to subscribe to it and update the UI and index.
var contentStream = fetchStream
    .WithLatestFrom(index, (c, i) => new { content = c, index = i })
    .ObserveOn(dataGridView1)
    .Subscribe(a =>
    {
        UpdateList(a.content);
        index.OnNext(a.index + 100);
    });
Note that ObserveOn(dataGridView1) accomplishes the same thing as your InvokeUpdateList method in a cleaner form (it requires the NuGet package System.Reactive.Windows.Forms), so you can eliminate that method.
All of this can go in the constructor, so you can hide all the state changes in there.