Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

javascript - RxJS Filtered/Grouped Debounce

I'm trying to use the RxJS debounce operator, but I want to customize when the emissions from the source are debounced.

By default, any emission from the source inside the debounce window will cause the previous emission to be dropped. I want only certain emissions from the source to count towards the debounce operation, based on the value of the source emission.

Let's say I have an observable of objects that look like this:

{
  priority: 'low'    // can be 'low', 'medium', or 'high'
}

I want the debounce to group by the object's priority. That means an emission will be debounced by another emission only if it has the same priority.

i.e. only 'low' emissions can debounce 'low' emissions, and only 'high' emissions can debounce 'high' emissions. If a 'medium' emission comes while a 'low' emission is waiting, it won't cause the 'low' emission to be dropped.

This means if I had a 'low' emission and a 'medium' emission in quick succession, both would go through. If I had two 'low' emissions in quick succession, only the last one would go through.
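
To pin down the behaviour I'm after, here is a small, dependency-free TypeScript sketch that replays a list of timestamped emissions and reports which ones a per-priority debounce should let through. The `Emission` type, its `at` field, and the `groupedDebounce` function are made up for this illustration and are not part of RxJS; it only models the rule described above, it is not a stream implementation.

```typescript
type Priority = 'low' | 'medium' | 'high';

interface Emission {
  at: number;          // arrival time in ms (hypothetical field for this sketch)
  priority: Priority;
}

// Returns the emissions that survive a per-priority debounce, in arrival order.
// An emission is dropped when a later emission with the same priority
// arrives less than `delay` ms after it.
function groupedDebounce(events: Emission[], delay: number): Emission[] {
  const sorted = [...events].sort((a, b) => a.at - b.at);
  return sorted.filter((e, i) =>
    !sorted.slice(i + 1).some(
      later => later.priority === e.priority && later.at - e.at < delay
    )
  );
}

const out = groupedDebounce(
  [
    { at: 0, priority: 'low' },
    { at: 100, priority: 'medium' },
    { at: 200, priority: 'low' },
  ],
  1000
);
// out keeps the 'medium' emission and the second 'low' emission;
// the first 'low' is dropped because another 'low' followed within the delay.
```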

Here is what I came up with:

import { race, timer } from 'rxjs'
import { filter, mapTo, mergeMap, take } from 'rxjs/operators'

const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000

$source.pipe(
    mergeMap(value => {
       
        // We start a race of the value with a delay versus any other emissions from the source with the same priority
        return race(
            timer(delay).pipe(mapTo(value)),
            $source.pipe(
                filter(v => v.priority === value.priority),
            )
        ).pipe(
            take(1),
            // If another emission with the same priority comes before the delay, the second racer will win the race.
            // If no emission with the same priority comes, the first racer will win.
            //
            // If the first racer wins, this equality check is satisfied and the value is passed through.
            // If the second racer wins, the equality check fails and no value is emitted. Since this is a mergeMap, this whole process will start again for that emission.
            filter(v => v === value),
        )
    })
)

I think the above is correct, but I'm wondering whether I'm missing something or making this more complicated than it needs to be. The code above should behave as if it were merging three separate streams: $low.pipe(debounceTime(delay)), $medium.pipe(debounceTime(delay)), and $high.pipe(debounceTime(delay)).

Thanks!!



1 Answer


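I think your solution works, and it's pretty clear. You do, however, have to be sure your $source is multicast: every value subscribes to $source again inside the mergeMap, so a cold source would start a fresh execution for each of those inner subscriptions.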

There's one downside I see to your approach:

You do a lot of extra computation. If you're debouncing thousands of values per second, it might noticeably slow down depending on where it's being run.

Each streamed value can be in any number of races. Inputs from different priorities still race each other, and when the next value starts its race, the previous race isn't stopped, so you can have an explosion of timers/races if a lot of values arrive at once.

That's a lot of extra timers being set and dropped. In your situation, you should need a maximum of three timers, each of which gets reset as a new value of the same priority arrives.
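
To make the "one timer per priority" idea concrete, here is a minimal sketch in plain TypeScript with no RxJS involved. The `createKeyedDebouncer` helper and its `push`, `pendingCount`, and `cancelAll` methods are hypothetical names invented for this illustration, not a library API; it only shows that a new value replaces the pending timer for its own key and leaves the other keys alone.

```typescript
// Hypothetical helper: keeps at most one pending timer per key.
function createKeyedDebouncer<T>(
  delay: number,
  keyOf: (value: T) => string,
  emit: (value: T) => void
) {
  const timers = new Map<string, ReturnType<typeof setTimeout>>();

  return {
    push(value: T): void {
      const key = keyOf(value);
      // A newer value with the same key cancels the pending one.
      const pending = timers.get(key);
      if (pending !== undefined) clearTimeout(pending);
      timers.set(
        key,
        setTimeout(() => {
          timers.delete(key);
          emit(value);
        }, delay)
      );
    },
    // Number of timers currently waiting to fire.
    pendingCount(): number {
      return timers.size;
    },
    // Drop every pending value without emitting it.
    cancelAll(): void {
      timers.forEach(t => clearTimeout(t));
      timers.clear();
    },
  };
}

const emitted: string[] = [];
const debouncer = createKeyedDebouncer<{ priority: string }>(
  1000,
  v => v.priority,
  v => { emitted.push(v.priority); }
);

debouncer.push({ priority: 'low' });
debouncer.push({ priority: 'low' });    // resets the 'low' timer
debouncer.push({ priority: 'medium' }); // gets its own timer
const pendingAfterThreePushes = debouncer.pendingCount(); // two timers, not three
```

However many values arrive, the number of live timers never exceeds the number of distinct priorities. If the timers are left to run, `emitted` ends up with one 'low' and one 'medium' entry.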

If your code isn't on the critical path that might not be a problem. Otherwise, there are other ways. The one I thought up, though, is a bit bulkier in terms of code.

Partition your streams

Here's how my brain solved this problem. I created an operator that does what RxJS's partition operator does but lets you partition into more than two streams.

My approach handles multicasting internally, so the source can be whatever (hot, cold, multicasted, or not). It (internally) sets up one subject per stream and then you can use RxJS's debounceTime as usual.

There's a downside, though. In your approach, you can add a new priority string willy-nilly and it should continue to work. Objects of {priority: "DucksSayQuack"} will debounce each other and not affect other priorities. This can even be done on the fly.

The partitionOn operator below needs to know the partitions ahead of time. For your described case it should have the same output and be a bit more efficient to boot.

Is this better? I dunno, it's a fun and different approach to solve the same problem. Also, I suppose there are more uses for the partitionOn operator than a partitioned debounce.

The Operator

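import { Observable, Subject } from 'rxjs';

/***
 * Create one partitioned stream per predicate. Each value from input$
 * is forwarded to every partition whose predicate returns true.
 * Note: input$ is subscribed to as soon as partitionOn is called.
 ***/
function partitionOn<T>(
  input$: Observable<T>, 
  predicates: ((v:T) => boolean)[]
): Observable<T>[] {
  // One subject per predicate; the subjects do the multicasting
  const partitions = predicates.map(predicate => ({
    predicate,
    stream: new Subject<T>()
  }));

  // A single subscription to the source feeds every partition
  input$.subscribe({
    next: (v:T) => partitions.forEach(prt => {
      if(prt.predicate(v)){
        prt.stream.next(v);
      } 
    }),
    complete: () => partitions.forEach(prt => prt.stream.complete()),
    error: err => partitions.forEach(prt => prt.stream.error(err))
  });

  // Hide the subjects so callers can't push values into them
  return partitions.map(prt => prt.stream.asObservable());
}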
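
One detail worth noticing is that the predicates are checked independently, so a value lands in every partition whose predicate matches and is silently dropped when none match. A synchronous array analogue makes that routing easy to see. The `partitionArrayOn` function below is a hypothetical stand-in written for this illustration; it works on plain arrays and does not use RxJS.

```typescript
// Array analogue of the routing rule: one output bucket per predicate.
function partitionArrayOn<T>(
  input: T[],
  predicates: ((v: T) => boolean)[]
): T[][] {
  return predicates.map(predicate => input.filter(v => predicate(v)));
}

const buckets = partitionArrayOn(
  [1, 2, 3, 4, 5, 6],
  [n => n % 2 === 0, n => n % 3 === 0, n => n > 100]
);
// buckets[0] holds the even numbers and buckets[1] the multiples of 3,
// so 6 appears in both; buckets[2] stays empty, and 1 and 5 match
// no predicate and therefore appear nowhere.
```

For the priority case the predicates are mutually exclusive, so each value ends up in exactly one partition.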

Using partitionOn for priority debounce

import { merge } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000;

const priorityEquals = a => b => a === b?.priority;

merge(
  ...partitionOn(
    $source,
    [priorityEquals('low'),
    priorityEquals('medium'),
    priorityEquals('high')]
  ).map(s => s.pipe(
    debounceTime(delay)
  ))
);

Timestamp your stream

This approach is very similar to yours and lets you use your priority strings willy-nilly again. This has a similar issue where every value is thrown into a timer and timers aren't canceled as new values arrive.

With this approach, however, the path to canceling unnecessary timers is much clearer. You can store subscription objects alongside timestamps in the priorityTimeStamp map, and be sure to unsubscribe as new values arrive.

I really have no clue what the performance hit for this might be, though I think JavaScript's event loop is pretty robust and efficient. The nice thing with this approach is that you don't pay the cost of multicasting. This is all just effectively one stream using a lookup map to decide what gets filtered and what doesn't.

The priorityDebounceTime Operator

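import { MonoTypeOperatorFunction, defer, timer } from 'rxjs';
import { filter, mapTo, mergeMap, timestamp } from 'rxjs/operators';

function priorityDebounceTime<T>(
  dbTime: number, 
  priorityStr = "priority"
): MonoTypeOperatorFunction<T> {
  // Read the grouping key off each value (assumes it is a string property)
  const keyOf = (v: T): string => (v as any)[priorityStr];

  return s => defer(() => {
    // Latest arrival time seen for each priority, created per subscription
    const priorityTimeStamp = new Map<string, number>();
    return s.pipe(
      mergeMap(v => {
        priorityTimeStamp.set(keyOf(v), Date.now());
        return timer(dbTime).pipe(
          timestamp(),
          // Keep v only if no newer value with the same priority arrived while waiting
          filter(({timestamp}) => 
            timestamp - (priorityTimeStamp.get(keyOf(v)) ?? 0) >= dbTime
          ),
          mapTo(v)
        )
      })
    )
  });

}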

Using priorityDebounceTime for priority debounce

This is obviously a bit simpler:

const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 5000;

$source.pipe(
  priorityDebounceTime(delay)
).subscribe(console.log);
