
Comments (3)

jpambrun commented on August 21, 2024

In the end, I think I found something that works-ish by handling all errors in the customSource and trying never to let an error bubble up through the stream.

I am still not sure this is the best story around error handling: if I run out of retries and destroy the passthrough stream, it terminates the node process.

import * as unzipper from "unzipper";
import { PassThrough, Readable } from 'node:stream';
import pRetry from "p-retry";

const TEST_ERROR = process.env.TEST_ERROR || false;
const URL = 'http://127.0.0.1:8080/modules.zip'

const customSource = {
    stream: function (offset, length) {
        const pass = new PassThrough();
        pass.on("error", (err) => console.log('pass error'))
        let bytesWritten = 0;
        pRetry(async () => {
            const res = await fetch(URL, { headers: { Range: `bytes=${offset + bytesWritten}-${length ? offset + length - 1 : ''}` } })
            if (!res.ok) throw new Error(`Failed to fetch, status ${res.status}`);
            if (TEST_ERROR && Math.random() > 0.99) throw new Error(`Random fetch failure`);

            // res.body is already a web ReadableStream, not a promise
            const stream = Readable.fromWeb(res.body);
            await new Promise((resolve, reject) => {
                stream.on("error", (err) => reject(err))
                stream.on("end", () => {
                    pass.end();
                    resolve();
                })
                stream.on("data", (data) => {
                    if (TEST_ERROR && Math.random() > 0.99) stream.destroy(new Error('Random streaming failure'));
                    pass.write(data);
                    bytesWritten += data.length;
                })
            })
        }, { retries: 5, onFailedAttempt: (err) => console.log(err) })
        return pass
    },
    size: async function () {
        return pRetry(async () => {
            const res = await fetch(URL, { method: 'HEAD' });
            return parseInt(res.headers.get('content-length'), 10);
        }, { retries: 5, onFailedAttempt: (err) => console.log(err) })
    }
}

const directory = await unzipper.Open.custom(customSource);
const decompressedFiles = []
for await (const file of directory.files.filter(f => f.type === 'File')) {
    try {
        // console.log(file.path)
        const fileStream = await file.stream();
        //just consume the stream to decompress the file
        await new Promise((resolve, reject) => {
            fileStream.on('data', (chunk) => { /* noop*/ });
            fileStream.on('end', () => resolve());
            fileStream.on('error', (err) => {
                console.log('Stream error:', err); // don't see that
                reject(err)
            });
        });
        decompressedFiles.push(file.path);
    } catch (error) {
        console.log('for await try/catch'); //don't see that
    }
}

console.log(decompressedFiles.length)

// zip -r modules.zip node_modules/
// npx http-server
// TEST_ERROR=true node index.mjs

from node-unzipper.

ZJONSSON commented on August 21, 2024

I don't know if the retry mechanism should be baked into unzipper. Some projects may want to retry, and how they retry could be very custom (number of retries, exponential backoff, etc.). Anything can break, not just unzipper, so it seems to me that the retry methodology is outside the scope of this library. If you want to create your own custom adapter with retry, though, the approach above makes sense; here are some comments:

Stream objects are not promises, so doing await file.stream() doesn't wait for the stream to complete; you have to consume the stream to get to the end. A simple way to do that is to do await file.buffer() instead (this method consumes the stream's entire contents into a buffer, and in this case you just throw it away instead of assigning it to a variable). Also, if you await file.buffer(), which consumes the entire stream, then you most likely don't need the promise with event emitters below.
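To illustrate the distinction, here is a minimal, self-contained sketch. The `entry` object is a hypothetical stand-in for an unzipper file entry (not the real API); the point is that awaiting a function that returns a stream gives you the stream object immediately, and only consuming it reads to the end:

```javascript
import { Readable } from 'node:stream';
import { buffer } from 'node:stream/consumers';

// Hypothetical stand-in for an unzipper entry: stream() returns a Readable
// immediately, so awaiting it does NOT wait for the data to be read.
const entry = {
  stream: () => Readable.from([Buffer.from('hello '), Buffer.from('world')]),
};

const s = await entry.stream(); // just the stream object, nothing consumed yet
// Consuming it (which is what file.buffer() does) is what reads to the end:
const buf = await buffer(s);
console.log(buf.toString()); // hello world
```
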

Also, FYI: adding .on('data', ...) is an anti-pattern since it switches the stream into flowing mode (see here) with no backpressure management, i.e. you can blow up memory if unzipping is slower than fetching the data for large files.
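A backpressure-friendly alternative, sketched below with an in-memory source: async iteration pulls one chunk at a time, so a slow consumer naturally pauses the stream instead of letting chunks pile up in memory.

```javascript
import { Readable } from 'node:stream';

// Paused-mode alternative to .on('data', ...): for await pulls one chunk
// at a time, so slow work in the loop body applies backpressure upstream.
const source = Readable.from([Buffer.from('a'), Buffer.from('bb'), Buffer.from('ccc')]);

let bytes = 0;
for await (const chunk of source) {
  bytes += chunk.length; // slow per-chunk work here would pause the source
}
console.log(bytes); // 6
```
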

A better way to introduce mocked errors into the stream is to use a Transform that you pipe directly into pass, instead of calling pass.write on the chunks. An example would be something like:

const stream = Readable.fromWeb(body)
  .pipe(new Transform({
    transform(chunk, encoding, callback) {
      // passing an error to callback destroys the stream idiomatically
      if (TEST_ERROR && Math.random() > 0.99) return callback(new Error('Random streaming failure'));
      bytesWritten += chunk.length;
      callback(null, chunk);
    },
  }));
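Wiring this up end to end might look like the following self-contained sketch (the error injection is left as a comment so the byte counts are deterministic; the source is an in-memory stand-in for the fetched body). pipeline() from node:stream/promises propagates failures between stages, so a Transform error would reach pass without manual plumbing:

```javascript
import { PassThrough, Readable, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const pass = new PassThrough(); // what the customSource would return
let bytesWritten = 0;

// Counting Transform; a mocked failure would be callback(new Error(...))
const counter = new Transform({
  transform(chunk, encoding, callback) {
    bytesWritten += chunk.length;
    callback(null, chunk);
  },
});

// In-memory stand-in for Readable.fromWeb(res.body)
const source = Readable.from([Buffer.from('abc'), Buffer.from('defg')]);
await pipeline(source, counter, pass);

// Drain pass to confirm every byte made it through
let out = 0;
for await (const chunk of pass) out += chunk.length;
console.log(bytesWritten, out); // 7 7
```
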


jpambrun commented on August 21, 2024

Thanks for the very helpful and generous answer, I really appreciate it. Your comments are absolutely on point.

My challenge is that I will be streaming from S3 from large-ish zips (~500MB), each with thousands of files (~300KB). Tens of thousands of zips spanning 100+ terabytes. On the plus side, with this library I can do it at 1 Gbps from my laptop at home; once deployed on EC2 it should reach 3-4 Gbps on a moderately sized VM. This is pretty much amazing. The downside is that I make thousands of requests per second, and inevitably some will fail. In my testing I get a hiccup at least every 30s. When that happens I want to retry only that one entry, not the whole zip file. Without the trick above, I can't find a way to catch the error, and it takes down the whole node process.
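A per-entry retry along the lines suggested above could be sketched as follows. This is self-contained: `file` is a mock entry whose buffer() fails once and then succeeds (the real one would come from unzipper and hit S3), and the retry helper stands in for p-retry. The point is that the retry wraps a single entry, not the whole zip:

```javascript
// Minimal stand-in for p-retry: retry fn up to `retries` extra times.
async function retry(fn, retries = 5) {
  for (let attempt = 0; ; attempt++) {
    try { return await fn(); }
    catch (err) {
      if (attempt >= retries) throw err; // give up, let the caller decide
    }
  }
}

let calls = 0;
// Mock unzipper entry (hypothetical path): fails once, then succeeds.
const file = {
  path: 'node_modules/example.js',
  buffer: async () => {
    calls += 1;
    if (calls === 1) throw new Error('transient fetch hiccup'); // mock failure
    return Buffer.from('contents');
  },
};

// Retry only this entry; other entries in the zip are unaffected.
const buf = await retry(() => file.buffer());
console.log(calls, buf.toString()); // 2 contents
```
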

I realize now that my proposal puts the stream in flowing mode, which is suboptimal. However, it made it easy to keep using the same returned passthrough stream: on a streaming error where I have already pushed some data, I can resume where I left off by adjusting the start position of the retried byte-range request. I am struggling to see how I would do that in paused mode. Would I unpipe the broken stream and pipe in a new one on retry? Wouldn't the error have already propagated, leaving me in the same situation?

