rubenverborgh / asynciterator
An asynchronous iterator library for advanced object pipelines in JavaScript
Home Page: https://rubenverborgh.github.io/AsyncIterator/docs/
License: Other
I haven't completely gotten to the bottom of this yet, but I think the problem lies in this package - so reporting in case it rings a bell for you.
When I generate a new app using Create React App, then import a module that brings in AsyncIterator (`@solid/query-ldflex`, in this case), I get the following error:
./node_modules/asynciterator/asynciterator.mjs
Can't import the named export 'EventEmitter' from non EcmaScript module (only default export is available)
Named imports of native Node modules might work in Node with ES Modules enabled, but it seems to be causing problems when trying to do so from Webpack's mock.
Should they be closed? Destroyed? Should there be a special state for iterators that errored?
Or is recovery possible?
Will be needed to prevent browser freezes (LDflex/Query-Solid#45 (comment))
Unlike other iterators, the new WrappingIterator does not destroy sources on end, nor does it have a `destroySource` option.
This will likely result in problems where iterators keep running internally for a while (or infinitely), even though they could be stopped.
I'm getting a behaviour that appears to be caused by a residual task scheduler ordering issue, related to #28.
I have a number of unit tests that involve async iterators returned from node-quadstore. In some cases the test can hang and timeout. The hangs are consistently reproducible but unpredictable. If some tests are skipped, the hang happens in a different place; but the tests are fully independent with respect to everything but globals.
Debugging shows that the hang is caused by an async iterator which is not emitting a `readable` event despite having new items available.
The screenshot shows a debug session in which the `readable` event is about to be suppressed because the async iterator already has `this._readable === true`.
I have two reasons to suspect a task ordering issue:
1. The hang happens around a `readable` event from the source LevelIterator (you can see that the source is readable and that the destination SimpleTransformIterator is not yet readable).
2. Calling `setTaskScheduler(queueMicrotask)` early in my setup fixes the issue.
I will continue to investigate. I may try to create a test scenario, but it may be more fruitful for me to offer a hypothesis and a fix instead. Notifying you here in case you have any ideas!
Placeholder - will try to create a minimal example or debug tomorrow :)
In light of the current work being done, it may be worth looking into getting some CI workflows running that do performance tests on this code in a consistent environment (perhaps something like https://stackify.com/node-js-performance-tuning/) so that there are no performance regressions in the future.
At the same time it may be worth setting up similar workflows in Comunica. @rubensworks, do you think something like this may be appropriate for a bounty, especially since it is a task that someone can do without having too deep a knowledge of the codebases?
I don't think it's really a bug, but nevertheless, I think it deserves some discussion.
Yesterday, I discovered some edge cases in which chained iterators would keep on running (due to an infinite iterator in the chain), even though the head of the chain was destroyed.
While destroy calls should be propagated (unless `destroySource` is set to `false`), this was not happening in this case.
These problems seemed to be caused by TransformIterators with an (async) callback-based reference to another iterator, where this iterator was created externally.
They shared the following form:
const baseIterator = ...; // Some iterator that has to be created beforehand, and can not be placed in the async callback below.
const transformIt = new TransformIterator(async() => baseIterator.map(...), { autoStart: false });
In cases where `transformIt` (or some other iterator in the chain from `transformIt`) was destroyed before it was being read, `baseIterator` would not be destroyed.
This makes sense of course, because the callback in the `TransformIterator` is never invoked, which breaks the destroy call chain to `baseIterator`.
I've created a workaround for these cases, so the problem is solved on my end.
But I thought it would be good to raise this issue here as well. Perhaps it can help someone with debugging in the future.
Note to self for now:
See if we can remove the macrotasking behavior; in particular, see if we are able to do something like the following after fixing autoStart behavior:
// Returns a function that asynchronously schedules a task
export function createTaskScheduler(): TaskScheduler {
  if (typeof queueMicrotask === 'function')
    return queueMicrotask;
  const resolved = Promise.resolve(undefined);
  return (task: Task) => { resolved.then(task); };
}
I suspect that this may be caused by #35 so don't worry for now.
This is a potential issue that I saw when perf testing the current PRs, but also saw in main.
If I run (on the main branch)
import { ArrayIterator, range } from '../dist/asynciterator.js'
for (let i = 0; i < 5; i++) {
const arr = range(0, 2000000); // I've tested with both `range` and `ArrayIterator`
const iterator = arr
.map((item) => item)
.map((item) => item)
.map((item) => item)
.map((item) => item)
.map((item) => item)
.map((item) => item)
.map((item) => item)
.map((item) => item)
.map((item) => item)
.map((item) => item);
const now = performance.now();
iterator.on('data', () => {}).on('end', () => {
const end = performance.now();
console.log('elapsed', end - now);
});
}
then the result is
elapsed 14108.122984007001
elapsed 14112.1332180053
elapsed 14112.153782993555
elapsed 14112.245641991496
elapsed 14112.30153799057
(as opposed to ~3000ms for one)
I suspect the issue is that the `end` event is emitted asynchronously, so it has to wait for everything else to complete before we get back to it.
The new UnionIterator does not yet support `autoStart: false`, which makes it impossible to take a lazy union over sources that are themselves also lazy.
The snippet at AsyncIterator/taskscheduler.ts (lines 13 to 21 in 668697c) uses `setTimeout`.
See the use case for `append` in #66.
`append` and `prepend` still go through the `SimpleTransformIterator`.
IMO the cleanest way to do this would be to add a parameter to the `UnionIterator` in #65 that requires the iterators in the union to be consumed sequentially, and then for `append` and `prepend` to just return a union of the current source and the data to append/prepend.
@jacoscaz - do you have any thoughts on this?
The `empty` iterator does not emit events if something has been `await`ed after its instantiation; for instance, the following tests time out:
async function promisifyEventEmitter(eventEmitter: EventEmitter): Promise<void> {
return new Promise<void>((resolve, reject) => {
eventEmitter.on('end', resolve);
eventEmitter.on('error', reject);
});
}
it('empty test awaiting promise', async () => {
const e = empty();
await Promise.resolve();
await expect(promisifyEventEmitter(e)).resolves.toBeFalsy();
});
it('empty test undefined', async () => {
const e = empty();
await undefined;
await expect(promisifyEventEmitter(e)).resolves.toBeFalsy();
});
whilst the following works
it('empty test undefined', async () => {
const e = empty();
await expect(promisifyEventEmitter(e)).resolves.toBeFalsy();
});
Discovered in comunica/comunica#904
When using `AsyncIterator` as a lighter alternative to streams within a project's internals, how should I approach returning something that can be `.pipe()`-ed into a `stream.Writable`? I've tried wrapping iterators via a custom extension of `stream.Readable`, but I wonder if there's a more elegant way...
Expected to address #13
In comunica/comunica#733, I discovered that some other queries started freezing (that didn't freeze before) since asynciterator version 3.0.2.
Related to #22.
Minimal reproducible example:
const arrayIt = new ArrayIterator([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], { autoStart: false });
const transformit = new TransformIterator(arrayIt, { autoStart: false });
const unionIt = new UnionIterator([transformit], { autoStart: false });
// The listeners below are never called
unionIt.on('data', () => console.log('DATA'));
unionIt.on('end', () => console.log('ENDED'));
It looks like the transform buffer is being filled partially, but its elements are never propagated upwards for some reason.
e.g. in this situation:
itA = itB.clone()
itB.getProperty('foo', callback)
itA.setProperty('foo', 123)
Should I expect the callback for getProperty on itB to fire with the value I set on itA?
While working with streams directly should be encouraged, there are still many use cases where converting the stream to an array is needed.
This is often done using arrayify-stream now, but this introduces the overhead of installing an additional package.
For developers, it would simplify things a lot if there was an (async) `toArray` method available, which would fully consume the stream and produce an array.
@RubenVerborgh If this sounds good to you, I will look into implementing this for including in the next major release (for #36).
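As a rough illustration of the proposal, and not the library's actual API, such a method could be sketched as a standalone helper that consumes a source through its `'data'`/`'end'`/`'error'` events:

```javascript
// Hypothetical sketch of what an async toArray could look like: it fully
// consumes a source by listening to its 'data', 'end', and 'error' events
// and resolves with all emitted items. (Not the library's actual API.)
function toArray(source) {
  return new Promise((resolve, reject) => {
    const items = [];
    source.on('data', item => items.push(item));
    source.on('end', () => resolve(items));
    source.on('error', reject);
  });
}
```

Usage would then be as simple as `const items = await toArray(iterator);`.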
When transforming an empty lazy stream, the final stream becomes un-lazy.
This makes it so that the end-event can be emitted before any listeners can be attached.
Minimal reproducible example:
(async function() {
// Create a lazy array it
const arrayIt = new ArrayIterator([], { autoStart: false });
// Transform it twice (both lazy)
const transformedIt1 = arrayIt.transform({
map: () => 1,
autoStart: false,
});
const transformedIt2 = transformedIt1.transform({
map: () => 1,
autoStart: false,
});
console.log('WAITING...');
setTimeout(() => {
console.log('ATTACHING LISTENERS...');
// The listeners below are never called
transformedIt2.on('data', () => console.log('DATA'));
transformedIt2.on('end', () => console.log('ENDED'));
}, 1000);
})();
Since there is a native `queueMicrotask` function, it may be worth considering using it rather than calling `.then` on a resolved promise.
This library makes heavy use of `setImmediate`, which is a non-standard (and apparently never will be standard) API. See the MDN docs: https://developer.mozilla.org/en/docs/Web/API/Window/setImmediate
I feel like this should be addressed, but am also curious why this has not already come up as an issue. Perhaps all downstream users of this just use the `setimmediate` npm package polyfill that MDN recommends?
Replacing `setImmediate` calls with `setTimeout` was the first thing that came to mind, so I forked the repo and tested it. A bunch of unit tests failed though... not sure why at this point.
The thing that makes a polyfill-based approach complicated is that there is no build/bundling process in this repo yet, and this would require one, right?
In order to support some more niche use-cases I am coming across, I was wondering if it is worth creating an AsyncIterator-ext package (see example below).
IMO the best way to go about this would be:
1. Create an AsyncIterator organisation.
2. Move the AsyncIterator repo to a repo in this organisation [Requires @RubenVerborgh due to repo permissions].
3. Publish extension packages following the @asynciterator/extension-name schema, e.g. @asynciterator/ext (I am happy to take responsibility for maintenance of this new repo).
The downside of having these in a separate package is that it is hard to use these functions as methods of an iterator (if we just extended the existing `AsyncIterator` with an `AsyncIteratorExt` class, then using a method like `map` would return an `AsyncIterator` without any of the extension functionality).
An example of an export from that package would be the following `maybeIterator` function, which returns `null` for empty iterators and an iterator otherwise (the use case for me is to terminate a forward-chaining procedure in reasoning components):
import { AsyncIterator, ArrayIterator, BufferedIterator, IntegerIterator } from 'asynciterator';
/**
* @param source An AsyncIterator
* @returns The AsyncIterator if it is not empty, otherwise null
*/
async function maybeIterator<T>(source: AsyncIterator<T>): Promise<null | AsyncIterator<T>> {
// Avoid creating a new iterator where possible
if ((source instanceof ArrayIterator || source instanceof BufferedIterator) && source._buffer.length > 0) {
return source;
}
if (source instanceof IntegerIterator) {
// Empty if the iterator has already stepped past its last value
return (source.step >= 0 ? source.next > source.last : source.next < source.last) ? null : source;
}
let item;
do {
if ((item = source.read()) !== null)
return source.append([item]);
await awaitReadable(source);
} while (!source.done);
return null;
}
function awaitReadable<T>(source: AsyncIterator<T>): Promise<void> {
return new Promise<void>((res, rej) => {
if (source.readable || source.done)
return res();
function done() {
cleanup();
res();
}
function err() {
cleanup();
rej();
}
function cleanup() {
source.removeListener('readable', done);
source.removeListener('end', done);
source.removeListener('error', err);
}
source.on('readable', done);
source.on('end', done);
source.on('error', err);
});
}
As mentioned in #4, the property mechanism is currently not documented.
Including async generators
Per @rubensworks' comment in comunica/comunica#1165 (comment), it would be useful to have a debug mode for asynciterator.
The most straightforward approach would look something like
const iterators = [];
process.on('exit', () => {
if (iterators.filter(iter => !iter.done).length > 0) {
console.warn("found open iterators on process exit");
}
})
export class AsyncIterator<T> extends EventEmitter {
protected _state: number;
private _readable = false;
protected _properties?: { [name: string]: any };
protected _propertyCallbacks?: { [name: string]: [(value: any) => void] };
/** Creates a new `AsyncIterator`. */
constructor(initialState = OPEN) {
super();
this._state = initialState;
iterators.push(this)
this.on('newListener', waitForDataListener);
}
However, this approach doesn't work with jest, as the `exit` event is not emitted after the test suites have run.
Note that we could also offer some jest-specific functionality that checks for iterators that are not closed in the `afterEach` callback. The benefit there is that you can see specifically which tests are leaving iterators open in a test suite.
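The jest-friendly variant could be sketched as a small registry that tests can query at any point; all names here are illustrative, not an existing API:

```javascript
// Hypothetical debug-mode registry (names are illustrative): iterators
// would register themselves on construction, and a test hook can then
// check that none were left open.
const tracked = new Set();

// Would be called from the AsyncIterator constructor when debug mode is on
function track(iterator) {
  tracked.add(iterator);
}

// Returns all registered iterators that have not ended yet
function openIterators() {
  return [...tracked].filter(iterator => !iterator.done);
}

// Clears the registry, e.g. between test files
function resetTracking() {
  tracked.clear();
}
```

In a jest setup, `afterEach(() => expect(openIterators()).toHaveLength(0))` would then pinpoint exactly which test leaked an iterator.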
In comunica/comunica#561, we identified AsyncIterator as (one of) the main cause of slowdown in query performance.
While Asynciterator is very fast in Node.js, it is orders of magnitude slower in browsers such as Chrome.
The cause of this seems to be the extensive usage of `setImmediate` in AsyncIterator, which is fast in Node.js, but not in browsers (due to shims).
I've started a branch on my fork to do some profiling.
This is the current output (first in Node.js, then in Chrome):
elements,transformers,time
1000,1, 8.535ms
1000,10, 26.684ms
1000,100, 42.072ms
10000,1, 14.416ms
10000,10, 21.166ms
10000,100, 105.672ms
100000,1, 115.589ms
100000,10, 199.156ms
100000,100, 981.854ms
elements,transformers,time
1000,1, 60.205810546875ms
1000,10, 280.35693359375ms
1000,100, 2386.37890625ms
10000,1, 365.549072265625ms
10000,10, 2032.72705078125ms
10000,100, 26157.5810546875ms
100000,1, 3524.163818359375ms
100000,10, 27374.552978515625ms
100000,100, 213922.16796875ms
These results show that both the stream size and the number of transformers have a significant impact in browsers.
I'll play around a bit to see what optimizations are possible.
It currently uses `.shift`, which is an O(n) operation, so it costs O(n²) to iterate over.
Noting that the V8 engine has optimizations similar to Firefox's for arrays of up to 50_000 elements, we could do the following (~2x speedup).
export default class LinkedList<V> {
public length: number = 0;
private _offset = 0;
private _head: V[] = [];
private _tail: V[] = this._head;
get first() { return this._head[this._offset]; }
get last() { return this.length === 0 ? undefined : this._tail[this._tail.length - 1]; }
get empty() { return this.length === 0; }
push(value: V) {
if (this._tail.length === 5_000) {
(this._tail as any).next = [];
this._tail = (this._tail as any).next
}
this._tail.push(value);
this.length++;
}
shift(): V | undefined {
if (this.length === 0)
return undefined;
// Move on to the next chunk once the current one is fully consumed
if (this._offset === 5_000) {
this._head = (this._head as any).next;
this._offset = 0;
}
this.length--;
return this._head[this._offset++];
}
clear() {
this.length = 0;
this._offset = 0;
this._head = this._tail = [];
}
}
console.time('LinkedListNext');
const it3 = new LinkedListNext();
for (let i = 0; i < 50_000_000; i++)
it3.push(i);
for (let i = 0; i < 50_000_000; i++)
it3.shift();
console.timeEnd('LinkedListNext');
console.time('LinkedList');
const it = new LinkedList();
for (let i = 0; i < 50_000_000; i++)
it.push(i);
for (let i = 0; i < 50_000_000; i++)
it.shift();
console.timeEnd('LinkedList');
Results
LinkedListNext: 892.099ms
LinkedList: 2.283s
As in #86 (comment).
While we are doing a semver upgrade, we may also want to migrate to https://www.npmjs.com/package/eventemitter3, which appears to be a much less bloated and more optimised `EventEmitter` than the one in Node.js core.
Apologies in advance that this issue is not very concrete! I'll elaborate more once I get a chance to do some proper profiling.
After running this benchmark on the forward-chaining reasoning components in Comunica, I've been noticing what appears to be a large number of memory leaks when the process shuts down. I suspect either something is wrong internally in AsyncIterator, or in the way that I am using it in that case.
For basic methods like `.map` and `.filter` on `AsyncIterator`, using the `SimpleTransformIterator` introduces unnecessary overhead via the `readAndTransformSimple` operator. To improve performance in applications like Comunica, it may be worth creating custom iterators with simpler read methods; for instance, a synchronous `MapIterator` could skip buffering entirely and just be implemented as something like
export class MapIterator<T, S> extends AsyncIterator<S> {
...
read(): S | null {
const item = this._source.read();
return item !== null ? this.map(item) : null;
}
}
@jacoscaz I suspect this is the cause of the potential slowdown you were encountering when doing a large amount of chaining.
Consider the following example:
const AsyncIterator = require('asynciterator');
const it = new AsyncIterator.BufferedIterator({ autoStart: false });
let counter = 0;
it._read = function(count, cb) {
console.log('Reading ' + count);
for (let i = 0; i < count; i++) {
this._push(counter++);
}
cb();
};
console.log('Initialized buffered iterator!');
const it2 = it.map((i) => i * 10);
console.log('Initialized mapped iterator!');
As the `autoStart` parameter of `it` is set to `false`, no elements will be buffered until a data listener is attached.
However, when the iterator is transformed (for example using `map`), this `autoStart` parameter is not inherited.
For this example, the following output will be produced:
Initialized buffered iterator!
Initialized mapped iterator!
Reading 4
Reading 1
Reading 1
Reading 1
Reading 1
The expected behaviour would be that no elements are produced until a data listener is attached to either `it` or `it2`.
In particular, this logic https://github.com/LDflex/LDflex-Comunica/blob/f01ece6fa8cee720061efd5ddd23ed82921555e4/src/ComunicaEngine.ts#L158-L215 should be implemented as the `[Symbol.asyncIterator]` method of `AsyncIterator`.
A straightforward implementation would just apply `wrap` to the source in the `_addSource` method if the source is a Promise.
When a lazy (empty) iterator is cloned, it seems to consume the iterator eagerly, which makes it so that `'end'` events can be lost.
Minimal reproducible example:
const arrayIt = new ArrayIterator([], { autoStart: false });
const clonedIt = arrayIt.clone();
await new Promise((resolve) => setImmediate(resolve));
// The listeners below are never called
clonedIt.on('data', () => console.log('DATA'));
clonedIt.on('end', () => console.log('ENDED'));
If I install the `asynciterator` package in a new Node project, then methods like `.on` are not shown in my type hints, because TypeScript cannot guess the type of the `EventEmitter` that `AsyncIterator` extends.
@types/node needs to be added as a dependency to resolve this (see https://stackoverflow.com/questions/45176661/how-do-i-decide-whether-types-goes-into-dependencies-or-devdependencies) - otherwise users of the AsyncIterator package need to install @types/node themselves
A function `merge` on an AsyncIterator would be a great tool to, e.g., merge-sort different iterators:
require('asynciterator').merge([iterators], function (a, b) { return a.departureTime < b.departureTime })
Which element is emitted next is based on a boolean condition.
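The comparator-driven selection logic can be sketched synchronously; for illustration the sources here are plain arrays, whereas a real implementation would pull from AsyncIterators on demand. The name `mergeSorted` is hypothetical:

```javascript
// Hypothetical sketch of the comparator logic behind a merge function:
// repeatedly emit the head of whichever source compares smallest.
// Sources are plain arrays here purely for illustration.
function mergeSorted(sources, isSmaller) {
  const queues = sources.map(source => [...source]);
  const result = [];
  for (;;) {
    // Find the non-empty queue whose head item compares smallest
    let best = -1;
    for (let i = 0; i < queues.length; i++) {
      if (queues[i].length > 0 &&
          (best < 0 || isSmaller(queues[i][0], queues[best][0])))
        best = i;
    }
    if (best < 0)
      return result; // All queues drained
    result.push(queues[best].shift());
  }
}
```

With a comparator like `(a, b) => a.departureTime < b.departureTime`, this interleaves connections from all sources in departure-time order, assuming each source is itself sorted.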
In comunica/comunica#144, we discussed adding a `closeSourceWhenDone` option (defaulting to `true`) to `TransformIterator`. This would make the transformed iterator call `close()` on its source when the transformed iterator has ended.
While this would work in practice, it may not be the cleanest solution, as `Readable#close` is not part of the official interface.
Instead, `Readable#destroy` is part of the interface, and can be used to achieve the same thing (and has been available for a long time).
So I'm wondering if it would be better to instead use a property `destroySourceWhenDone` that calls `destroy()` on the source. This would probably require `AsyncIterator` to implement the `destroy()` method as well, which could simply be a proxy for `close()`.
I think it would be quite useful to have an overload on the filter method of `AsyncIterator`:
filter<K extends T>(filter: (item: T) => item is K, self?: any): AsyncIterator<K>;
This would then allow use in cases such as the following:
const termIterator: AsyncIterator<RDF.Term> = /* Some iterator */
const namedNodeIterator: AsyncIterator<RDF.NamedNode> = termIterator.filter<RDF.NamedNode>((term): term is RDF.NamedNode => term.termType === 'NamedNode')
In the `UnionIterator`, I don't see the necessity of buffering elements: unless you are taking a union over a huge number of iterators, testing all the unterminated iterators to see if they have an element available is likely to be an inexpensive operation, and thus should just happen on the `read` call rather than in `_read` on a BufferedIterator. (I was just working on the Comunica reasoning components, which have a lot of unions in the naive algorithm for reasoning, so I suspect it is causing some overhead there.)
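The unbuffered `read` described above could be sketched as a round-robin poll over the sources that have not ended yet; the class and its stub source interface (`read()` plus a `done` flag) are assumptions for illustration, not the library's internals:

```javascript
// Hypothetical sketch of an unbuffered union read(): poll each
// not-yet-ended source in round-robin order and return the first
// available item, or null when no source currently has one.
class UnbufferedUnion {
  constructor(sources) {
    this._sources = sources;
    this._index = 0; // Where the previous read left off, for fairness
  }

  read() {
    const open = this._sources.filter(source => !source.done);
    for (let i = 0; i < open.length; i++) {
      const source = open[(this._index + i) % open.length];
      const item = source.read();
      if (item !== null) {
        this._index = (this._index + i + 1) % open.length;
        return item;
      }
    }
    return null; // No source has an item available right now
  }
}
```

A real implementation would additionally emit `readable` when any source becomes readable, and end once all sources have ended.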
Not needed anymore now that arrow functions are common.
Would it be possible to extend `asynciterator` to support TC39's async iteration proposal? It is starting to be implemented by browsers and has landed as a new way to consume streams in Node.js 10.x.
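One way this could be layered on top of the existing pull interface (a `read()` method plus `'readable'`/`'end'` events and a `done` flag) is an async generator; the helper name `asAsyncIterable` is an assumption, not the library's API:

```javascript
// Hypothetical sketch: expose an AsyncIterator-style source (read() plus
// 'readable'/'end' events and a done flag) as a JS async iterable,
// so it can be consumed with for await ... of.
async function* asAsyncIterable(source) {
  while (!source.done) {
    const item = source.read();
    if (item !== null) {
      yield item;
    }
    else {
      // Wait until new items arrive or the source ends
      await new Promise(resolve => {
        const onAvailable = () => {
          source.removeListener('readable', onAvailable);
          source.removeListener('end', onAvailable);
          resolve();
        };
        source.on('readable', onAvailable);
        source.on('end', onAvailable);
      });
    }
  }
}
```

Consumers could then write `for await (const item of asAsyncIterable(iterator)) { ... }`.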
I want to read all `data` into a string, based on this example:
https://comunica.dev/docs/query/advanced/result_formats/#querying-in-a-javascript-app
const result = await myEngine.query(`
SELECT ?s ?p ?o WHERE {
?s ?p <http://dbpedia.org/resource/Belgium>.
?s ?p ?o
} LIMIT 100`, {
sources: ['http://fragments.dbpedia.org/2015/en'],
});
const { data } = await myEngine.resultToString(result,
'application/sparql-results+json');
data.pipe(process.stdout); // Print to standard output
How can I do that in the browser?
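In the browser there is no `process.stdout` to pipe to, but the same result stream can be accumulated into a string via its `'data'` events. The helper name `streamToString` is illustrative:

```javascript
// Accumulate a readable stream's 'data' chunks into a single string,
// as a browser-friendly alternative to piping to process.stdout.
// (streamToString is a hypothetical helper name.)
function streamToString(stream) {
  return new Promise((resolve, reject) => {
    let result = '';
    stream.on('data', chunk => { result += chunk; });
    stream.on('end', () => resolve(result));
    stream.on('error', reject);
  });
}
```

So instead of `data.pipe(process.stdout)`, one could write `const json = await streamToString(data);` and then render the string in the page.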