41 lines
1.2 KiB
TypeScript
41 lines
1.2 KiB
TypeScript
/*
|
|
Copyright 2023, 2024 New Vector Ltd.
|
|
|
|
SPDX-License-Identifier: AGPL-3.0-only
|
|
Please see LICENSE in the repository root for full details.
|
|
*/
|
|
|
|
import { Observable, defer, finalize, scan, startWith, tap } from "rxjs";
|
|
|
|
const nothing = Symbol("nothing");
|
|
|
|
/**
|
|
* RxJS operator that invokes a callback when the Observable is finalized,
|
|
* passing the most recently emitted value. If no value was emitted, the
|
|
* callback will not be invoked.
|
|
*/
|
|
export function finalizeValue<T>(callback: (finalValue: T) => void) {
|
|
return (source: Observable<T>): Observable<T> =>
|
|
defer(() => {
|
|
let finalValue: T | typeof nothing = nothing;
|
|
return source.pipe(
|
|
tap((value) => (finalValue = value)),
|
|
finalize(() => {
|
|
if (finalValue !== nothing) callback(finalValue);
|
|
}),
|
|
);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* RxJS operator that accumulates a state from a source of events. This is like
|
|
* scan, except it emits an initial value immediately before any events arrive.
|
|
*/
|
|
export function accumulate<State, Event>(
|
|
initial: State,
|
|
update: (state: State, event: Event) => State,
|
|
) {
|
|
return (events: Observable<Event>): Observable<State> =>
|
|
events.pipe(scan(update, initial), startWith(initial));
|
|
}
|