Here's the thing: in modern JavaScript, AsyncIterable and Observable interoperate well enough that you can do some pretty slick things without much fuss or any additional libraries.
Let's say you wanted to read the lines of a file and kick off some expensive RxJS-related work for each one:
    import fs from "node:fs";
    import path from "node:path";
    import readline from "node:readline";
    import { once } from "node:events";

    // Read some file into a stream
    const stream = fs.createReadStream(
      path.join(__dirname, "./file.txt")
    );

    // Throw any errors that happen while waiting for it to open
    await once(stream, "open");

    // Get a line-by-line reader (this implements Symbol.asyncIterator)
    const rl = readline.createInterface({
      input: stream,
      crlfDelay: Infinity
    });

    // This will effectively manage backpressure but provide whatever
    // RxJS functionality you wanted. Observable#forEach returns a promise,
    // so the next line is not read until the current one is done.
    for await (const line of rl) {
      await getSomeObservable(line)
        .pipe(
          ...someOperatorsHere
        )
        .forEach(doSomethingWithTheValues);
    }
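To make the backpressure point concrete, here is a minimal sketch with no RxJS or file I/O involved. `fakeLines` and `processLine` are hypothetical stand-ins (not from the snippet above) for the readline interface and for `getSomeObservable(line).forEach(...)`. It only illustrates the ordering: because the loop body awaits, the next line is not pulled until the current one has finished.

```javascript
// Hypothetical stand-in for the readline interface: any async iterable of lines.
async function* fakeLines() {
  yield "alpha";
  yield "beta";
  yield "gamma";
}

// Hypothetical stand-in for `getSomeObservable(line).forEach(...)`:
// async work whose promise resolves once the line is fully processed.
function processLine(line, log) {
  log.push(`start ${line}`);
  return new Promise((resolve) =>
    setTimeout(() => {
      log.push(`end ${line}`);
      resolve();
    }, 10)
  );
}

// Awaiting inside the loop body means the next value is not requested
// from the iterator until the current line's work has settled.
async function runSequentially(lines) {
  const log = [];
  for await (const line of lines) {
    await processLine(line, log);
  }
  return log;
}

runSequentially(fakeLines()).then((log) => console.log(log.join(", ")));
// prints "start alpha, end alpha, start beta, end beta, start gamma, end gamma"
```

If the `await` inside the loop were dropped, all three lines would start before any of them ended, which is the unbounded fan-out the original loop avoids.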
If you wanted the results of that line-by-line loop as an observable, there are a lot of ways to go now. For example, you can convert back and forth between an observable and an async iterable using a library like rxjs-for-await:
    import { defer } from "rxjs";
    import { eachValueFrom } from "rxjs-for-await";

    // ... snip ...

    // In RxJS 7+ `defer`, et al, will convert an async iterable to an observable.
    const results$ = defer(async function* () {
      for await (const line of rl) {
        // here we convert the observable to an async iterable using rxjs-for-await
        yield* eachValueFrom(getSomeObservable(line).pipe(...someOperatorsHere));
      }
    });
Weirdly, this almost makes a solid use case for converting an Observable to an async iterator natively, without the helper function. We just hadn't added it out of fear of what people might misunderstand about how it works (buffering, etc.). Then again, people haven't run into too many issues with concatMap, which has the exact same set of concerns.
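For anyone wondering what the buffering concern looks like, here is a rough sketch. It is not how RxJS or rxjs-for-await implements anything. `pushSource` is a hypothetical, stripped-down stand-in for an Observable, and `toAsyncIterable` is a naive conversion written just for illustration (no error handling, no unsubscribe). The point is that a push-based source can't be paused, so whatever it emits before the consumer asks for it has to be held somewhere.

```javascript
// Hypothetical minimal push source standing in for an Observable: it calls
// `next` synchronously for each value, then `complete`.
function pushSource(values) {
  return {
    subscribe({ next, complete }) {
      for (const v of values) next(v);
      complete();
    },
  };
}

// Naive conversion to an async iterable. The source cannot be paused, so
// anything emitted before the consumer pulls it must sit in `buffer`.
async function* toAsyncIterable(source, stats) {
  const buffer = [];
  let done = false;
  let wake = null; // resolver for a consumer waiting on an empty buffer

  source.subscribe({
    next(value) {
      buffer.push(value);
      stats.maxBuffered = Math.max(stats.maxBuffered, buffer.length);
      if (wake) { wake(); wake = null; }
    },
    complete() {
      done = true;
      if (wake) { wake(); wake = null; }
    },
  });

  while (true) {
    while (buffer.length > 0) yield buffer.shift();
    if (done) return;
    await new Promise((resolve) => (wake = resolve));
  }
}

async function demo() {
  const stats = { maxBuffered: 0 };
  const seen = [];
  for await (const v of toAsyncIterable(pushSource([1, 2, 3, 4, 5]), stats)) {
    await new Promise((resolve) => setTimeout(resolve, 5)); // slow consumer
    seen.push(v);
  }
  return { seen, maxBuffered: stats.maxBuffered };
}

demo().then((result) => console.log(result));
// prints { seen: [ 1, 2, 3, 4, 5 ], maxBuffered: 5 }
```

All five values land in the buffer before the slow consumer has handled the first one. concatMap is in the same position: it queues outer values while the current inner observable is still active, so a fast source feeding slow inner work grows that queue in the same way.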