1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
function streamQueue(readable, fn, { concurrency = 1, collectResults = false, eventName = 'data' } = {}) { let runCount = 0, index = 0, isOver = false; const results = []; return new Promise((resolve, reject) => { const checkFinish = () => { if (isOver && runCount === 0) resolve(collectResults ? results : index); };
readable.on(eventName, async data => { let myIdx = index++; runCount++; if (runCount >= concurrency) readable.pause(); try { let result = await fn(data); if (collectResults) results[myIdx] = result; } catch (e) { return reject(e); } runCount--; checkFinish(); if (runCount < concurrency) readable.resume(); }); readable.on('close', () => { isOver = true; checkFinish(); }); readable.on('end', () => { isOver = true; checkFinish(); }); readable.on('error', reject); }); }
|