
fastify-arrow

A Fastify plugin for sending and receiving columnar tables with Apache Arrow as optimized, zero-copy binary streams.

This module decorates the fastify Request with a recordBatches() method that returns an IxJS AsyncIterable of Arrow RecordBatchReaders.

Each inner RecordBatchReader is an AsyncIterableIterator<RecordBatch>, leading to the following signature:

Request.prototype.recordBatches = () => AsyncIterable<AsyncIterable<RecordBatch>>;
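Because the return value is an iterable of iterables, consuming it takes two nested for await...of loops: the outer loop visits each RecordBatchReader, and the inner loop visits that reader's batches. The sketch below runs without Arrow or Fastify. `fakeReader`, `fakeRecordBatches`, and the `numRows` field on each batch are hypothetical stand-ins for the real readers and RecordBatches.

```javascript
// Hypothetical stand-ins: plain objects with a numRows field play the role of
// Arrow RecordBatches, and async generators play the role of RecordBatchReaders.
async function* fakeReader(rowCounts) {
  for (const numRows of rowCounts) {
    yield { numRows };
  }
}

// Mimics the documented shape: AsyncIterable<AsyncIterable<RecordBatch>>.
async function* fakeRecordBatches() {
  yield fakeReader([3, 3, 3]); // first reader: three batches of three rows
  yield fakeReader([2]);       // second reader: one batch of two rows
}

// Walks the outer iterable (one entry per reader) and the inner iterable
// (one entry per batch), returning the total row count seen by each reader.
async function rowsPerReader(readers) {
  const totals = [];
  for await (const reader of readers) {
    let rows = 0;
    for await (const batch of reader) {
      rows += batch.numRows;
    }
    totals.push(rows);
  }
  return totals;
}

rowsPerReader(fakeRecordBatches()).then((totals) => console.log(totals)); // [ 9, 2 ]
```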

Async iteration is native to JavaScript through the [Symbol.asyncIterator]() protocol and the for await...of statement. You can create an Ix.AsyncIterable from a NodeJS.ReadableStream with AsyncIterable.fromNodeStream(), and you can pipe() an Ix.AsyncIterable into a NodeJS.WritableStream, which makes it easy to move between the functional (IxJS) and imperative (Node stream) APIs available in Node and Arrow.
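IxJS is one way to bridge the two worlds; Node's own stream module offers a similar bridge. For comparison, this sketch uses only built-ins (for await...of over a readable stream, and `Readable.from()`, available since Node 12) rather than the IxJS API described above. `collect`, `words`, and `sink` are illustrative names.

```javascript
const { Readable, Writable } = require('stream');

// Node readable streams are themselves async iterable, so they can be
// consumed with for await...of without converting them first.
async function collect(readable) {
  const chunks = [];
  for await (const chunk of readable) {
    chunks.push(chunk);
  }
  return chunks;
}

// Going the other way: Readable.from() wraps any (async) iterable in an
// object-mode readable stream, which can then be pipe()d into a writable.
async function* words() {
  yield 'arrow';
  yield 'fastify';
}

const received = [];
const sink = new Writable({
  objectMode: true,
  write(chunk, _encoding, callback) {
    received.push(chunk);
    callback();
  },
});

Readable.from(words()).pipe(sink).on('finish', () => console.log(received)); // [ 'arrow', 'fastify' ]

collect(Readable.from(['a', 'b'])).then((chunks) => console.log(chunks)); // [ 'a', 'b' ]
```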

Arrow RecordBatches are full-width, length-wise slices of a Table. To illustrate, the following table contains three RecordBatches, and each RecordBatch has three rows:

"row_id" |      "utf8: Utf8" |  "floats: Float32"    ___
       0 |          "sh679x" |  6.308125972747803       |
       1 |    "u9joo443zl38" | 12.003445625305176       | <-- RecordBatch 1
       2 |  "4b2f5pcyp_nisb" |  14.00214672088623    ___|
       3 |        "rfmuc50d" |  8.512785911560059       |
       4 |  "1u7ygm51_2cvye" | 14.949934959411621       | <-- RecordBatch 2
       5 |        "xffgrp9x" |  8.687625885009766    ___|
       6 |   "9vhc_g3_lqx4v" | 13.841902732849121       |
       7 | "4bxi6ioh8cssq12" | 15.428414344787598       | <-- RecordBatch 3
       8 |         "zjcxb2s" | 7.1155924797058105    ___|
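The slicing can be sketched in plain JavaScript: each batch keeps every column (full width) but only a contiguous run of rows (length-wise). `sliceIntoBatches` is a hypothetical helper, the columns are plain arrays keyed by name, and `slice()` copies each run, so this illustrates only the layout, not how Arrow shares memory between a Table and its batches.

```javascript
// Columns are stored as parallel arrays (column-major), keyed by field name.
const columns = {
  row_id: [0, 1, 2, 3, 4, 5, 6, 7, 8],
  floats: Float32Array.from([6.3, 12.0, 14.0, 8.5, 14.9, 8.7, 13.8, 15.4, 7.1]),
};

// Splits a column-major table into full-width, length-wise slices: every batch
// keeps every column, but only batchLen consecutive rows of each.
function sliceIntoBatches(cols, batchLen) {
  const names = Object.keys(cols);
  const numRows = cols[names[0]].length;
  const batches = [];
  for (let offset = 0; offset < numRows; offset += batchLen) {
    const batch = { length: Math.min(batchLen, numRows - offset), columns: {} };
    for (const name of names) {
      batch.columns[name] = cols[name].slice(offset, offset + batchLen);
    }
    batches.push(batch);
  }
  return batches;
}

const batches = sliceIntoBatches(columns, 3);
console.log(batches.length); // 3
console.log(batches[1].columns.row_id); // [ 3, 4, 5 ]
```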

You can generate a table similar to the one above by installing the dependencies, then running the following command from the root directory of the repository:

$ node test/util.js | npx arrow2csv

This module also decorates Fastify's Reply with a convenient stream() method that returns a pass-through stream hooked up to the underlying HTTP ServerResponse.
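The plugin's own implementation is not shown in this README. As an assumption, a pass-through stream hooked up to a response can be sketched with Node's `PassThrough`; here `makeReplyStream` is a hypothetical name, and a buffering `Writable` stands in for `http.ServerResponse` so the sketch runs without a server.

```javascript
const { PassThrough, Writable } = require('stream');

// Hypothetical helper: returns a PassThrough whose output is piped into `res`.
// Anything written to (or piped into) the returned stream reaches the response,
// and ending it ends the response too.
function makeReplyStream(res) {
  const through = new PassThrough();
  through.pipe(res);
  return through;
}

// Stand-in for http.ServerResponse that just buffers what it receives.
const chunks = [];
const fakeResponse = new Writable({
  write(chunk, _encoding, callback) {
    chunks.push(chunk);
    callback();
  },
});

const out = makeReplyStream(fakeResponse);
out.write('hello ');
out.end('world');
fakeResponse.on('finish', () => console.log(Buffer.concat(chunks).toString())); // hello world
```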

Send Arrow RecordBatch streams

const Fastify = require('fastify');
const arrowPlugin = require('fastify-arrow');
const fastify = Fastify().register(arrowPlugin);
const {
    Schema, DataType,
    Table, RecordBatch,
    Utf8Vector, FloatVector,
    RecordBatchStreamWriter,
} = require('apache-arrow');

fastify.get(`/data`, (request, reply) => {
    RecordBatchStreamWriter
        .writeAll(demoData())
        .pipe(reply.stream());
});

(async () => {
    const res = await fastify.inject({
        url: '/data', method: `GET`, headers: {
            'accept': `application/octet-stream`
        }
    });
    // res.rawPayload is the response body as a Buffer; res.body is a decoded string
    console.log(Table.from(res.rawPayload)); // Table<{ strings: Utf8, floats: Float32 }>
})();

function* demoData(batchLen = 10, numBatches = 5) {
    const rand = Math.random.bind(Math);
    const randstr = ((randomatic, opts) =>
        (len) => randomatic('?', len, opts)
    )(require('randomatic'), { chars: `abcdefghijklmnopqrstuvwxyz0123456789_` });

    let schema;
    for (let i = -1; ++i < numBatches;) {
        const str = new Array(batchLen);
        const num = new Float32Array(batchLen);
        (() => {
            for (let i = -1; ++i < batchLen; str[i] = randstr((num[i] = rand() * (2 ** 4)) | 0));
        })();
        const columns = [Utf8Vector.from(str), FloatVector.from(num)];
        schema || (schema = Schema.from(columns, ['strings', 'floats']));
        yield new RecordBatch(schema, batchLen, columns);
    }
}

Receive Arrow RecordBatch streams

const { AsyncIterable } = require('ix');
const { createWriteStream } = require('fs');
const eos = require('util').promisify(require('stream').finished);

fastify.post(`/update`, (request, reply) => {
    request.recordBatches()
        .map((recordBatches) => eos(recordBatches
            .pipe(createWriteStream('./new_data.arrow'))))
        .map(() => 'ok').catch(() => AsyncIterable.of('fail'))
        .pipe(reply.type('text/plain; charset=utf-8').stream());
});

(async () => {
    const res = await fastify.inject({
        url: '/update', method: `POST`, headers: {
            'accept':  `text/plain; charset=utf-8`,
            'content-type':  `application/octet-stream`
        },
        payload: RecordBatchStreamWriter.writeAll(demoData()).toNodeStream()
    });
    console.log(res.body); // 'ok' | 'fail'
})();
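The `eos` helper in the example above turns `stream.finished` into a promise that settles once the file stream has finished writing. A self-contained sketch of the same pattern, using only Node built-ins: `writeChunksToFile` is a hypothetical helper, and the scratch file name is made up for illustration.

```javascript
const { createWriteStream, readFileSync } = require('fs');
const { join } = require('path');
const { tmpdir } = require('os');
const { Readable, finished } = require('stream');
const eos = require('util').promisify(finished);

// Pipes an iterable of chunks into a file and resolves only once the file
// stream has finished, so the contents are safe to read afterwards.
async function writeChunksToFile(chunks, path) {
  await eos(Readable.from(chunks).pipe(createWriteStream(path)));
  return readFileSync(path, 'utf8');
}

const path = join(tmpdir(), 'eos-demo.txt'); // hypothetical scratch file
writeChunksToFile(['new ', 'data'], path).then((text) => console.log(text)); // new data
```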

Send and receive Arrow RecordBatch streams

fastify.post(`/avg_floats`, (request, reply) => {
    request.recordBatches()
        // Table.from() returns a Promise when given an async RecordBatchReader
        .map(async (recordBatches) => averageFloatCols(await Table.from(recordBatches)))
        .pipe(RecordBatchStreamWriter.throughNode({ autoDestroy: false }))
        .pipe(reply.type('application/octet-stream').stream());
});

(async () => {
    const writer = RecordBatchStreamWriter.writeAll(demoData());
    const res = await fastify.inject({
        url: '/avg_floats', method: `POST`,
        payload: writer.toNodeStream(),
        headers: {
            'accept':  `application/octet-stream`,
            'content-type':  `application/octet-stream`
        },
    });
    console.log(Table.from(res.rawPayload)); // Table<{ floats_avg: Float32 }>
})();

const { Iterable } = require('ix');

function averageFloatCols(table) {
    // keep only the floating-point columns
    const fields = table.schema.fields.filter(({ type }) => DataType.isFloat(type));
    const names = fields.map(({ name }) => `${name}_avg`);
    // average each float column into a single-value Float32 vector
    const averages = fields
        .map((f) => table.getColumn(f.name))
        .map((xs) => Iterable.from(xs).average())
        .map((avg) => FloatVector.from(new Float32Array([avg])));
    return new Table(RecordBatch.from(averages, names));
}
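averageFloatCols keeps only the floating-point columns, averages each one, and returns a one-row table whose columns are named `<name>_avg`. The same logic, sketched over plain arrays so it runs without Arrow or IxJS: the `{ name: values }` table shape and the `averageFloatColumns` name are assumptions, and `Float32Array`/`Float64Array` stand in for Arrow's float types.

```javascript
// Hypothetical table shape: column name -> array of values.
const table = {
  strings: ['a', 'bb', 'ccc'],
  floats: Float32Array.from([1.5, 2.5, 3.5]),
};

const isFloatColumn = (values) =>
  values instanceof Float32Array || values instanceof Float64Array;

// Returns a one-row "table" with an `<name>_avg` column per float column.
function averageFloatColumns(cols) {
  const result = {};
  for (const [name, values] of Object.entries(cols)) {
    if (!isFloatColumn(values)) continue; // skip non-float columns such as strings
    let sum = 0;
    for (const x of values) sum += x;
    result[`${name}_avg`] = Float32Array.of(sum / values.length);
  }
  return result;
}

console.log(averageFloatColumns(table).floats_avg[0]); // 2.5
```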
