
astarte's Issues

Add record encoder

Add a record (data-row) encoder/decoder feature.

Background:

Distributed compute systems constantly move large volumes of data between nodes via shuffle, and the I/O bottleneck (disk/network) is the biggest challenge in the shuffle stage.
Reducing the number of bytes transferred is a very effective way to improve I/O; the common approach is an efficient serializer combined with compression.

Features:

  • An efficient serializer (encoder/decoder) plus compression can significantly reduce byte counts and greatly improve I/O.
  • It can also be used for byte-based storage. Byte storage is more space-efficient than object storage and significantly improves CPU cache hit rates. Examples of byte storage: Flink's BinaryRow, Spark's UnsafeRow, etc.
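To illustrate the savings claimed above, here is a minimal self-contained sketch (class and method names are ours, not astarte's) comparing the bytes produced by generic Java object serialization against a dedicated raw encoding of the same long value:

```java
import java.io.*;

// Illustrative only: the same long costs far more bytes through generic
// Java serialization (class metadata, stream header) than through a
// dedicated encoder writing the raw 8 bytes.
public class ByteVsObject
{
    public static int javaSerializedSize(Object value)
            throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);  // writes stream header + class descriptor + value
        }
        return bytes.size();
    }

    public static int rawEncodedSize(long value)
            throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(value);  // exactly 8 bytes, no metadata
        }
        return bytes.size();
    }

    public static void main(String[] args)
            throws IOException
    {
        System.out.println("ObjectOutputStream: " + javaSerializedSize(42L) + " bytes");
        System.out.println("DataOutputStream:   " + rawEncodedSize(42L) + " bytes");
    }
}
```

Multiplied across millions of shuffled records, this per-record overhead is where the I/O win comes from.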

Design

All encoders/decoders extend Encoder<E> and are referenced through the Encoders class.
The interface is designed as follows:

public interface Encoder<E>
        extends Serializable
{
    public void encoder(E value, DataOutput output)
            throws IOException;

    public E decoder(DataInput input)
            throws IOException;
}
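A concrete implementation simply maps each method onto the matching DataOutput/DataInput call. The sketch below (IntEncoder and the roundTrip helper are illustrative, not part of astarte) shows a full encode/decode round trip through this interface:

```java
import java.io.*;

// Sketch: a concrete Encoder plus a round-trip helper, to show how the
// interface is meant to be used. Names here are illustrative.
public class EncoderRoundTrip
{
    public interface Encoder<E>
            extends Serializable
    {
        void encoder(E value, DataOutput output)
                throws IOException;

        E decoder(DataInput input)
                throws IOException;
    }

    public static class IntEncoder
            implements Encoder<Integer>
    {
        @Override
        public void encoder(Integer value, DataOutput output)
                throws IOException
        {
            output.writeInt(value);  // fixed 4-byte encoding
        }

        @Override
        public Integer decoder(DataInput input)
                throws IOException
        {
            return input.readInt();
        }
    }

    // encode into a byte buffer, then decode back out of it
    public static <E> E roundTrip(Encoder<E> encoder, E value)
            throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        encoder.encoder(value, new DataOutputStream(bytes));
        DataInput in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        return encoder.decoder(in);
    }

    public static void main(String[] args)
            throws IOException
    {
        System.out.println(roundTrip(new IntEncoder(), 7));
    }
}
```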

Compatibility:

  • This patch introduces no breaking API changes.
  • An experimental DataSet.encoder(Encoder) method is added for this phase; it will be removed once type inference (# ??? ) is complete.
  • The feature is a breaking change for the netty network-transport backend, which needs a matching decoder added.

Effects:

  • The feature lets users set a serializer (Encoder) for Records.
  • The serializer takes effect during shuffle-map write and shuffle-reduce read, significantly reducing the number of bytes transferred.
  • Tests with small Records show roughly a 10x improvement (benchmark: ....).

Add sort merge shuffle

This patch introduces sort shuffle into astarte. Once merged, HashShuffle will be removed from batch computation.

Benefits of introducing sort shuffle:

  • Pipelined execution: the pipeline is no longer interrupted and no spill I/O is produced.
    This mainly affects the following 3 scenarios:
  • pipelined merge join
  • pipelined map combine
  • pipelined groupby reduce

When:

The map side performs a map-side sort; the reduce side performs a merge sort.
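The reduce-side merge can be sketched as a k-way merge over the already-sorted runs shipped by each map task, using a priority queue instead of a full re-sort. This is an illustrative stand-in, not astarte's actual classes:

```java
import java.util.*;

// Illustrative reduce-side merge sort: each map task ships a run that is
// already sorted, and the reducer merges the runs with a min-heap.
public class MergeSortedRuns
{
    public static List<Integer> merge(List<List<Integer>> sortedRuns)
    {
        // heap entries: {value, runIndex, offsetInRun}
        PriorityQueue<int[]> heap = new PriorityQueue<>(Comparator.comparingInt(e -> e[0]));
        for (int i = 0; i < sortedRuns.size(); i++) {
            if (!sortedRuns.get(i).isEmpty()) {
                heap.add(new int[] {sortedRuns.get(i).get(0), i, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();
            out.add(head[0]);
            List<Integer> run = sortedRuns.get(head[1]);
            int next = head[2] + 1;
            if (next < run.size()) {
                heap.add(new int[] {run.get(next), head[1], next});  // advance that run
            }
        }
        return out;
    }

    public static void main(String[] args)
    {
        System.out.println(merge(Arrays.asList(
                Arrays.asList(1, 4, 9),
                Arrays.asList(2, 3, 8),
                Arrays.asList(5, 6, 7))));
    }
}
```

With k runs of n total elements the merge costs O(n log k), which is why sorting on the map side and only merging on the reduce side is cheap.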

IO:

Following Spark's approach: the shuffle service does not merge map files on read; it forwards the compressed files directly, and the reduce side does the merging.
net channels = mapTaskNum * reduceNumber (hash shuffle: reduceNumber * executorNum)

Add more type encoder

Background

The core serializer functionality has been merged into the main branch (see: #4).
The plan now is to go further and introduce a type system.

At this stage, the remaining missing types will be filled in:

1. Primitive types:

  • the other Java primitive types (byte, short, char, float, TimeStamp, .....)
  • arrays of primitive types (byte[], short[], float[], .....)

2. Basic composite types:

  • Map<K,V> types
  • struct types: Tuple[3-10-22]

3. JavaBean types

  • user-defined Java object types (the counterpart of Spark/Scala case classes)

Reference

    private static class LongEncoder
            implements Encoder<Long>
    {
        @Override
        public void encoder(Long value, DataOutput output)
                throws IOException
        {
            output.writeLong(value);
        }

        @Override
        public Long decoder(DataInput input)
                throws IOException
        {
            return input.readLong();
        }
    }
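The composite types in category 2 can be built by composing element encoders in the same style as LongEncoder above. Here is a hedged sketch of a pair encoder (Tuple2 and the factory method are stand-ins, not astarte's actual tuple types):

```java
import java.io.*;

// Sketch: a composite encoder for a pair, assembled from two element
// encoders. Class and method names are illustrative.
public class Tuple2EncoderDemo
{
    public interface Encoder<E>
    {
        void encoder(E value, DataOutput output) throws IOException;

        E decoder(DataInput input) throws IOException;
    }

    public static class Tuple2<A, B>
    {
        public final A f1;
        public final B f2;

        public Tuple2(A f1, B f2)
        {
            this.f1 = f1;
            this.f2 = f2;
        }
    }

    // element encoder in the same style as LongEncoder above
    public static final Encoder<Long> LONG = new Encoder<Long>()
    {
        @Override
        public void encoder(Long value, DataOutput output) throws IOException
        {
            output.writeLong(value);
        }

        @Override
        public Long decoder(DataInput input) throws IOException
        {
            return input.readLong();
        }
    };

    // a composite encoder only delegates to its element encoders, field by field
    public static <A, B> Encoder<Tuple2<A, B>> tuple2(Encoder<A> aEnc, Encoder<B> bEnc)
    {
        return new Encoder<Tuple2<A, B>>()
        {
            @Override
            public void encoder(Tuple2<A, B> value, DataOutput output) throws IOException
            {
                aEnc.encoder(value.f1, output);
                bEnc.encoder(value.f2, output);
            }

            @Override
            public Tuple2<A, B> decoder(DataInput input) throws IOException
            {
                return new Tuple2<>(aEnc.decoder(input), bEnc.decoder(input));
            }
        };
    }

    public static void main(String[] args) throws IOException
    {
        Encoder<Tuple2<Long, Long>> enc = tuple2(LONG, LONG);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        enc.encoder(new Tuple2<>(1L, 2L), new DataOutputStream(bytes));
        Tuple2<Long, Long> back = enc.decoder(
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(back.f1 + "," + back.f2 + " in " + bytes.size() + " bytes");
    }
}
```

Map<K,V> and JavaBean encoders follow the same delegation pattern, with a length or field count written first where the shape is not fixed.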

See: Encoders

Add map filter dataset code generation

This patch introduces code generation, aiming to increase business-logic code density.
Take this example:
dataSource.map(x -> x + 1).map(x -> x * 3).filter(x -> x % 2 == 0).count()
It produces the following physical plan:

...

After code generation, it is optimized into pseudocode like this:

while (dataSource.hasNext()) {
    input = dataSource.nextRow()
    input = input + 1
    input = input * 3
    if (input % 2 == 0) {
        collect.collect(input)
    }
}

Modern frameworks commonly suffer from the awkward "big framework, tiny business logic" problem (colloquially: too heavy). Increasing code density is also a turn toward procedural style, peeling away the OO abstractions layer by layer.
Ideally, all of the data-processing business code collapses into a single loop body, which is exactly the shape of hand-written procedural code.

High-density code has many performance advantages, which we omit here.
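The fused loop above can be written out as runnable Java. This is a hand-written equivalent of what the generator would emit for the example pipeline, not astarte's actual generated code:

```java
import java.util.*;

// Hand-written fusion of map(x -> x + 1).map(x -> x * 3)
// .filter(x -> x % 2 == 0).count(): all operators live in one loop,
// with no per-operator iterator or virtual call between them.
public class FusedLoopDemo
{
    public static long fusedCount(Iterator<Integer> dataSource)
    {
        long count = 0;
        while (dataSource.hasNext()) {
            int input = dataSource.next();
            input = input + 1;        // map #1
            input = input * 3;        // map #2
            if (input % 2 == 0) {     // filter
                count++;              // count() sink
            }
        }
        return count;
    }

    public static void main(String[] args)
    {
        System.out.println(fusedCount(Arrays.asList(1, 2, 3, 4, 5, 6).iterator()));
    }
}
```

The intermediate values stay in a local variable, so the JIT can keep them in registers instead of boxing them between operator objects.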
