multithreading - Rx.NET: Filter observable until task is done


I'm learning Rx.NET, and a colleague sent me a simple example to start with, but there is something ugly in it that I don't like.

The code:

    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Reactive.Threading.Tasks;
    using System.Threading.Tasks;
    using System.Windows.Forms;

    namespace WindowsFormsApplication1
    {
        public partial class Form1 : Form
        {
            public IObservable<Content> contentStream;
            public static bool isRunning = false;

            public Form1()
            {
                InitializeComponent();

                contentStream = Observable.FromEventPattern<ScrollEventArgs>(dataGridView1, "Scroll") // create the scroll event observable
                    .Where(e => (dataGridView1.Rows.Count - e.EventArgs.NewValue < 50 && !isRunning)) // discard the event if not scrolled down enough
                                                                                                      // or if we are already retrieving items (isRunning)
                    .Select(e => { isRunning = true; return 100; }) // transform into a 100--100--100--> stream, discard the next events until finished
                    .Scan((x, y) => x + y) // get the item index by accumulating the stream items
                    .StartWith(0) // start with 0 before the event gets triggered
                    .SelectMany(i => GetContent(i).ToObservable()); // create a stream from the result of each async call and merge them into one stream

                contentStream.Subscribe(c => InvokeUpdateList(c)); // update the control every time an item arrives in contentStream
            }

            private async Task<Content> GetContent(int index)
            {
                await Task.Delay(1000); // request to the web API...
                return new Content(index); // mock response
            }

            private void InvokeUpdateList(Content c)
            {
                dataGridView1.Invoke((MethodInvoker)delegate
                {
                    UpdateList(c);
                });
            }

            private void UpdateList(Content c)
            {
                foreach (var item in c.PageContent)
                {
                    dataGridView1.Rows.Add(item);
                }
                isRunning = false; // unlocks the event filter
            }
        }

        public class Content
        {
            public List<string> PageContent = new List<string>();
            public const string CONTENT_TEMPLATE = "This is item {0}.";

            public Content()
            {
            }

            public Content(int index)
            {
                for (int i = index; i < index + 100; i++)
                {
                    PageContent.Add(string.Format(CONTENT_TEMPLATE, i));
                }
            }
        }
    }

What I don't like is the isRunning filter. Is there a better way to discard events in the stream until the control has been updated?

Although @Shlomo's approach seems right, it does not start populating the grid on load:

    var index = new BehaviorSubject<int>(0);

    var source = Observable.FromEventPattern<ScrollEventArgs>(dataGridView2, "Scroll")
        .Where(e => dataGridView2.Rows.Count - e.EventArgs.NewValue < 50)
        .Select(_ => Unit.Default)
        .StartWith(Unit.Default)
        .Do(i => Console.WriteLine("event triggered"));

    var fetchStream = source
        .WithLatestFrom(index, (u, i) => new { Unit = u, Index = i })
        .Do(o => Console.WriteLine("merge result" + o.Unit + o.Index))
        .DistinctUntilChanged()
        .Do(o => Console.WriteLine("merge changed" + o.Unit + o.Index))
        .SelectMany(i => GetContent(i.Index).ToObservable());

    var contentStream = fetchStream.WithLatestFrom(index, (c, i) => new { Content = c, Index = i })
        .ObserveOn(dataGridView2)
        .Subscribe(a =>
        {
            UpdateGrid(a.Content);
            index.OnNext(a.Index + 100);
        });

I can see "event triggered" in the output log, but it seems the first source element (StartWith(Unit.Default)) is lost once it reaches WithLatestFrom.

This looks like some sort of pagination/auto-scroll implementation? Conceptually, it may help to split your observable:

    var index = new BehaviorSubject<int>(0);

    var source = Observable.FromEventPattern<ScrollEventArgs>(dataGridView1, "Scroll")
        .Where(e => dataGridView1.Rows.Count - e.EventArgs.NewValue < 50)
        .Select(_ => Unit.Default)
        .StartWith(Unit.Default);

    var fetchStream = source
        .WithLatestFrom(index, (_, i) => i)
        .DistinctUntilChanged()
        .SelectMany(i => GetContent(i).ToObservable());

So source is a series of Units: essentially empty notifications that the user wants to initiate a list update. index represents the next index to be downloaded. fetchStream combines source with index to make sure there is only one request for a given index, then initiates the fetch.

Now that we have a stream of distinct requests, we need to subscribe and update both the UI and the index.
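To see the de-duplication in isolation, here is a minimal console sketch, not the form code itself. It assumes the System.Reactive NuGet package; the `Subject<Unit>` named `source` is a hypothetical stand-in for the scroll events, and the `requested` list stands in for the calls to GetContent.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class Program
{
    static void Main()
    {
        // Next index to download, as in the answer.
        var index = new BehaviorSubject<int>(0);

        // Hypothetical stand-in for the scroll notifications.
        var source = new Subject<Unit>();

        // Records which indexes would have been requested.
        var requested = new List<int>();

        using (source
            .WithLatestFrom(index, (_, i) => i) // pair each notification with the current index
            .DistinctUntilChanged()             // drop repeats of the same index
            .Subscribe(i => requested.Add(i)))
        {
            source.OnNext(Unit.Default); // requests index 0
            source.OnNext(Unit.Default); // ignored: index is still 0
            index.OnNext(100);           // a page was loaded, move the index on
            source.OnNext(Unit.Default); // requests index 100
            source.OnNext(Unit.Default); // ignored: index is still 100
        }

        Console.WriteLine(string.Join(", ", requested)); // prints "0, 100"
    }
}
```

Repeated notifications are dropped only because the index has not moved yet, which is what replaces the isRunning flag: further scroll events are ignored until the subscriber pushes a new value into index.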

    var contentStream = fetchStream
        .WithLatestFrom(index, (c, i) => new { Content = c, Index = i })
        .ObserveOn(dataGridView1)
        .Subscribe(a =>
        {
            UpdateList(a.Content);
            index.OnNext(a.Index + 100);
        });

Note that ObserveOn(dataGridView1) accomplishes the same thing as your InvokeUpdateList method in a cleaner form (it requires the NuGet package System.Reactive.Windows.Forms), so you can eliminate that method.

All of this can go in the constructor, so you can hide all the state changes in there.
