
fastify-arrow

A Fastify plugin for sending and receiving columnar tables with Apache Arrow as optimized, zero-copy binary streams.

This module decorates the fastify Request with a recordBatches() method that returns an IxJS AsyncIterable of Arrow RecordBatchReaders.

Each inner RecordBatchReader is an AsyncIterableIterator<RecordBatch>, leading to the following signature:

Request.prototype.recordBatches = () => AsyncIterable<AsyncIterable<RecordBatch>>;

AsyncIterable is native in JS via the [Symbol.asyncIterator]() and for await...of protocols. You can create an Ix.AsyncIterable from a NodeJS.ReadableStream with AsyncIterable.fromNodeStream(). You can also pipe() an AsyncIterable to a NodeJS.WritableStream to more easily transition between the functional and imperative APIs available in node and Arrow.
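
For example, a route handler can consume the nested iterables directly with for await...of. The sketch below is illustrative only; the /ingest route and its response shape are not part of this module:

fastify.post(`/ingest`, async (request, reply) => {
    let rows = 0;
    // outer loop: one RecordBatchReader per Arrow IPC stream in the request body
    for await (const reader of request.recordBatches()) {
        // inner loop: the RecordBatches within that stream
        for await (const batch of reader) {
            rows += batch.numRows;
        }
    }
    reply.send({ rows });
});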

Arrow RecordBatches are full-width, length-wise slices of a Table. To illustrate, the following table contains three RecordBatches, and each RecordBatch has three rows:

"row_id" |      "utf8: Utf8" |  "floats: Float32"    ___
       0 |          "sh679x" |  6.308125972747803       |
       1 |    "u9joo443zl38" | 12.003445625305176       | <-- RecordBatch 1
       2 |  "4b2f5pcyp_nisb" |  14.00214672088623    ___|
       3 |        "rfmuc50d" |  8.512785911560059       |
       4 |  "1u7ygm51_2cvye" | 14.949934959411621       | <-- RecordBatch 2
       5 |        "xffgrp9x" |  8.687625885009766    ___|
       6 |   "9vhc_g3_lqx4v" | 13.841902732849121       |
       7 | "4bxi6ioh8cssq12" | 15.428414344787598       | <-- RecordBatch 3
       8 |         "zjcxb2s" | 7.1155924797058105    ___|
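
In apache-arrow's JS library those slices are exposed through the Table's batches property. A minimal sketch (the column values are made up for illustration):

const { tableFromArrays } = require('apache-arrow');

const table = tableFromArrays({
    floats: Float32Array.from({ length: 9 }, Math.random)
});
console.log(table.numRows);            // 9
console.log(table.batches.length);     // 1 -- a table built from one contiguous array has a single RecordBatch
console.log(table.batches[0].numRows); // 9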

You can generate a table similar to the one above by installing the dependencies, then executing the following command from the repository's top-level directory:

$ node test/util.js | npx arrow2csv
  "row_id" |     "str: Utf8" | "num: Float32"
         1 |         "f_sry" |              1
         2 |         "ogbwi" |              2
         3 |         "ny5l_" |              3
         4 |         "hi6r5" |              1
         5 |         "_5_zf" |              2
         6 |         "di9mu" |              3
         7 |         "gbswg" |              1
         8 |         "alm8f" |              2
         9 |         "qrzah" |              3

This module also decorates fastify's Reply with a convenient stream() method, returning a pass-through stream hooked up to the http ServerResponse.
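
Conceptually, that behaves like creating a PassThrough and sending it as the reply payload. The streamLike helper below is a hypothetical sketch of the described behavior, not the plugin's actual implementation:

const { PassThrough } = require('stream');

// roughly what reply.stream() gives you: a writable sink whose contents become the response body
function streamLike(reply) {
    const sink = new PassThrough();
    reply.send(sink); // fastify pipes stream payloads to the underlying http.ServerResponse
    return sink;
}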

Send Arrow RecordBatch streams

const Fastify = require('fastify');
const arrowPlugin = require('fastify-arrow');
const fastify = Fastify().register(arrowPlugin);
const {
    Table, DataType,
    tableFromIPC, tableFromArrays, vectorFromArray,
    RecordBatchStreamWriter,
} = require('apache-arrow');

fastify.get(`/data`, (request, reply) => {
    RecordBatchStreamWriter
        .writeAll(demoData())
        .pipe(reply.stream());
});

(async () => {
    const res = await fastify.inject({
        url: '/data', method: `GET`, headers: {
            'accepts': `application/octet-stream`
        }
    });
    // rawPayload is the raw binary Buffer of the response body
    console.log(tableFromIPC(res.rawPayload)); // Table<{ strings: Utf8, floats: Float32 }>
})();

function* demoData(batchLen = 10, numBatches = 5) {
    const rand = Math.random.bind(Math);
    const randstr = ((randomatic, opts) =>
        (len) => randomatic('?', len, opts)
    )(require('randomatic'), { chars: `abcdefghijklmnopqrstuvwxyz0123456789_` });

    for (let i = -1; ++i < numBatches;) {
        const str = new Array(batchLen);
        const num = new Float32Array(batchLen);
        // fill the columns: random floats in [0, 16), and a random string whose length is the float's integer part
        for (let j = -1; ++j < batchLen; str[j] = randstr((num[j] = rand() * (2 ** 4)) | 0));
        const table = tableFromArrays({
            strings: vectorFromArray(str),
            floats: vectorFromArray(num)
        });
        yield* table.batches;
    }
}
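
Outside of fastify.inject, a client can consume the same route over HTTP and reassemble the table with tableFromIPC. A sketch assuming Node 18+ (global fetch) and the server listening on localhost:3000; the fetchTable helper is just for illustration:

const { tableFromIPC } = require('apache-arrow');

async function fetchTable() {
    // request the binary Arrow IPC stream produced by the /data route above
    const res = await fetch('http://localhost:3000/data', {
        headers: { 'accepts': 'application/octet-stream' }
    });
    // tableFromIPC accepts typed arrays / ArrayBuffers containing an IPC stream
    return tableFromIPC(new Uint8Array(await res.arrayBuffer()));
}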

Receive Arrow RecordBatch streams

const { Iterable, AsyncIterable } = require('ix');
const { createWriteStream } = require('fs');
const eos = require('util').promisify(require('stream').finished);

fastify.post(`/update`, (request, reply) => {
    request.recordBatches()
        .map((recordBatches) => eos(recordBatches
            .pipe(createWriteStream('./new_data.arrow'))))
        .map(() => 'ok').catch(() => AsyncIterable.of('fail'))
        .pipe(reply.type('application/octet-stream').stream());
});

(async () => {
    const res = await fastify.inject({
        url: '/update', method: `POST`, headers: {
            'accepts':  `text/plain; charset=utf-8`,
            'content-type':  `application/octet-stream`
        },
        payload: RecordBatchStreamWriter.writeAll(demoData()).toNodeStream()
    });
    console.log(res.body); // 'ok' | 'fail'
})();

Send and receive Arrow RecordBatch streams

fastify.post(`/avg_floats`, (request, reply) => {
    request.recordBatches()
        .map(async (reader) => averageFloatCols(new Table(await reader.readAll())))
        .pipe(RecordBatchStreamWriter.throughNode({ autoDestroy: false }))
        .pipe(reply.type('application/octet-stream').stream());
});

(async () => {
    const writer = RecordBatchStreamWriter.writeAll(demoData());
    const res = await fastify.inject({
        url: '/avg_floats', method: `POST`,
        payload: writer.toNodeStream(),
        headers: {
            'accepts':  `application/octet-stream`,
            'content-type':  `application/octet-stream`
        },
    });
    console.log(tableFromIPC(res.rawPayload)); // Table<{ floats_avg: Float32 }>
})();

function averageFloatCols(table) {
    const fields = table.schema.fields.filter((f) => DataType.isFloat(f.type));
    const names = fields.map(({ name }) => `${name}_avg`);
    const averages = fields
        .map((f) => table.getChild(f.name))
        .map((xs) => Iterable.from(xs).average())
        .map((avg) => vectorFromArray(new Float32Array([avg])))
    return tableFromArrays(names.reduce((xs, name, i) => ({
      ...xs, [name]: averages[i]
    }), {}));
}

fastify-arrow's Issues

Add descriptive error handling to `tableFromIPC`?

Hiya! If you pass anything other than well-formed Arrow buffer data into tableFromIPC, a catastrophic, non-descriptive error falls out. How would you feel about a PR that adds a few simple checks and reports what is actually in the buffer when it can't be read properly, such as when a 404 JSON response is passed in?

I think it is appropriate to handle the error inside tableFromIPC, since the name implies the argument comes directly from IPC. If we had to capture the result, introspect it to confirm the request worked, and only then pass it into tableFromIPC, it wouldn't really be IPC, would it?
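
One lightweight possibility (a hypothetical sketch, not an existing apache-arrow API) is a thin wrapper at the call site that surfaces a preview of the offending bytes; it assumes Node 16.9+ for the Error cause option:

const { tableFromIPC } = require('apache-arrow');

// hypothetical helper: report what was actually in the buffer when it isn't a valid IPC stream
function tableFromIPCOrDescribe(buffer) {
    try {
        return tableFromIPC(buffer);
    } catch (err) {
        const preview = Buffer.from(buffer).subarray(0, 64).toString('utf8');
        throw new Error(
            `Input is not a readable Arrow IPC stream; first bytes were: ${preview}`,
            { cause: err }
        );
    }
}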
