
astarte's Introduction

Astarte


Welcome to Astarte!

Example

  • WordCount:
BatchContext mppContext = BatchContext.builder()
    .local(2)
    .getOrCreate();

DataSet<String> ds = mppContext.textFile("/tmp/.../README.md");
DataSet<String> words = ds.flatMap(input -> input.toLowerCase().split(" "))
    .filter(x -> !"".equals(x.trim()));

KvDataSet<String, Long> wordCounts = words.kvDataSet(x -> Tuple2.of(x, 1L))
    .reduceByKey((x, y) -> x + y);

wordCounts.collect()
    .forEach(x -> System.out.println(x.f1() + "," + x.f2()));
  • PageRank
BatchContext mppContext = BatchContext.builder()
        .local(2)
        .getOrCreate();
int iters = 4;
String sparkHome = System.getenv("SPARK_HOME");

DataSet<String> lines = mppContext.textFile(sparkHome + "/data/mllib/pagerank_data.txt");

KvDataSet<String, Iterable<String>> links = lines.kvDataSet(s -> {
    String[] parts = s.split("\\s+");
    return new Tuple2<>(parts[0], parts[1]);
}).distinct().groupByKey().cache();

KvDataSet<String, Double> ranks = links.mapValues(v -> 1.0);
for (int i = 1; i <= iters; i++) {
    DataSet<Tuple2<String, Double>> contribs = links.join(ranks).values().flatMapIterator(it -> {
        Collection<String> urls = (Collection<String>) it.f1();
        Double rank = it.f2();

        long size = urls.size();
        return urls.stream().map(url -> new Tuple2<>(url, rank / size)).iterator();
    });

    ranks = KvDataSet.toKvDataSet(contribs).reduceByKey((x, y) -> x + y).mapValues(x -> 0.15 + 0.85 * x);
}

List<Tuple2<String, Double>> output = ranks.collect();
output.forEach(tup -> System.out.println(String.format("%s has rank: %s.", tup.f1(), tup.f2())));

astarte's People

Contributors

dependabot[bot], gonborn, harbby, jfanzhao


astarte's Issues

Add more type encoder

Background

The core serializer functionality has been merged into the main branch (see: #4).
The plan is now to go further and introduce a type system.

As a first stage, the remaining missing types will be filled in:

1. Primitive types:

  • The other Java primitive types (byte, short, char, float, TimeStamp, ...)
  • Arrays of primitive types (byte[], short[], float[], ...)

2. Basic composite types:

  • Map<K,V> types
  • Struct types: Tuple[3-10-22]

3. JavaBean types

  • User-defined Java object types (the counterpart of Spark/Scala case classes)

Reference

    private static class LongEncoder
            implements Encoder<Long>
    {
        @Override
        public void encoder(Long value, DataOutput output)
                throws IOException
        {
            output.writeLong(value);
        }

        @Override
        public Long decoder(DataInput input)
                throws IOException
        {
            return input.readLong();
        }
    }

See: Encoders
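As a hedged sketch of one of the missing primitive-array types, a length-prefixed byte[] encoder might look like the following. The class name ByteArrayEncoder and the roundTrip helper are illustrative, not part of the codebase; the Encoder<E> interface is reproduced inline for self-containment.

```java
import java.io.*;

// Sketch only: a byte[] encoder written against the project's Encoder<E>
// interface (reproduced here so the example compiles on its own).
interface Encoder<E> extends Serializable
{
    void encoder(E value, DataOutput output) throws IOException;

    E decoder(DataInput input) throws IOException;
}

class ByteArrayEncoder implements Encoder<byte[]>
{
    @Override
    public void encoder(byte[] value, DataOutput output) throws IOException
    {
        output.writeInt(value.length);   // length prefix, then the raw bytes
        output.write(value);
    }

    @Override
    public byte[] decoder(DataInput input) throws IOException
    {
        byte[] bytes = new byte[input.readInt()];
        input.readFully(bytes);
        return bytes;
    }

    // Hypothetical convenience helper: encode to memory, then decode back.
    static byte[] roundTrip(Encoder<byte[]> enc, byte[] value)
    {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            enc.encoder(value, new DataOutputStream(bos));
            return enc.decoder(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The array encoders for short[], float[], and the rest would follow the same length-prefix pattern with the matching DataOutput write methods.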

Add record encoder

Add a data-row encoder/decoder facility.

Background:

Distributed compute systems routinely move large volumes of data between nodes via shuffle, and the IO bottleneck (disk/network) is also the biggest challenge of the shuffle stage.
Reducing the number of bytes transferred is a very effective way to improve IO; the common approach is an efficient serializer combined with compression.

Highlights:

  • An efficient serializer (encoder/decoder) plus compression can significantly reduce the byte count and greatly improve IO.
  • It can also be used for byte-based storage, which is more space-efficient than object storage and markedly improves CPU cache hit rates. Examples of byte-based storage: Flink's BinaryRow, Spark's UnsafeRow.

Design

All encoders/decoders implement Encoder<E> and are referenced from the Encoders class.
The interface is designed as follows:

public interface Encoder<E>
        extends Serializable
{
    public void encoder(E value, DataOutput output)
            throws IOException;

    public E decoder(DataInput input)
            throws IOException;
}

Compatibility:

  • This patch introduces no breaking API changes.
  • As an interim, experimental step, a DataSet.encoder(Encoder) method is added; it will be removed once type inference (# ??? ) is complete.
  • The feature is a breaking change for the netty network transport backend, which needs a matching decoder added.

Impact:

  • The feature lets users set a record serializer (Encoder).
  • The serializer takes effect during shuffle-map write and shuffle-reduce read, significantly reducing the number of bytes transferred.
  • Tests with small records show an improvement on the order of 10x (test reference: ....)
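As a rough, self-contained illustration (not the project's benchmark) of why a compact encoder shrinks shuffle traffic, compare the same longs written through default Java serialization and through a plain fixed-width DataOutput encoding; the class EncodingSizeDemo is hypothetical.

```java
import java.io.*;

// Sketch: byte counts for default Java serialization vs. a fixed-width
// DataOutput encoding of the same values. Names are illustrative.
public class EncodingSizeDemo
{
    public static int javaSerializedSize(long[] values)
    {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            for (long v : values) {
                oos.writeObject(v);   // boxes each long and adds object headers
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }

    public static int dataOutputSize(long[] values)
    {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            for (long v : values) {
                dos.writeLong(v);     // exactly 8 bytes per value
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }
}
```

On top of the smaller raw encoding, compression then works on a denser byte stream, which is the combination the issue proposes.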

Add map filter dataset code generation

This patch will introduce code generation, aiming to increase the density of business code.
Take this example:
dataSource.map(x -> x + 1).map(x -> x * 3).filter(x -> x % 2 == 0).count()
It will generate a physical execution plan like the following:

...

After code generation, this is optimized into pseudocode like:

while (dataSource.hasNext()) {
    input = dataSource.nextRow()
    input = input + 1
    input = input * 3
    if (input % 2 == 0) {
        collect.collect(input)
    }
}

Modern frameworks commonly suffer from the awkward "big framework, tiny business logic" problem (colloquially: too heavy). Raising code density also means a return to procedural style, peeling away the layers of OO abstraction.
Ideally, all the business code that processes data is compressed into a single loop body, which is exactly the shape a person hand-writing procedural code would produce.

High-density code has many performance advantages, omitted here.
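The fused loop above can be hand-simulated in plain Java to show what the generated code is meant to collapse to; the class name FusedPipeline is illustrative, and a list iterator stands in for dataSource.

```java
import java.util.Iterator;
import java.util.List;

// Hand-fused equivalent of
//   dataSource.map(x -> x + 1).map(x -> x * 3).filter(x -> x % 2 == 0).count()
// with the terminal count() folded into the single loop body that
// code generation aims to emit.
public class FusedPipeline
{
    public static long count(Iterator<Integer> dataSource)
    {
        long count = 0;
        while (dataSource.hasNext()) {
            int input = dataSource.next();
            input = input + 1;        // first map
            input = input * 3;        // second map
            if (input % 2 == 0) {     // filter
                count++;              // count() instead of collect()
            }
        }
        return count;
    }
}
```

No intermediate datasets or per-operator virtual calls remain; each row flows through all operators before the next row is read.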

Add sort merge shuffle

This patch introduces sort shuffle into astarte. After it is merged, HashShuffle will be removed from batch computation.

Benefits of introducing sort shuffle:

  • Pipelined execution: the pipeline is no longer broken, and no spill IO is produced.
    This mainly affects the following three scenarios:
  • pipeline merge join.
  • pipeline map combine.
  • pipeline groupby reduce.

When:

The map side performs a map-side sort; the reduce side performs a merge sort.

IO:

Following Spark's approach: the shuffle service does not merge map files on read; it forwards the compressed files directly, and the reduce side performs the merge.
net channels = MapTaskNum * reduceNumber (hash shuffle = reduceNumber * executorNum)
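As a hedged sketch (not astarte's actual implementation), the reduce-side merge of already-sorted map outputs can be expressed as a k-way merge over sorted iterators using a min-heap; the class name KWayMerge is illustrative.

```java
import java.util.*;

// K-way merge of sorted runs, the core of a sort-merge shuffle's reduce
// side: each map task's output arrives pre-sorted, and the reducer pulls
// the global minimum from a heap of the runs' current heads.
public class KWayMerge
{
    public static <T extends Comparable<T>> List<T> merge(List<Iterator<T>> sortedRuns)
    {
        // Heap entry: (current head element, index of the run it came from)
        Comparator<Map.Entry<T, Integer>> byKey = Comparator.comparing(Map.Entry::getKey);
        PriorityQueue<Map.Entry<T, Integer>> heap = new PriorityQueue<>(byKey);

        for (int i = 0; i < sortedRuns.size(); i++) {
            if (sortedRuns.get(i).hasNext()) {
                heap.add(Map.entry(sortedRuns.get(i).next(), i));
            }
        }

        List<T> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Map.Entry<T, Integer> min = heap.poll();
            out.add(min.getKey());
            // Refill the heap from the run that just yielded the minimum.
            Iterator<T> run = sortedRuns.get(min.getValue());
            if (run.hasNext()) {
                heap.add(Map.entry(run.next(), min.getValue()));
            }
        }
        return out;
    }
}
```

Because each run is consumed one element at a time, the merge streams through the forwarded map files without materializing them, which is what keeps the reduce side pipelined.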
