
thrill's Introduction

Thrill


Thrill is an EXPERIMENTAL C++ framework for algorithmic distributed Big Data batch computations on a cluster of machines. It is currently being designed and developed as a research project at Karlsruhe Institute of Technology and is in early testing. For more information on its goals and mission, see http://project-thrill.org.

For easy steps on getting started, refer to the Live Documentation.

License

Thrill is free software provided under the BSD 2-clause license.

If you use Thrill in an academic context or publication, please cite our paper:

@InProceedings{bingmann2016thrill,
  author =       {Timo Bingmann and Michael Axtmann and Emanuel J{\"{o}}bstl and Sebastian Lamm and Huyen Chau Nguyen and Alexander Noe and Sebastian Schlag and Matthias Stumpp and Tobias Sturm and Peter Sanders},
  title =        {{Thrill}: High-Performance Algorithmic Distributed Batch Data Processing with {C++}},
  booktitle =    {IEEE International Conference on Big Data},
  year =         2016,
  pages =        {172--183},
  month =        dec,
  organization = {IEEE},
  note =         {preprint arXiv:1608.05634},
  isbn =         {978-1-4673-9005-7},
}

thrill's People

Contributors

alexnoe, bingmann, chaupow, cocreature, ctolon22, kurpicz, lorenzhs, manpen, mstumpp, mullovc, nejmd, pdinklag, robert-williger, roberthangu, sebalamm, simongog, tim3z, toebbel

thrill's Issues

ReduceToIndex yields a DIA larger than specified

The following code causes some very strange behavior in some circumstances:

#include <thrill/api/reduce_to_index.hpp>
#include <thrill/api/size.hpp>
#include <thrill/api/zip.hpp>
#include <thrill/api/zip_with_index.hpp>
#include <thrill/api/read_binary.hpp>

#include <vector>
#include <cstdint>

int main(int, char const *argv[]) {
  return thrill::Run([&](thrill::Context& context) {
    auto nodes = thrill::ReadBinary<std::vector<uint32_t>>(context, argv[1]);
    size_t node_count = nodes.Keep().Size();

    auto new_node_clusters = nodes
      .template FlatMap<std::pair<uint32_t, size_t>>([](const std::vector<uint32_t>& node, auto emit) {
        for (uint32_t neighbor : node) {
          if (neighbor % 1000 < 250) {  // breaks for [22..474]
            emit(std::make_pair(neighbor, 1));
          }
        }
      })
      .ReduceToIndex(
        [](const std::pair<uint32_t, size_t>& node) -> size_t { return node.first; },
        [](const std::pair<uint32_t, size_t>& n1, const std::pair<uint32_t, size_t>& n2) {
          return std::make_pair(n1.first, n1.second + n2.second);
        }, node_count)
      .Zip(nodes,
        [](const std::pair<uint32_t, size_t>& incoming, const std::vector<uint32_t>& node) {
          return std::make_pair(incoming, node);
        })
      .Execute();
  });
}

The algorithm is roughly similar to PageRank. Nodes distribute some information along some outgoing links, represented as a vector of neighbors in this example. Then the incoming information is merged with a ReduceToIndex call and zipped with the original node list. One crucial difference is that this algorithm does not use all edges to distribute the information/weight, but only a subset, and it seems that the size of this subset determines whether the program crashes or not. (Because of this, the result of the reduce operation will also contain neutral elements.)

The range given in the comment breaks this program only in combination with this input and only with exactly 4 workers, run for example like this (4 workers on 1 host also reproduces it):

THRILL_LOCAL=2 THRILL_WORKERS_PER_HOST=2 ./bug graph_100000.bin 
Thrill: using 15.667 GiB RAM total, BlockPool=5.222 GiB, workers=2.611 GiB, floating=5.222 GiB.
Thrill: running locally with 2 test hosts and 2 workers per host in a local tcp network.
Thrill: using 15.667 GiB RAM total, BlockPool=5.222 GiB, workers=2.611 GiB, floating=5.222 GiB.
Thrill: no THRILL_LOG was found, so no json log is written.
Thrill: no config file ~/.thrill found, using default disk configuration.
Thrill: disk '/tmp/thrill.tmp' is allocated, space: 1000 MiB, I/O implementation: syscall queue=0 devid=0 unlink_on_open
[host 0 worker 0 000000] Execute()  stage ReadBinary.1
[host 0 worker 0 000001] PushData() stage ReadBinary.1 with targets [Size.2]
[host 0 worker 0 000002] Execute()  stage Size.2
[host 0 worker 0 000003] PushData() stage ReadBinary.1 with targets [ReduceToIndex.4 Zip.5]
[host 0 worker 0 000004] Execute()  stage ReduceToIndex.4
[host 0 worker 0 000005] PushData() stage ReduceToIndex.4 with targets [Zip.5]
[host 0 worker 0 000006] Execute()  stage Zip.5
[host 1 worker 1 000000] Zip(): input DIAs have unequal size: [106584,100000] @ /home/eagle/dev/ma/code/prototypes/thrill_louvain/../../lib/thrill/thrill/api/zip.hpp:338
[host 1 worker 0 000000] Zip(): input DIAs have unequal size: [106584,100000] @ /home/eagle/dev/ma/code/prototypes/thrill_louvain/../../lib/thrill/thrill/api/zip.hpp:338
Aborted (core dumped)

Because of the combination of parameters needed to reproduce the crash, I strongly suspect that something is going wrong in the range calculation.
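The invariant the report relies on can be written as a small sequential model. This is a sketch under stated assumptions, not Thrill's implementation: ReduceToIndexReference is a hypothetical name, and it assumes that slots receiving no item keep a default-constructed neutral element. However the index range is split over workers, the result must contain exactly size elements, so a following Zip() with the 100000-element input should never see 106584 items.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical sequential reference model of ReduceToIndex semantics
// (not Thrill code): each item goes to slot index(item), colliding items are
// combined with reduce(), and untouched slots keep the neutral element.
// The output always has exactly `size` slots.
template <typename T, typename IndexFn, typename ReduceFn>
std::vector<T> ReduceToIndexReference(const std::vector<T>& items, IndexFn index,
                                      ReduceFn reduce, std::size_t size,
                                      const T& neutral = T()) {
    std::vector<T> out(size, neutral);
    std::vector<bool> filled(size, false);
    for (const T& item : items) {
        std::size_t i = index(item);
        assert(i < size && "index out of range");
        if (filled[i]) {
            out[i] = reduce(out[i], item);  // combine with earlier item
        } else {
            out[i] = item;                  // first item for this slot
            filled[i] = true;
        }
    }
    return out;
}
```

With the pair type from the report, indices that no edge targets simply stay at (0, 0), which is the "neutral elements" effect the reporter mentions.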

Non-noexcept move constructors

I haven't checked the entire code base, but e.g. FastString's move constructor isn't marked noexcept, which makes it rather useless in practice. Standard library containers such as std::vector only use a move constructor when relocating elements if it is marked noexcept (to keep the strong exception safety guarantee); otherwise they fall back to the copy constructor.
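The fallback can be observed directly. In this minimal sketch, MoveMayThrow and MoveNoexcept are illustrative types (not Thrill's FastString), and CountRelocations forces exactly one reallocation of a std::vector and counts how the existing elements were transferred.

```cpp
#include <cassert>
#include <cstddef>
#include <type_traits>
#include <utility>
#include <vector>

// Counters for copy/move constructions performed by std::vector.
static int g_copies = 0;
static int g_moves = 0;

struct MoveMayThrow {                 // move ctor without noexcept
    MoveMayThrow() = default;
    MoveMayThrow(const MoveMayThrow&) { ++g_copies; }
    MoveMayThrow(MoveMayThrow&&) { ++g_moves; }
};

struct MoveNoexcept {                 // same type with the fix applied
    MoveNoexcept() = default;
    MoveNoexcept(const MoveNoexcept&) { ++g_copies; }
    MoveNoexcept(MoveNoexcept&&) noexcept { ++g_moves; }
};

static_assert(!std::is_nothrow_move_constructible<MoveMayThrow>::value, "");
static_assert(std::is_nothrow_move_constructible<MoveNoexcept>::value, "");

// Forces one reallocation of a vector holding n elements and returns
// (copies, moves) performed while relocating the existing elements.
template <typename T>
std::pair<int, int> CountRelocations(std::size_t n) {
    std::vector<T> v(n);              // value-initialized, no copies
    g_copies = g_moves = 0;
    v.reserve(v.capacity() + 1);      // larger capacity => reallocation
    return {g_copies, g_moves};
}
```

CountRelocations<MoveMayThrow>(4) reports four copies and no moves, while CountRelocations<MoveNoexcept>(4) reports four moves and no copies: std::vector relocates through std::move_if_noexcept.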

Random exceptions from data subsystem

Sometimes (at least with ZipNode) it crashes with:

30: terminate called after throwing an instance of 'std::runtime_error'
30: [host 0 worker 1 000000] Worker 1 threw St13runtime_error
30: [host 0 worker 1 000001]   what(): object 1 not in repository @ /home/travis/build/thrill/thrill/thrill/data/repository.hpp:85
30:   what():  object 1 not in repository @ /home/travis/build/thrill/thrill/thrill/data/repository.hpp:85
30/42 Test #30: api_zip_node_test ...................***Exception: Other  2.06 sec

Stream::CallClosedCallbacksEventually() is prone to race conditions

There is a race condition when registering callbacks via Stream::CallClosedCallbacksEventually(), because the caller (usually a DIANode) may no longer exist when the Stream is closed.
I think we should reconsider how Streams log their stats. These stats should go into the StatsGraph, which lives forever.
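One common way to make such deferred callbacks safe, shown here as an alternative technique rather than as Thrill's design (the issue itself proposes the StatsGraph), is to capture a weak reference to the owner and check it when the callback fires. The sketch assumes std::shared_ptr ownership, whereas Thrill uses its own CountingPtr; StreamLike, Owner and RegisterCloseCallback are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Stands in for the DIANode that registers the callback (hypothetical).
struct Owner {
    int closed_streams = 0;
};

// Minimal stand-in for a Stream that runs callbacks when it is closed.
class StreamLike {
public:
    void OnClose(std::function<void()> cb) {
        assert(cb && "empty callback");
        callbacks_.push_back(std::move(cb));
    }
    void Close() {
        for (auto& cb : callbacks_) cb();
        callbacks_.clear();
    }
private:
    std::vector<std::function<void()>> callbacks_;
};

// The callback holds only a weak_ptr, so it never touches a destroyed owner.
void RegisterCloseCallback(StreamLike& stream, const std::shared_ptr<Owner>& owner) {
    std::weak_ptr<Owner> weak = owner;
    stream.OnClose([weak]() {
        if (auto alive = weak.lock())   // owner still exists: update it
            ++alive->closed_streams;
        // otherwise the owner is gone and the callback does nothing
    });
}
```

Because lock() yields a strong reference for the duration of the call, the owner also cannot be destroyed while the callback is running.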

ActionFutures

Define and implement "ActionFutures".
These are Actions which deliver a Future result and hence are not calculated immediately.
This enables better pipelining and multiple results per data round-trip.
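To make the idea concrete, here is a hypothetical sequential sketch (LazyPass, Fold and Future are made-up names, not Thrill API): each action returns a handle immediately, and all registered actions are evaluated together in a single pass over the data when the first result is requested.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical illustration: deferred actions share one pass over the data.
class LazyPass {
public:
    explicit LazyPass(std::vector<long> data) : data_(std::move(data)) {}

    // Handle to a result that is computed on first get().
    class Future {
    public:
        long get() { owner_->RunOnce(); return *value_; }
    private:
        friend class LazyPass;
        Future(LazyPass* owner, std::shared_ptr<long> value)
            : owner_(owner), value_(std::move(value)) {}
        LazyPass* owner_;
        std::shared_ptr<long> value_;
    };

    // Registers a fold (init, op); nothing is computed yet.
    Future Fold(long init, std::function<long(long, long)> op) {
        assert(!done_ && "register all actions before requesting a result");
        auto slot = std::make_shared<long>(init);
        folds_.push_back({slot, std::move(op)});
        return Future(this, slot);
    }

    std::size_t passes() const { return passes_; }

private:
    void RunOnce() {
        if (done_) return;
        for (long x : data_)              // a single pass over the data ...
            for (auto& f : folds_)        // ... feeds every pending action
                *f.slot = f.op(*f.slot, x);
        done_ = true;
        ++passes_;
    }

    struct PendingFold {
        std::shared_ptr<long> slot;
        std::function<long(long, long)> op;
    };
    std::vector<long> data_;
    std::vector<PendingFold> folds_;
    bool done_ = false;
    std::size_t passes_ = 0;
};
```

Requesting a sum and a maximum this way costs one traversal instead of two, which is the "multiple results per data round-trip" benefit.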

Implicit fallthrough in hash.hpp

GCC 7 complains about a number of implicit fallthroughs in hash.hpp, in the switch beginning at line 137. I think the fallthroughs are intentional, but I don't know or understand that code, so it would be nice if somebody could add __attribute__((fallthrough)); to fix the warnings if the fallthroughs are indeed intended.
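A portable way to mark intended fallthroughs is a small macro that picks the best available annotation per compiler. The macro name HYPO_FALLTHROUGH and the MixTail function are hypothetical illustrations, not code from hash.hpp; the switch only mimics the typical tail handling of byte-oriented hash functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical macro: annotate an intentional fallthrough so that
// -Wimplicit-fallthrough stays quiet on each compiler.
#if __cplusplus >= 201703L
#  define HYPO_FALLTHROUGH [[fallthrough]]
#elif defined(__GNUC__) && !defined(__clang__) && __GNUC__ >= 7
#  define HYPO_FALLTHROUGH __attribute__((fallthrough))
#elif defined(__clang__)
#  define HYPO_FALLTHROUGH [[clang::fallthrough]]
#else
#  define HYPO_FALLTHROUGH ((void)0)
#endif

// Folds the 0-3 trailing bytes of a key into h; each case deliberately
// falls through to handle the remaining lower bytes.
std::uint32_t MixTail(const unsigned char* tail, std::size_t len, std::uint32_t h) {
    assert(len < 4);
    std::uint32_t k = 0;
    switch (len) {
    case 3:
        k ^= std::uint32_t(tail[2]) << 16;
        HYPO_FALLTHROUGH;
    case 2:
        k ^= std::uint32_t(tail[1]) << 8;
        HYPO_FALLTHROUGH;
    case 1:
        k ^= std::uint32_t(tail[0]);
        h ^= k;
        break;
    default:
        break;
    }
    return h;
}
```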

Wrong documentation for WriteBinary/WriteLines

The docs state that in the output path, $ is replaced by the worker id and # by the file counter.
The # is correct, but $ is wrong: @ is what is actually used.
This can have very subtle and unexpected consequences: if you pass "foo-$#.txt", the $ just stays, and the worker id is appended to the end of the name (before the extension). This yields files like foo-$<counter><worker>.txt, which the corresponding read function reads in the wrong order, breaking order-dependent data, even though the files still look roughly right. This also happens if you remove the $.

I would suggest that the FillFilePattern function not try to be overly smart, but rather fail very loudly if the user input has any problems (@ or # missing, a placeholder too short to capture the entire range, or a placeholder order that breaks lexicographic ordering).
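A strict variant along the lines of this suggestion could look like the following sketch. StrictFillFilePattern is a hypothetical name and its signature is an assumption, not Thrill's FillFilePattern: it requires exactly one run of @ (worker id) and one run of # (file counter), requires @ before # so that lexicographic order matches (worker, counter) order, zero-pads each value to the width of its run, and throws instead of guessing.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <utility>

namespace {

// Returns [begin, end) of the single run of `ch` in s, or throws.
std::pair<std::size_t, std::size_t> FindRun(const std::string& s, char ch) {
    std::size_t b = s.find(ch);
    if (b == std::string::npos)
        throw std::invalid_argument(std::string("missing placeholder '") + ch + "' in " + s);
    std::size_t e = s.find_first_not_of(ch, b);
    if (e == std::string::npos) e = s.size();
    if (s.find(ch, e) != std::string::npos)
        throw std::invalid_argument(std::string("more than one run of '") + ch + "' in " + s);
    return {b, e};
}

// Zero-pads value to exactly `width` digits, or throws if it does not fit.
std::string ZeroPad(std::size_t value, std::size_t width) {
    std::string digits = std::to_string(value);
    if (digits.size() > width)
        throw std::invalid_argument("placeholder too short for " + digits);
    return std::string(width - digits.size(), '0') + digits;
}

}  // namespace

// Hypothetical strict pattern filler: '@' run -> worker id, '#' run -> counter.
std::string StrictFillFilePattern(const std::string& pattern, std::size_t worker,
                                  std::size_t num_workers, std::size_t counter) {
    assert(worker < num_workers);
    auto w = FindRun(pattern, '@');
    auto c = FindRun(pattern, '#');
    if (w.first > c.first)
        throw std::invalid_argument("'@' must precede '#' to keep lexicographic order: " + pattern);
    const std::size_t w_len = w.second - w.first;
    const std::size_t c_len = c.second - c.first;
    ZeroPad(num_workers - 1, w_len);  // throws unless every worker id fits
    std::string out = pattern;
    // Replace the later run first so the earlier run's offsets stay valid.
    out.replace(c.first, c_len, ZeroPad(counter, c_len));
    out.replace(w.first, w_len, ZeroPad(worker, w_len));
    return out;
}
```

With this behavior, "foo-$#.txt" is rejected outright instead of silently producing misordered file names.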

Crash when using Union() and Collapse()

This code example triggers a crash for me (MacBook with Apple LLVM version 8.0.0 (clang-800.0.42.1) Target: x86_64-apple-darwin16.4.0) with this message:

libc++abi.dylib: terminating
Abort trap: 6
#include <thrill/api/cache.hpp>
#include <thrill/api/collapse.hpp>
#include <thrill/api/reduce_by_key.hpp>
#include <thrill/api/size.hpp>
#include <thrill/api/union.hpp>
#include <thrill/api/generate.hpp>

#include <iostream>

int main(int, char const *[]) {
  return thrill::Run([&](thrill::Context& context) {
    auto input = thrill::Generate(context, 20);
    auto other = input.Filter([](const size_t x) { return x % 3 == 0; });
    input = input
      .Filter([](const size_t x) { return x % 3 != 0; }) // this is optional
      .ReduceByKey([](const size_t x) { return x; }, [](const size_t x, const size_t y) { return x * y; }) // some DOp
      .Union(other)
      .Map([](const size_t x) { return x * 5; })
      .Collapse();
      // .Cache(); // using cache rather than collapse fixes the problem
    size_t size = input
      .ReduceByKey([](const size_t x) { return x; }, [](const size_t x, const size_t y) { return x + y; }) // some DOp
      .Size();
    std::cout << size;
  });
}

The code snippet is extracted from a more complex program and simplified as much as possible while still reproducing the crash. Only the two Filter() calls are not required to reproduce it. In the original program I run an algorithm iteratively on a part of my data, hence the Filter() calls. The algorithm basically goes like this:

while some condition
  split input
  do work with first part of input
  input <- union result with the second part of input
end

That is the reason why the Collapse() in the example is necessary. Without the Collapse() the example does not crash.

The project uses Thrill as a submodule, on my own branch, but the only differences to master are some fixes to the SSH runner script and some convenience DIA operations which are not used here.

Thrill Output

Process 2899 launched: './test' (x86_64)
Thrill: using 8.000 GiB RAM total, BlockPool=2.667 GiB, workers=1.333 GiB, floating=2.667 GiB.
Thrill: running locally with 2 test hosts and 2 workers per host in a local tcp network.
Thrill: using 8.000 GiB RAM total, BlockPool=2.667 GiB, workers=1.333 GiB, floating=2.667 GiB.
Thrill: no THRILL_LOG was found, so no json log is written.
Thrill: no config file ~/.thrill found, using default disk configuration.
Thrill: disk '/tmp/thrill.tmp' is allocated, space: 1000 MiB, I/O implementation: syscall queue=0 devid=0 unlink_on_open
[host 0 worker 0 000000] Execute()  stage Generate.1
[host 0 worker 0 000001] PushData() stage Generate.1 with targets [ReduceByKey.4 Union.5 [Collapse.7 [ReduceByKey.8]]]
[host 0 worker 0 000002] Execute()  stage ReduceByKey.4
[host 0 worker 0 000003] PushData() stage ReduceByKey.4 with targets [Union.5 [Collapse.7 []]]
[host 0 worker 0 000004] Execute()  stage ReduceByKey.8
[host 0 worker 0 000005] PushData() stage ReduceByKey.8 with targets [Size.9]
[host 0 worker 0 000006] Execute()  stage Size.9

LLDB Output

libc++abi.dylib: terminating
Process 2899 stopped
* thread #2: tid = 0x34aed1, 0x0000000100559dd6 libsystem_kernel.dylib`__pthread_kill + 10, stop reason = signal SIGABRT
    frame #0: 0x0000000100559dd6 libsystem_kernel.dylib`__pthread_kill + 10
libsystem_kernel.dylib`__pthread_kill:
->  0x100559dd6 <+10>: jae    0x100559de0               ; <+20>
    0x100559dd8 <+12>: movq   %rax, %rdi
    0x100559ddb <+15>: jmp    0x100552cdf               ; cerror_nocancel
    0x100559de0 <+20>: retq

Stacktrace

* thread #2: tid = 0x34aed1, 0x0000000100559dd6 libsystem_kernel.dylib`__pthread_kill + 10, stop reason = signal SIGABRT
  * frame #0: 0x0000000100559dd6 libsystem_kernel.dylib`__pthread_kill + 10
    frame #1: 0x00007fffbbb0e787 libsystem_pthread.dylib`pthread_kill + 90
    frame #2: 0x00007fffbb988420 libsystem_c.dylib`abort + 129
    frame #3: 0x00007fffba4e385a libc++abi.dylib`abort_message + 266
    frame #4: 0x00007fffba508b72 libc++abi.dylib`default_terminate_handler() + 46
    frame #5: 0x00007fffba505d69 libc++abi.dylib`std::__terminate(void (*)()) + 8
    frame #6: 0x00007fffba505df0 libc++abi.dylib`std::terminate() + 64
    frame #7: 0x00007fffba4d34a0 libc++.1.dylib`std::__1::thread::~thread() + 16
    frame #8: 0x00000001000417b7 test`thrill::api::ReduceNode<unsigned long, main::$_0::operator()(thrill::api::Context&) const::'lambda3'(unsigned long), main::$_0::operator()(thrill::api::Context&) const::'lambda0'(unsigned long, unsigned long), thrill::api::DefaultReduceConfig, std::__1::hash<unsigned long>, std::__1::equal_to<unsigned long>, false, false>::~ReduceNode(this=0x000000010091b200) + 87 at reduce_by_key.hpp:64
    frame #9: 0x000000010003ccd5 test`thrill::api::ReduceNode<unsigned long, main::$_0::operator()(thrill::api::Context&) const::'lambda3'(unsigned long), main::$_0::operator()(thrill::api::Context&) const::'lambda0'(unsigned long, unsigned long), thrill::api::DefaultReduceConfig, std::__1::hash<unsigned long>, std::__1::equal_to<unsigned long>, false, false>::~ReduceNode(this=0x000000010091b200) + 21 at reduce_by_key.hpp:64
    frame #10: 0x000000010003ccf9 test`thrill::api::ReduceNode<unsigned long, main::$_0::operator()(thrill::api::Context&) const::'lambda3'(unsigned long), main::$_0::operator()(thrill::api::Context&) const::'lambda0'(unsigned long, unsigned long), thrill::api::DefaultReduceConfig, std::__1::hash<unsigned long>, std::__1::equal_to<unsigned long>, false, false>::~ReduceNode(this=0x000000010091b200) + 25 at reduce_by_key.hpp:64
    frame #11: 0x0000000100007adf test`void thrill::common::DefaultCountingPtrDeleter::operator(this=0x000070000c19c890, ptr=0x000000010091b200)<thrill::api::DIABase>(thrill::api::DIABase*) const + 47 at counting_ptr.hpp:37
    frame #12: 0x00000001000078be test`thrill::common::CountingPtr<thrill::api::DIABase, thrill::common::DefaultCountingPtrDeleter>::DecReference(this=0x000070000c19c990) + 78 at counting_ptr.hpp:83
    frame #13: 0x0000000100007865 test`thrill::common::CountingPtr<thrill::api::DIABase, thrill::common::DefaultCountingPtrDeleter>::~CountingPtr(this=0x000070000c19c990) + 21 at counting_ptr.hpp:171
    frame #14: 0x0000000100004495 test`thrill::common::CountingPtr<thrill::api::DIANode<unsigned long>, thrill::common::DefaultCountingPtrDeleter>::~CountingPtr(this=0x000070000c19c990) + 21 at counting_ptr.hpp:171
    frame #15: 0x000000010002ff65 test`thrill::data::DynBlockSource::~DynBlockSource(this=0x000070000c19c990) + 21 at dyn_block_reader.hpp:49
    frame #16: 0x0000000100004095 test`thrill::api::DIA<unsigned long, thrill::api::FunctionStack<unsigned long> >::~DIA(this=0x000070000c19c990) + 21 at dia.hpp:132
    frame #17: 0x00000001000035f7 test`main::$_0::operator(this=0x00007fff5fbff818, context=0x000070000c19d750)(thrill::api::Context&) const + 471 at test.cpp:21
    frame #18: 0x000000010000340d test`void std::__1::__invoke_void_return_wrapper<void>::__call<main::$_0&, thrill::api::Context&>(main::$_0&&&, thrill::api::Context&&&) [inlined] decltype(__f=0x00007fff5fbff818, __args=0x000070000c19d750)(std::__1::forward<thrill::api::Context&>(fp0))) std::__1::__invoke<main::$_0&, thrill::api::Context&>(main::$_0&&&, thrill::api::Context&&&) + 77 at __functional_base:416
    frame #19: 0x00000001000033f0 test`void std::__1::__invoke_void_return_wrapper<void>::__call<main::$_0&, thrill::api::Context&>(__args=0x00007fff5fbff818, __args=0x000070000c19d750) + 48 at __functional_base:468
    frame #20: 0x00000001000032a4 test`std::__1::__function::__func<main::$_0, std::__1::allocator<main::$_0>, void (thrill::api::Context&)>::operator(this=0x00007fff5fbff810, __arg=0x000070000c19d750)(thrill::api::Context&) + 68 at functional:1437
    frame #21: 0x000000010004cf21 test`std::__1::function<void (thrill::api::Context&)>::operator(this=0x00007fff5fbff810, __arg=0x000070000c19d750)(thrill::api::Context&) const + 145 at functional:1817
    frame #22: 0x0000000100052477 test`thrill::api::Context::Launch(this=0x000070000c19d750, job_startpoint=0x00007fff5fbff810)> const&) + 231 at context.cpp:1106
    frame #23: 0x000000010005d3b3 test`void thrill::api::RunLoopbackThreads<thrill::net::tcp::Group>(this=0x0000000100704220)> const&)::'lambda'()::operator()() const + 1123 at context.cpp:143
    frame #24: 0x000000010005cc32 test`void* std::__1::__thread_proxy<std::__1::tuple<void thrill::api::RunLoopbackThreads<thrill::net::tcp::Group>(thrill::api::MemoryConfig const&, unsigned long, unsigned long, unsigned long, std::__1::function<void (thrill::api::Context&)> const&)::'lambda'()> >(void*) [inlined] decltype(__f=0x0000000100704220)> const&)::'lambda'()>(fp)(std::__1::forward<>(fp0))) std::__1::__invoke<void thrill::api::RunLoopbackThreads<thrill::net::tcp::Group>(thrill::api::MemoryConfig const&, unsigned long, unsigned long, unsigned long, std::__1::function<void (thrill::api::Context&)> const&)::'lambda'()>(void thrill::api::RunLoopbackThreads<thrill::net::tcp::Group>(thrill::api::MemoryConfig const&, unsigned long, unsigned long, unsigned long, std::__1::function<void (thrill::api::Context&)> const&)::'lambda'()&&) + 12 at __functional_base:416
    frame #25: 0x000000010005cc26 test`void* std::__1::__thread_proxy<std::__1::tuple<void thrill::api::RunLoopbackThreads<thrill::net::tcp::Group>(thrill::api::MemoryConfig const&, unsigned long, unsigned long, unsigned long, std::__1::function<void (thrill::api::Context&)> const&)::'lambda'()> >(void*) [inlined] void std::__1::__thread_execute<void thrill::api::RunLoopbackThreads<thrill::net::tcp::Group>(thrill::api::MemoryConfig const&, unsigned long, unsigned long, unsigned long, std::__1::function<void (thrill::api::Context&)> const&)::'lambda'()>(__t=0x0000000100704220)> const&)::'lambda'()>&, std::__1::__tuple_indices<>) + 22 at thread:347
    frame #26: 0x000000010005cc10 test`void* std::__1::__thread_proxy<std::__1::tuple<void thrill::api::RunLoopbackThreads<thrill::net::tcp::Group>(thrill::api::MemoryConfig const&, unsigned long, unsigned long, unsigned long, std::__1::function<void (thrill::api::Context&)> const&)::'lambda'()> >(__vp=0x0000000100704220) + 368 at thread:357
    frame #27: 0x00007fffbbb0baab libsystem_pthread.dylib`_pthread_body + 180
    frame #28: 0x00007fffbbb0b9f7 libsystem_pthread.dylib`_pthread_start + 286
    frame #29: 0x00007fffbbb0b1fd libsystem_pthread.dylib`thread_start + 13
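Frames #6 to #8 show std::terminate being reached from std::thread::~thread inside ~ReduceNode. Destroying a std::thread that is still joinable always calls std::terminate, so the trace suggests that this ReduceNode is destroyed while one of its threads has not been joined. The following sketch shows the standard RAII guard against that (JoiningThread is an illustrative name, not a Thrill class); it does not claim to identify where Thrill should join.

```cpp
#include <cassert>
#include <thread>
#include <utility>

// Joins in its destructor, so destruction never reaches std::terminate
// the way destroying a joinable std::thread does.
class JoiningThread {
public:
    template <typename Fn>
    explicit JoiningThread(Fn&& fn) : t_(std::forward<Fn>(fn)) {}
    JoiningThread(const JoiningThread&) = delete;
    JoiningThread& operator=(const JoiningThread&) = delete;
    ~JoiningThread() {
        if (t_.joinable()) t_.join();
    }
private:
    std::thread t_;
};

// Usage: the worker thread is joined when `t` goes out of scope.
int RunInBackground() {
    int result = 0;
    {
        JoiningThread t([&result] { result = 42; });
    }  // joined here, so reading result afterwards is safe
    return result;
}
```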

libs3 feature detection is broken

Configuring fails if libcurl-dev (or another libs3 dependency) isn't installed or the submodule isn't there. It should fall back to not using S3 in this case.

segfault in Zip() while rebalancing

I'm having trouble with a SEGFAULT while using the Zip function.
It occurs during the rebalancing: if I pass a NoRebalanceTag it works, though of course only if no rebalancing is actually required.

Example code taken from my app and simplified down to a minimal reproduction (in the real application the second DIA is a different one; that's not the problem):

#include <thrill/api/print.hpp>
#include <thrill/api/read_lines.hpp>
#include <thrill/api/zip.hpp>

#include <string>

void doit(thrill::Context& context, const std::string &input_path) {
  auto input = thrill::ReadLines(context, input_path);

  input
    .Keep()
    .Zip(input, [](const std::string& foo, const std::string& ) { return foo; }) // SEGFAULT
    // this works
    // .Zip(thrill::api::NoRebalanceTag, input, [](const std::string& foo, const std::string& ) { return foo; })
    .Print("zipped");
}

int main(int, char const *argv[]) {
  return thrill::Run([&](thrill::Context& context) {
    doit(context, argv[1]);
  });
}

Stacktrace:

#0  thrill::data::Multiplexer::num_hosts (this=0x7f7681493498)
    at [...]/lib/thrill/thrill/data/multiplexer.hpp:86
#1  thrill::data::Multiplexer::num_workers (this=0x7f7681493498)
    at [...]/lib/thrill/thrill/data/multiplexer.hpp:96
#2  thrill::data::Stream::num_workers (this=0x7f7676a43700)
    at [...]/lib/thrill/thrill/data/stream.hpp:60
#3  thrill::data::Stream::Scatter<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > (this=<optimized out>, 
    source=..., offsets=std::vector of length 9, capacity 9 = {...}, consume=consume@entry=true)
    at [...]/lib/thrill/thrill/data/stream.hpp:101
#4  0x00000000005a913e in thrill::api::ZipNode<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, doit(thrill::api::Context&, const string&)::<lambda(const string&, const string&)>, false, true, false, 2ul>::DoScatter<0ul> (this=0x7f7676824100)
    at [...]/lib/thrill/thrill/api/zip.hpp:300
#5  thrill::api::ZipNode<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, doit(thrill::api::Context&, const string&)::<lambda(const string&, const string&)>, false, true, false, 2ul>::<lambda(auto:5)>::operator()<thrill::common::IndexSaver<0ul> > (
    __closure=<optimized out>, index=...) at [...]/lib/thrill/thrill/api/zip.hpp:362
#6  thrill::common::VariadicCallEnumerateImpl<0ul, 2ul, thrill::api::ZipNode<ValueType, ZipFunction, Pad, UnequalCheck, NoRebalance, kNumInputs>::MainOp() [with ValueType = std::__cxx11::basic_string<char>; ZipFunction = doit(thrill::api::Context&, const string&)::<lambda(const string&, const string&)>; bool Pad = false; bool UnequalCheck = true; bool NoRebalance = false; long unsigned int kNumInputs = 2ul]::<lambda(auto:5)> >::Call (f=<optimized out>) at [...]/lib/thrill/thrill/common/meta.hpp:191
#7  thrill::common::VariadicCallEnumerate<2ul, thrill::api::ZipNode<ValueType, ZipFunction, Pad, UnequalCheck, NoRebalance, kNumInputs>::MainOp() [with ValueType = std::__cxx11::basic_string<char>; ZipFunction = doit(thrill::api::Context&, const string&)::<lambda(const string&, const string&)>; bool Pad = false; bool UnequalCheck = true; bool NoRebalance = false; long unsigned int kNumInputs = 2ul]::<lambda(auto:5)> > (
    f=<optimized out>) at [...]/lib/thrill/thrill/common/meta.hpp:208
#8  thrill::api::ZipNode<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, doit(thrill::api::Context&, const string&)::<lambda(const string&, const string&)>, false, true, false, 2ul>::MainOp(void) (this=0x7f7676824100)
    at [...]/lib/thrill/thrill/api/zip.hpp:359
#9  0x00000000005b7f65 in thrill::api::Stage::Execute (this=0x7f767683f0a0)
    at [...]/lib/thrill/thrill/api/dia_base.cpp:143
#10 0x00000000005b654b in thrill::api::DIABase::RunScope (this=0x7f7676818600)
    at [...]/lib/thrill/thrill/api/dia_base.cpp:429
#11 0x00000000005b516d in thrill::api::DIA<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, thrill::api::FunctionStack<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >> >::Print(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::ostream&) const (this=this@entry=0x7f76783fd040, name="zipped", os=...)
    at [...]/lib/thrill/thrill/api/print.hpp:33
#12 0x00000000005aa079 in thrill::api::DIA<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, thrill::api::FunctionStack<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >> >::Print(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) const (name="zipped", this=0x7f76783fd040)
    at [...]/lib/thrill/thrill/api/print.hpp:49
#13 doit (context=..., input_path="../../../data/thrill_test/test") at [...]/src/main.cpp:13
#14 0x00000000005aa62a in <lambda(thrill::api::Context&)>::operator() (context=..., __closure=<optimized out>)
    at [...]/src/main.cpp:18
#15 std::_Function_handler<void(thrill::api::Context&), main(int, char const**)::<lambda(thrill::api::Context&)> >::_M_invoke(const std::_Any_data &, thrill::api::Context &) (__functor=..., __args#0=...) at /usr/include/c++/5/functional:1871
#16 0x00000000005ce4ab in std::function<void (thrill::api::Context&)>::operator()(thrill::api::Context&) const (this=0x7ffd32153a70, 
    __args#0=...) at /usr/include/c++/5/functional:2267
#17 0x00000000005c41ad in thrill::api::Context::Launch(std::function<void (thrill::api::Context&)> const&) (this=0x7f76783fd8d0, 
    job_startpoint=...) at [...]/lib/thrill/thrill/api/context.cpp:1083
#18 0x00000000005c4d07 in thrill::api::<lambda()>::operator()(void) const (__closure=0x7f7681423498)
    at [...]/lib/thrill/thrill/api/context.cpp:143
#19 0x00000000005c977e in std::_Bind_simple<thrill::api::RunLoopbackThreads(const thrill::api::MemoryConfig&, size_t, size_t, const std::function<void(thrill::api::Context&)>&) [with NetGroup = thrill::net::tcp::Group; size_t = long unsigned int]::<lambda()>()>::_M_invoke<>(std::_Index_tuple<>) (this=0x7f7681423498) at /usr/include/c++/5/functional:1531
#20 0x00000000005c95c8 in std::_Bind_simple<thrill::api::RunLoopbackThreads(const thrill::api::MemoryConfig&, size_t, size_t, const std::function<void(thrill::api::Context&)>&) [with NetGroup = thrill::net::tcp::Group; size_t = long unsigned int]::<lambda()>()>::operator()(void) (
    this=0x7f7681423498) at /usr/include/c++/5/functional:1520
#21 0x00000000005c9472 in std::thread::_Impl<std::_Bind_simple<thrill::api::RunLoopbackThreads(const thrill::api::MemoryConfig&, size_t, size_t, const std::function<void(thrill::api::Context&)>&) [with NetGroup = thrill::net::tcp::Group; size_t = long unsigned int]::<lambda()>()> >::_M_run(void) (this=0x7f7681423480) at /usr/include/c++/5/thread:115
#22 0x00007f76893bdc80 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#23 0x00007f768a2e570a in start_thread (arg=0x7f76783fe700) at pthread_create.c:333
#24 0x00007f7688b2382d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109
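For context on frame #3: Scatter() receives an offsets vector (length 9 in the trace, presumably one entry more than the number of target workers) telling it where to cut the local items. The following is a hedged sketch of how such offsets can be computed for an even rebalance, assuming target worker j should receive the global indices [total*j/p, total*(j+1)/p). ScatterOffsets is a hypothetical helper, not Thrill's ZipNode code.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// A worker holding `local_size` items, whose first item has global index
// `prefix`, computes p + 1 cut points: local items
// [offsets[j], offsets[j+1]) are sent to worker j.
std::vector<std::size_t> ScatterOffsets(std::size_t prefix, std::size_t local_size,
                                        std::size_t total, std::size_t p) {
    assert(p > 0 && prefix + local_size <= total);
    std::vector<std::size_t> offsets(p + 1);
    for (std::size_t j = 0; j <= p; ++j) {
        std::size_t boundary = total * j / p;  // first global index of worker j
        // Clamp the global boundary into this worker's local range.
        std::size_t clamped =
            std::min(std::max(boundary, prefix), prefix + local_size);
        offsets[j] = clamped - prefix;
    }
    return offsets;  // offsets.front() == 0, offsets.back() == local_size
}
```

For example, with 10 items over 4 workers, a worker holding global items 3 to 6 gets offsets {0, 0, 2, 4, 4}: items 3 and 4 go to worker 1, items 5 and 6 to worker 2.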

Add SparseFilter method to DIA

Feature request: given a sequence P of positions (e.g. as two iterators), return a new DIA which corresponds to the subarray of DIA elements at the given positions.
The runtime should depend on the length of P instead of the length of the original DIA.
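A per-worker step of such an operation could look like this hedged sketch. SparseFilterLocal is a hypothetical name; it assumes P is sorted and that each worker knows the global index of its first element (for example from a prefix sum over local sizes). Two binary searches find the positions that fall into the local range, so the work is O(log|P| + k) for k selected elements, independent of the local slice size.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// `local` is this worker's slice of the DIA; its first element has global
// index `first_index`. `positions` is the sorted sequence P.
template <typename T>
std::vector<T> SparseFilterLocal(const std::vector<T>& local, std::size_t first_index,
                                 const std::vector<std::size_t>& positions) {
    // Debug-only check; O(|P|), so it vanishes with NDEBUG.
    assert(std::is_sorted(positions.begin(), positions.end()));
    const std::size_t last_index = first_index + local.size();
    auto lo = std::lower_bound(positions.begin(), positions.end(), first_index);
    auto hi = std::lower_bound(lo, positions.end(), last_index);
    std::vector<T> out;
    out.reserve(static_cast<std::size_t>(hi - lo));
    for (auto it = lo; it != hi; ++it)
        out.push_back(local[*it - first_index]);  // global -> local index
    return out;
}
```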

Endless loop in ReduceByKeyPostPhase

This issue has two parts: the endless loop is the symptom, and the cause seems to be something wrong with memory allocation tracking. Sadly, right now I can't provide a minimal example (I have a program which reproduces the bug, but it's a little too long for here; I can of course provide it if it's of any help). Luckily, at least for the symptomatic part it's easy to see what's going wrong:

Symptom:
When thrill::mem::memory_exceeded is true and there is work left to be done in the ReduceByKeyPostPhase re-reduction, the loop in reduce_by_hash_post_phase.hpp:153 will not terminate. The reason is that as soon as the reduce table contains at least one item, another insert causes the table to spill in reduce_probing_hash_table.hpp:191, which triggers another round of re-reduction, and so on. In my program, 8 elements (forming pairs of 2 that belong together) get re-reduced forever because of this.

Underlying issue:
The big question, of course, is why on earth thrill::mem::memory_exceeded is true. In my example I process a graph with 200 nodes and 3000 edges. According to the OS, the actual memory usage is about 25 MB, and I have 16 GB available...

Also, this happens only on my Linux machine. On my MacBook with OS X and only 8 GB RAM, everything works perfectly fine. I set some data watchpoints, and it seems that on OS X thrill::mem::memory_exceeded never gets set to true. The reason might be that in malloc_tracker.cpp HAVE_THREAD_LOCAL is true on Linux and false on OS X, so on OS X the total_bytes in inc_count (line 211) is always up to date, while on Linux it might be slightly outdated.
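To illustrate how a thread-local scheme lets the shared counter lag behind the real value, here is a minimal sketch of batched counting. The names and the 1 MiB threshold are assumptions, not taken from malloc_tracker.cpp, and the sketch does not by itself explain the spurious memory_exceeded flag.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical batched allocation counter: each thread accumulates deltas
// locally and publishes them to the shared total only when the local delta
// exceeds the threshold, so the shared total can lag the true value by up
// to kFlushThreshold bytes per thread.
constexpr std::int64_t kFlushThreshold = 1024 * 1024;  // assumed value
std::atomic<std::int64_t> g_total_bytes{0};
thread_local std::int64_t tl_delta = 0;

void CountAlloc(std::int64_t bytes) {  // negative bytes for a free
    tl_delta += bytes;
    if (tl_delta > kFlushThreshold || tl_delta < -kFlushThreshold) {
        g_total_bytes.fetch_add(tl_delta, std::memory_order_relaxed);
        tl_delta = 0;
    }
}

std::int64_t GlobalTotal() {
    return g_total_bytes.load(std::memory_order_relaxed);
}
```

A check such as "total exceeds limit" that reads GlobalTotal() therefore sees a stale value; without thread-local storage every call would update the shared total directly, matching the always-current behavior observed on OS X.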

nan/infinity in json logs break chart in html

Sometimes I encountered NaN or Infinity values in the JSON logs. I don't know why, but that's not the problem. The problem is that they are written as inf, nan and -nan in the JSON output, which breaks the chart in the HTML with a ReferenceError (at least in Firefox). I used this line

sed -i 's/:inf/:Infinity/g;s/:nan/:NaN/g;s/:-nan/:-NaN/g' log-*.json

to clean up the JSON, and then the chart works again. NaN, -NaN and Infinity are not part of the JSON standard, but the browser at least seems to handle them...
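An alternative to post-processing with sed is to never emit non-finite numbers in the first place. The following sketch (JsonNumber is a hypothetical helper, not part of Thrill's logger) writes null for NaN and infinities, which keeps the output valid JSON; whether the chart code treats null as a gap is an assumption to check.

```cpp
#include <cassert>
#include <cmath>
#include <sstream>
#include <string>

// Formats a double as a JSON value. JSON has no representation for NaN or
// infinity, so non-finite inputs become null.
std::string JsonNumber(double value) {
    if (!std::isfinite(value)) return "null";
    std::ostringstream os;
    os.precision(17);  // enough significant digits to round-trip a double
    os << value;
    return os.str();
}
```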

Non copyable GroupByIterator

Would it be possible to make GroupByIterator non-copyable? Very bad things happen when you forget to take the iterator argument of a GroupByKey call by reference: it endlessly iterates over the elements of the first key and fills up your RAM and hard disk...
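Deleting the copy operations is enough to turn the forgotten & into a compile error wherever the iterator is passed as an lvalue. GroupIterator below is a hypothetical stand-in, not Thrill's GroupByIterator.

```cpp
#include <cassert>
#include <cstddef>
#include <type_traits>
#include <vector>

// Hypothetical stand-in for a group iterator with copying disabled.
template <typename T>
class GroupIterator {
public:
    explicit GroupIterator(const std::vector<T>& items) : items_(&items) {}
    GroupIterator(const GroupIterator&) = delete;             // no silent copies
    GroupIterator& operator=(const GroupIterator&) = delete;
    GroupIterator(GroupIterator&&) = default;
    GroupIterator& operator=(GroupIterator&&) = default;

    bool HasNext() const { return pos_ < items_->size(); }
    const T& Next() { assert(HasNext()); return (*items_)[pos_++]; }

private:
    const std::vector<T>* items_;
    std::size_t pos_ = 0;
};

static_assert(!std::is_copy_constructible<GroupIterator<int>>::value,
              "copying a group iterator must not compile");

// User code must now take the iterator by reference:
//   [](GroupIterator<int>& it) { ... }  // OK
//   [](GroupIterator<int> it)  { ... }  // error when called with an lvalue
template <typename T>
T SumGroup(GroupIterator<T>& it) {
    T sum = T();
    while (it.HasNext()) sum += it.Next();
    return sum;
}
```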

BlockSink has pure virtual methods, but constructor is called from derived classes

I don't think that works, and clang warns about it:

In file included from ../tests/api/reduce_node_test.cpp:13:
In file included from ../thrill/api/allgather.hpp:16:
In file included from ../thrill/api/action_node.hpp:15:
In file included from ../thrill/api/dia_node.hpp:15:
In file included from ../thrill/api/dia_base.hpp:19:
In file included from ../thrill/api/context.hpp:21:
In file included from ../thrill/data/cat_stream.hpp:16:
In file included from ../thrill/data/block_queue.hpp:20:
In file included from ../thrill/data/block_writer.hpp:19:
../thrill/data/block_sink.hpp:86:11: warning: initializer for virtual base class 'thrill::data::BlockSink' of abstract class 'BoundedBlockSink' will never be used [-Wabstract-vbase-init]
        : BlockSink(block_pool),
          ^
../thrill/data/block_sink.hpp:67:18: note: unimplemented pure virtual method 'Close' in 'BoundedBlockSink'
    virtual void Close() = 0;
                 ^
../thrill/data/block_sink.hpp:70:18: note: unimplemented pure virtual method 'AppendBlock' in 'BoundedBlockSink'
    virtual void AppendBlock(const Block& b) = 0;
                 ^

I'm not sure about all the details of inheritance in C++, though...
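The warning is correct in substance: a virtual base class is initialized only by the constructor of the most-derived class, so the mem-initializer an intermediate class writes for it is ignored whenever that class is used as a base, and an abstract class can only ever be used as a base. This minimal example (Sink, BoundedSink and FileSink are illustrative names, not Thrill's classes) shows which initializer wins.

```cpp
#include <cassert>

struct Sink {
    explicit Sink(int id) : pool_id(id) {}
    virtual ~Sink() = default;
    virtual void Close() = 0;
    int pool_id;
};

struct BoundedSink : virtual public Sink {  // still abstract: no Close()
    BoundedSink() : Sink(1) {}              // ignored: never most-derived
};

struct FileSink final : public BoundedSink {
    explicit FileSink(int id) : Sink(id) {} // most-derived class initializes Sink
    void Close() override {}
};
```

Constructing FileSink(2) yields pool_id == 2, not 1: the Sink(1) in BoundedSink is exactly the initializer clang reports as "will never be used". The code is valid, the initializer is just dead.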

DC3 aka skew3

Implement DC3 suffix sorting.
Still missing:

  • distributed multiway merge
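As a local building block for the missing piece, a sequential k-way merge with a min-heap is sketched below. This is a generic technique, not Thrill code; a distributed version would additionally need splitter selection and an exchange of data between workers, which the sketch does not cover.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Merges sorted runs using a min-heap of (current value, run index) pairs.
template <typename T>
std::vector<T> MultiwayMerge(const std::vector<std::vector<T>>& runs) {
    using Entry = std::pair<T, std::size_t>;
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;
    std::vector<std::size_t> pos(runs.size(), 0);
    std::size_t total = 0;
    for (std::size_t r = 0; r < runs.size(); ++r) {
        total += runs[r].size();
        if (!runs[r].empty()) heap.emplace(runs[r][0], r);
    }
    std::vector<T> out;
    out.reserve(total);
    while (!heap.empty()) {
        Entry top = heap.top();   // smallest head among all runs
        heap.pop();
        out.push_back(top.first);
        std::size_t r = top.second;
        if (++pos[r] < runs[r].size()) heap.emplace(runs[r][pos[r]], r);
    }
    assert(out.size() == total);
    return out;
}
```

Each output element costs O(log k) heap work for k runs, so merging n elements takes O(n log k).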

Building Thrill as a dependency fails

I was trying to build the examples from the tutorial in their own project, with Thrill as a dependency. So I added Thrill as a git submodule in my lib folder and created a very simple CMakeLists.txt like this:

cmake_minimum_required (VERSION 2.6)
project (KMeans)
include_directories("lib/thrill")
add_subdirectory (lib/thrill)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3 -DNDEBUG -std=c++14 -Wall -Wextra")
add_executable(KMeans k_means.cpp)
target_link_libraries(KMeans thrill)

When building the project I get an error:

[  5%] Built target s3
[  5%] Building CXX object lib/thrill/thrill/CMakeFiles/thrill.dir/vfs/s3_file.cpp.o
[...]/thrill_tut/lib/thrill/thrill/vfs/s3_file.cpp:18:19: fatal error: libs3.h: No such file or directory
compilation terminated.
lib/thrill/thrill/CMakeFiles/thrill.dir/build.make:1358: recipe for target 'lib/thrill/thrill/CMakeFiles/thrill.dir/vfs/s3_file.cpp.o' failed
make[2]: *** [lib/thrill/thrill/CMakeFiles/thrill.dir/vfs/s3_file.cpp.o] Error 1
CMakeFiles/Makefile2:1235: recipe for target 'lib/thrill/thrill/CMakeFiles/thrill.dir/all' failed
make[1]: *** [lib/thrill/thrill/CMakeFiles/thrill.dir/all] Error 2
Makefile:83: recipe for target 'all' failed
make: *** [all] Error 2

Maybe I'm missing something terribly stupid because I suck at CMake, but in any case it would be great to have an example in the tutorial of how to build and include Thrill as a library.

LTO build fails

FAILED: tests/api_sort_node_test 
: && /usr/bin/c++   -fdiagnostics-color=always -std=c++1y -g -W -Wall -Wextra -fPIC -Wpedantic -Wcast-qual -Winit-self -Wnoexcept -Woverloaded-virtual -Wredundant-decls -Ofast -DNDEBUG -march=native -flto   tests/CMakeFiles/api_sort_node_test.dir/api/sort_node_test.cpp.o  -o tests/api_sort_node_test  -rdynamic thrill/libthrill.so extlib/googletest/googletest/libgtest_main.so -ltbb -ltbbmalloc -ljemalloc -ldl extlib/googletest/googletest/libgtest.so -lpthread -Wl,-rpath,~/coding/thrill/build-lto/thrill:~/coding/thrill/build-lto/extlib/googletest/googletest && :
../thrill/io/syscall_file.hpp:50: error: undefined reference to 'VTT for thrill::io::SyscallFile'
../thrill/io/syscall_file.hpp:50: error: undefined reference to 'vtable for thrill::io::SyscallFile'
/usr/bin/ld: the vtable symbol may be undefined because the class is missing its key function
collect2: error: ld returned 1 exit status
ninja: build stopped: subcommand failed.

DiaBase#PushData() does not take memory usage of unconsumed DIAs into account

DiaBase#PushData() always distributes the entire context_.mem_limit() memory to the push and pre ops. If unconsumed data structures are left over from previous operations, this causes more memory to be allocated than was provisioned.

#include <thrill/api/reduce_by_key.hpp>
#include <thrill/api/size.hpp>
#include <thrill/api/generate.hpp>

int main(int, char const *[]) {
  return thrill::Run([&](thrill::Context& context) {
    context.enable_consume();

    auto stuff = thrill::api::Generate(context, 50, [](size_t index) { return index; })
      .ReduceByKey(
        [](const size_t i) { return i % 5; },
        [](const size_t a, const size_t b) { return a + b; })
      // PushData on this ReduceByKey will allocate the entire available memory for the hash table
      .Keep();

    size_t size = stuff.Size(); // this will not consume, so the hash table is still in memory.

    size_t other_stuff_size = thrill::api::Generate(context, 60, [](size_t index) { return index; })
      .ReduceByKey(
        [](const size_t i) { return i % 6; },
        [](const size_t a, const size_t b) { return a + b; })
      // PushData here will assume it has the entire memory available, but it doesn't, since it's still occupied by the first hash table
      .Size();

    // finally consume the first hash table
    return stuff.Filter([](const size_t i) { return i % 2 == 0; }).Size() * other_stuff_size * size;
  });
}

This is the output from Thrill. I extended it at a few points so that it also reports when nodes are disposed and how much memory is assigned to each node during PushData():

Thrill: using 8.000 GiB RAM total, BlockPool=2.667 GiB, workers=1.333 GiB, floating=2.667 GiB.
Thrill: running locally with 2 test hosts and 2 workers per host in a local tcp network.
Thrill: using 8.000 GiB RAM total, BlockPool=2.667 GiB, workers=1.333 GiB, floating=2.667 GiB.
Thrill: no THRILL_LOG was found, so no json log is written.
Thrill: no config file ~/.thrill found, using default disk configuration.
Thrill: disk '/tmp/thrill.tmp' is allocated, space: 1000 MiB, I/O implementation: syscall queue=0 devid=0 unlink_on_open
[host 0 worker 0 000000] Execute()  stage Generate.1
[host 0 worker 0 000001] PushData() stage Generate.1 with targets [ReduceByKey.2]
[host 0 worker 0 000002] Mem limit 715827882
[host 0 worker 0 000003] Source node gets 0
[host 0 worker 0 000004] StageBuilder: distribute remaining worker memory  715827882  to  1  DIANodes
[host 0 worker 0 000005] Dispose()  stage Generate.1
[host 0 worker 0 000006] Execute()  stage ReduceByKey.2
[host 0 worker 0 000007] PushData() stage ReduceByKey.2 with targets [Size.3]
[host 0 worker 0 000008] Mem limit 715827882
[host 0 worker 0 000009] Target node gets 0
[host 0 worker 0 000010] StageBuilder: distribute remaining worker memory  715827882  to  1  DIANodes
[host 0 worker 0 000011] Execute()  stage Size.3
[host 0 worker 0 000012] Execute()  stage Generate.4
[host 0 worker 0 000013] PushData() stage Generate.4 with targets [ReduceByKey.5]
[host 0 worker 0 000014] Mem limit 715827882
[host 0 worker 0 000015] Source node gets 0
[host 0 worker 0 000016] StageBuilder: distribute remaining worker memory  715827882  to  1  DIANodes
[host 0 worker 0 000017] Dispose()  stage Generate.4
[host 0 worker 0 000018] Execute()  stage ReduceByKey.5
[host 0 worker 0 000019] PushData() stage ReduceByKey.5 with targets [Size.6]
[host 0 worker 0 000020] Mem limit 715827882
[host 0 worker 0 000021] Target node gets 0
[host 0 worker 0 000022] StageBuilder: distribute remaining worker memory  715827882  to  1  DIANodes
[host 0 worker 0 000023] Dispose()  stage ReduceByKey.5
[host 0 worker 0 000024] Execute()  stage Size.6
[host 0 worker 0 000025] PushData() stage ReduceByKey.2 with targets [Size.8]
[host 0 worker 0 000026] Mem limit 715827882
[host 0 worker 0 000027] Target node gets 0
[host 0 worker 0 000028] StageBuilder: distribute remaining worker memory  715827882  to  1  DIANodes
[host 0 worker 0 000029] Dispose()  stage ReduceByKey.2
[host 0 worker 0 000030] Execute()  stage Size.8
Thrill: ran 0.015415s with max 32.004 MiB in DIA Blocks, 1.125 KiB network traffic, 0.000 B disk I/O, and 0.000 B max disk use.
malloc_tracker ### exiting, total: 64379064, peak: 34763000, current: 0 / 432, allocs: 793, unfreed: 6

As long as the memory used during these operations is only allocated virtually, this is not a big problem, but once it is actually used, it becomes quite problematic. It can be circumvented by caching the data before the Keep(), because that disposes of the data structures. Another option is to avoid nonlinear data flows altogether where possible, which in my experience helps a lot with stability.
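A minimal sketch of the accounting this issue asks for, assuming the StageBuilder could know how much memory each kept-but-unconsumed node still holds. MemoryPerTarget and its parameters are hypothetical names, not Thrill API.

```cpp
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical helper, not Thrill's StageBuilder code: split the worker
// memory limit among the nodes of a new stage after subtracting the memory
// still held by DIA nodes that were kept but not yet consumed.
inline std::size_t MemoryPerTarget(std::size_t mem_limit,
                                   const std::vector<std::size_t>& held_by_unconsumed,
                                   std::size_t num_targets) {
    assert(num_targets > 0);
    const std::size_t held = std::accumulate(
        held_by_unconsumed.begin(), held_by_unconsumed.end(), std::size_t{0});
    // Never hand out more than what is actually still free.
    const std::size_t free_mem = held >= mem_limit ? 0 : mem_limit - held;
    return free_mem / num_targets;
}
```

For the run above, this would give ReduceByKey.5 the 715827882 bytes minus whatever ReduceByKey.2's hash table still occupies, instead of the full 715827882 bytes.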

BW HPC InfiniBand IP mapper broken

The awk script that maps hostnames to the IPs of the InfiniBand interfaces is broken. Due to the extension, bwUniCluster now has hostnames with IDs greater than the range captured in the awk script. Inserting another if-else branch in awk did not work because awk 😱 It works up to 999, but id < 1014 is only true if the input number also has four digits... WHAT THE HELL awk.
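The symptom fits awk comparing the ID as a string rather than as a number, which happens when the ID comes out of a string function such as substr() (an assumption, since the script isn't shown). String comparison is lexicographic: "999" sorts after "1014" because '9' > '1', while "1000" sorts before it. In awk, forcing numeric context, for example id + 0 < 1014, is the usual workaround. The two kinds of comparison, sketched in C++:

```cpp
#include <cassert>
#include <string>

// Lexicographic comparison: what awk does when id is a string.
inline bool LessAsString(const std::string& id, const std::string& bound) {
    return id < bound;  // compares character by character
}

// Numeric comparison: what the script actually needs.
inline bool LessAsNumber(const std::string& id, const std::string& bound) {
    return std::stol(id) < std::stol(bound);
}
```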

readdir_r is deprecated

readdir_r is deprecated as of version 2.24 of the GNU C library. The manpage states:

   It is recommended that applications use readdir() instead of readdir_r(). Furthermore, since version 2.24,
   glibc deprecates readdir_r(). The reasons are as follows:

   *  On systems where NAME_MAX is undefined, calling readdir_r() may be unsafe because the interface does not
      allow the caller to specify the length of the buffer used for the returned directory entry.

   *  On some systems, readdir_r() can't read directory entries with very long names. When the glibc
      implementation encounters such a name, readdir_r() fails with the error ENAMETOOLONG after the final
      directory entry has been read. On some other systems, readdir_r() may return a success status, but the
      returned d_name field may not be null terminated or may be truncated.

   *  In the current POSIX.1 specification (POSIX.1-2008), readdir(3) is not required to be thread-safe.
      However, in modern implementations (including the glibc implementation), concurrent calls to readdir(3)
      that specify different directory streams are thread-safe. Therefore, the use of readdir_r() is generally
      unnecessary in multithreaded programs. In cases where multiple threads must read from the same directory
      stream, using readdir(3) with external synchronization is still preferable to the use of readdir_r(), for
      the reasons given in the points above.

   *  It is expected that a future version of POSIX.1 will make readdir_r() obsolete, and require that
      readdir() be thread-safe when concurrently employed on different directory streams.

See also https://womble.decadent.org.uk/readdir_r-advisory.html, which also contains suggestions on how to replace it.
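A minimal sketch of the replacement the man page recommends: plain readdir() on a stream owned by the caller, with errno reset before each call so that end-of-directory can be told apart from an error. ListDirectory is a hypothetical helper, not the function Thrill currently uses.

```cpp
#include <cassert>
#include <cerrno>
#include <dirent.h>
#include <string>
#include <vector>

// Lists all entries of a directory (including "." and "..") with readdir()
// instead of the deprecated readdir_r(). Returns false on open/read errors.
inline bool ListDirectory(const std::string& path, std::vector<std::string>* out) {
    assert(out != nullptr);
    out->clear();
    DIR* dir = opendir(path.c_str());
    if (dir == nullptr) return false;

    bool ok = true;
    for (;;) {
        errno = 0;  // readdir() reports errors only through errno
        struct dirent* entry = readdir(dir);
        if (entry == nullptr) {
            ok = (errno == 0);  // nullptr with errno == 0 means end of stream
            break;
        }
        // Copy d_name immediately: the buffer may be reused by the next call.
        out->emplace_back(entry->d_name);
    }
    closedir(dir);
    return ok;
}
```

Since every call opens its own DIR stream, concurrent calls from different threads never share a stream, which the man page excerpt above describes as thread-safe in glibc.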

Porting to AWS Lambda/SQS/S3

Now that SQS is being added as a trigger for AWS Lambda, I would like to port Thrill so that it can be used in a "serverless" fashion without EC2. You can abuse AWS CodeBuild when you need an on-demand instance with more RAM.

Any suggestions on where to start? First order of business is getting everything compiled into a static zip file.

libS3 link line fails when building libthrill.so

: && /usr/bin/c++ -fPIC -fdiagnostics-color=always -std=c++1y -g -W -Wall
-Wextra -fPIC -Wpedantic -Wcast-qual -Winit-self -Wnoexcept -Woverloaded-virtual
-Wredundant-decls -Ofast -DNDEBUG -march=native -shared -Wl,-soname,libthrill.so
-o thrill/libthrill.so thrill/CMakeFiles/thrill.dir/api/context.cpp.o
thrill/CMakeFiles/thrill.dir/api/dia_base.cpp.o
thrill/CMakeFiles/thrill.dir/common/cmdline_parser.cpp.o
thrill/CMakeFiles/thrill.dir/common/hash.cpp.o
thrill/CMakeFiles/thrill.dir/common/json_logger.cpp.o
thrill/CMakeFiles/thrill.dir/common/linux_proc_stats.cpp.o
thrill/CMakeFiles/thrill.dir/common/logger.cpp.o
thrill/CMakeFiles/thrill.dir/common/porting.cpp.o
thrill/CMakeFiles/thrill.dir/common/profile_thread.cpp.o
thrill/CMakeFiles/thrill.dir/common/string.cpp.o
thrill/CMakeFiles/thrill.dir/common/thread_pool.cpp.o
thrill/CMakeFiles/thrill.dir/data/block.cpp.o
thrill/CMakeFiles/thrill.dir/data/block_pool.cpp.o
thrill/CMakeFiles/thrill.dir/data/block_queue.cpp.o
thrill/CMakeFiles/thrill.dir/data/byte_block.cpp.o
thrill/CMakeFiles/thrill.dir/data/cat_stream.cpp.o
thrill/CMakeFiles/thrill.dir/data/file.cpp.o
thrill/CMakeFiles/thrill.dir/data/mix_block_queue.cpp.o
thrill/CMakeFiles/thrill.dir/data/mix_stream.cpp.o
thrill/CMakeFiles/thrill.dir/data/multiplexer.cpp.o
thrill/CMakeFiles/thrill.dir/data/stream.cpp.o
thrill/CMakeFiles/thrill.dir/data/stream_sink.cpp.o
thrill/CMakeFiles/thrill.dir/io/bid.cpp.o
thrill/CMakeFiles/thrill.dir/io/block_manager.cpp.o
thrill/CMakeFiles/thrill.dir/io/config_file.cpp.o
thrill/CMakeFiles/thrill.dir/io/create_file.cpp.o
thrill/CMakeFiles/thrill.dir/io/disk_allocator.cpp.o
thrill/CMakeFiles/thrill.dir/io/disk_queued_file.cpp.o
thrill/CMakeFiles/thrill.dir/io/disk_queues.cpp.o
thrill/CMakeFiles/thrill.dir/io/file_base.cpp.o
thrill/CMakeFiles/thrill.dir/io/iostats.cpp.o
thrill/CMakeFiles/thrill.dir/io/linuxaio_file.cpp.o
thrill/CMakeFiles/thrill.dir/io/linuxaio_queue.cpp.o
thrill/CMakeFiles/thrill.dir/io/linuxaio_request.cpp.o
thrill/CMakeFiles/thrill.dir/io/memory_file.cpp.o
thrill/CMakeFiles/thrill.dir/io/mmap_file.cpp.o
thrill/CMakeFiles/thrill.dir/io/request.cpp.o
thrill/CMakeFiles/thrill.dir/io/request_queue_impl_1q.cpp.o
thrill/CMakeFiles/thrill.dir/io/request_queue_impl_qw_qr.cpp.o
thrill/CMakeFiles/thrill.dir/io/request_queue_impl_worker.cpp.o
thrill/CMakeFiles/thrill.dir/io/serving_request.cpp.o
thrill/CMakeFiles/thrill.dir/io/syscall_file.cpp.o
thrill/CMakeFiles/thrill.dir/io/ufs_file_base.cpp.o
thrill/CMakeFiles/thrill.dir/io/wfs_file_base.cpp.o
thrill/CMakeFiles/thrill.dir/io/wincall_file.cpp.o
thrill/CMakeFiles/thrill.dir/mem/malloc_tracker.cpp.o
thrill/CMakeFiles/thrill.dir/mem/manager.cpp.o
thrill/CMakeFiles/thrill.dir/mem/pool.cpp.o
thrill/CMakeFiles/thrill.dir/net/dispatcher_thread.cpp.o
thrill/CMakeFiles/thrill.dir/net/flow_control_channel.cpp.o
thrill/CMakeFiles/thrill.dir/net/group.cpp.o
thrill/CMakeFiles/thrill.dir/net/mock/group.cpp.o
thrill/CMakeFiles/thrill.dir/vfs/bzip2_filter.cpp.o
thrill/CMakeFiles/thrill.dir/vfs/file_io.cpp.o
thrill/CMakeFiles/thrill.dir/vfs/gzip_filter.cpp.o
thrill/CMakeFiles/thrill.dir/vfs/s3_file.cpp.o
thrill/CMakeFiles/thrill.dir/vfs/sys_file.cpp.o
thrill/CMakeFiles/thrill.dir/vfs/temporary_directory.cpp.o
thrill/CMakeFiles/thrill.dir/net/tcp/construct.cpp.o
thrill/CMakeFiles/thrill.dir/net/tcp/group.cpp.o
thrill/CMakeFiles/thrill.dir/net/tcp/select_dispatcher.cpp.o
thrill/CMakeFiles/thrill.dir/net/tcp/socket.cpp.o
thrill/CMakeFiles/thrill.dir/net/tcp/socket_address.cpp.o -ltbb -ltbbmalloc
-ljemalloc -lpthread -ldl -lz -lbz2 extlib/libs3/libs3.a -lxml2 -lcurl -lssl
-lcrypto && :
/usr/bin/ld: extlib/libs3/libs3.a(multipart.c.o): relocation R_X86_64_PC32
against symbol `stderr@@GLIBC_2.2.5' can not be used when making a shared
object; recompile with -fPIC
/usr/bin/ld: final link failed: Bad value

Change from File to FilePtr

File was once thought to be copyable. This seems unnecessary now, and with a shared_ptr, File would be easier to manage.
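A sketch of the direction proposed here, using a hypothetical stand-in for the File class (not Thrill's data::File): copying is deleted, and ownership is shared through a std::shared_ptr alias named FilePtr.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical stand-in for a File of blocks: movable, but not copyable.
class File {
public:
    File() = default;
    File(const File&) = delete;             // no more accidental deep copies
    File& operator=(const File&) = delete;
    File(File&&) = default;                 // moving is still allowed
    File& operator=(File&&) = default;

    void Append(std::string block) { blocks_.push_back(std::move(block)); }
    std::size_t num_blocks() const { return blocks_.size(); }

private:
    std::vector<std::string> blocks_;
};

// Shared ownership replaces copying: all holders see the same File.
using FilePtr = std::shared_ptr<File>;

static_assert(!std::is_copy_constructible<File>::value, "File must not be copyable");
```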

GroupToIndex returns DIAs with too few elements

If one of the worker partitions is empty, GroupToIndex emits 0 elements rather than the number required by the index range. This results in a DIA with the wrong size in the next operation.
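A minimal sketch of the expected behaviour, with hypothetical names (EmitLocalRange is not Thrill's GroupToIndex code): a worker emits exactly one element per index of its local range [begin, end), padding with the neutral element, so an empty partition still yields end - begin elements. Keys outside the range are rejected instead of being silently mishandled.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <utility>
#include <vector>

// Given one worker's (key, value) groups sorted by key, emit one element per
// index in [begin, end), filling gaps with the neutral element.
template <typename V>
std::vector<V> EmitLocalRange(const std::vector<std::pair<std::size_t, V>>& sorted_groups,
                              std::size_t begin, std::size_t end, const V& neutral) {
    assert(begin <= end);
    std::vector<V> out;
    out.reserve(end - begin);
    std::size_t cur = begin;
    for (const auto& kv : sorted_groups) {
        // A key below the current index (outside the range or a duplicate)
        // or past the end means the partitioning is inconsistent: fail loudly.
        if (kv.first < cur || kv.first >= end)
            throw std::out_of_range("key outside local index range");
        for (; cur < kv.first; ++cur) out.push_back(neutral);
        out.push_back(kv.second);
        ++cur;
    }
    // Pads the tail; for an empty partition this fills the whole range.
    for (; cur < end; ++cur) out.push_back(neutral);
    return out;
}
```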

Goals for Paper

For a real paper we need:

  • concise explanation what Thrill is
  • benchmarks on
    • WordCount
    • PageRank
    • k-Means
    • DC3
  • compare them to others: Spark, Flink, and MPI implementations where these are possible

Statistics for StageBuilder and data/net Subsystem

We need much more statistical output and aggregation.
a) How does StageBuilder run the graph? Which node takes how much time? Which subnodes are run in this stage?
b) Each Node's resources must be monitorable: How many Files does the Node use? How many items/bytes were put into it? How many Streams did it use? What was the transfer speed of the Streams?
c) We need individual and overall aggregated stats to find stragglers.
d) We need a timeline of how many data blocks exist in the system, maybe even which Files they belong to -> nice plots over time.
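For point c), a hypothetical sketch of one aggregation step: reduce a per-worker metric, such as seconds spent in a stage, to min/max/mean and flag workers above a chosen multiple of the mean. The names and the threshold rule are assumptions for illustration only.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

struct StageStats {
    double min, max, mean;
    std::vector<std::size_t> stragglers;  // workers above factor * mean
};

// Aggregates one metric over all workers and flags stragglers.
inline StageStats Aggregate(const std::vector<double>& per_worker, double factor) {
    assert(!per_worker.empty());
    StageStats s{};
    s.min = *std::min_element(per_worker.begin(), per_worker.end());
    s.max = *std::max_element(per_worker.begin(), per_worker.end());
    s.mean = std::accumulate(per_worker.begin(), per_worker.end(), 0.0) /
             static_cast<double>(per_worker.size());
    for (std::size_t w = 0; w < per_worker.size(); ++w)
        if (per_worker[w] > factor * s.mean) s.stragglers.push_back(w);
    return s;
}
```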

Index Range Partitioning Quirks

Recently I have been investigating some strange issues where ToIndex operations returned DIAs with the wrong size, and also cases where the program got stuck in some kind of endless loop. After some debugging I found a GroupToIndex operation where a worker received an element with a key one below the actual start of its range. This should of course never happen and can cause all sorts of undesired behaviour. In this case, rather than crashing, the GroupToIndex operation went into an endless loop: GroupToIndex emits neutral elements and increments the current key until it equals the next key received, which never happens because that key is actually less than the current key (at least not until the counter overflows...). That's easy to fix with some more restrictive asserts, but it's just a symptom.

Why did the element get sent to the wrong worker in the first place? GroupToIndex uses common::CalculateLocalRange to assign each worker's range, which in turn uses a floating-point calculation to distribute the index range reeeeally fairly among the workers. But when the elements are transmitted, a simple integer division is performed. With a slight rounding error in the floating-point calculation, the whole thing can go terribly wrong. It gets even worse when somebody uses -ffast-math. Another problem is that the index calculation is not done consistently: ReduceToIndex, for example, uses a different code path. So I suggest cleaning up this mess: refactor the code to use one consistent index range calculation and get rid of floating-point arithmetic.
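A sketch of an integer-only scheme, assuming the goal is that range assignment and sender-side lookup can never disagree: derive both from the same formula. Worker i of p owns [floor(i*n/p), floor((i+1)*n/p)) of the global range [0, n), and the inverse maps index k to floor(((k+1)*p - 1)/n), the largest i whose range begins at or before k. The function names are hypothetical, and the code assumes n*p fits into 64 bits.

```cpp
#include <cassert>
#include <cstdint>

// First index owned by worker i of p, for the global range [0, n).
// Worker i owns [RangeBegin(i), RangeBegin(i + 1)). Assumes n * p < 2^64.
inline std::uint64_t RangeBegin(std::uint64_t i, std::uint64_t n, std::uint64_t p) {
    assert(p > 0 && i <= p);
    return i * n / p;
}

// Inverse of RangeBegin, derived from the same formula so both sides agree:
// floor(i*n/p) <= k  <=>  i*n < (k+1)*p  <=>  i <= floor(((k+1)*p - 1) / n).
inline std::uint64_t WorkerForIndex(std::uint64_t k, std::uint64_t n, std::uint64_t p) {
    assert(k < n);
    return ((k + 1) * p - 1) / n;
}
```

With n = 2 and p = 4, workers 0 and 2 get empty ranges, which is exactly the case an index-based operation like GroupToIndex has to handle.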

AWS S3 access

I'm currently struggling with several issues when trying to access files on AWS S3:

  • it seems Thrill does not allow setting the AWS region; I believe this might be circumvented if an explicit host is given
  • the conditions in s3_file.cpp:127 are flipped, so the program dies when credentials are given, and not when they are missing...
  • when reading a file, the program dies because somehow the strings containing the bucket name get totally messed up:
[unknown 139753817892736] S3-ERROR - Status: InvalidBucketNameFirstCharacter
DIE: S3-ERROR during read: InvalidBucketNameFirstCharacter @ [...]/thrill/thrill/vfs/s3_file.cpp:343

definition of implicit copy assignment operator is deprecated because it has a user-declared copy constructor

In file included from ../thrill/net/tcp/select_dispatcher.cpp:13:
In file included from ../thrill/net/tcp/select_dispatcher.hpp:19:
In file included from ../thrill/common/logger.hpp:17:
../thrill/mem/allocator.hpp:56:5: warning: definition of implicit copy assignment operator for 'Allocator<thrill::common::delegate<bool (), std::allocator<void> > >' is deprecated because it has a user-declared copy constructor [-Wdeprecated]
    Allocator(const Allocator&) noexcept = default;
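The warning goes away once the copy assignment operator is declared alongside the copy constructor, since C++11 deprecates the implicit copy assignment of a class with a user-declared copy constructor. A minimal stand-in allocator showing the pattern (hypothetical, not the class in thrill/mem/allocator.hpp):

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <type_traits>
#include <vector>

template <typename Type>
class Allocator {
public:
    using value_type = Type;

    Allocator() noexcept = default;
    Allocator(const Allocator&) noexcept = default;
    // Declaring this explicitly avoids the deprecated implicit definition.
    Allocator& operator=(const Allocator&) noexcept = default;

    // Rebinding constructor required by the standard Allocator requirements.
    template <typename Other>
    Allocator(const Allocator<Other>&) noexcept {}

    Type* allocate(std::size_t n) {
        return static_cast<Type*>(::operator new(n * sizeof(Type)));
    }
    void deallocate(Type* p, std::size_t) noexcept { ::operator delete(p); }
};

// Stateless allocators always compare equal.
template <typename A, typename B>
bool operator==(const Allocator<A>&, const Allocator<B>&) noexcept { return true; }
template <typename A, typename B>
bool operator!=(const Allocator<A>&, const Allocator<B>&) noexcept { return false; }

static_assert(std::is_nothrow_copy_assignable<Allocator<int>>::value,
              "copy assignment is explicitly defaulted");

using IntVector = std::vector<int, Allocator<int>>;
```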

Sort crashes when sorting very few items

Crashes when sorting 2 items with 4 PEs (2 workers per host).

==10058==    by 0xB84751: std::vector<StringFragmentMod2<char>, std::allocator<StringFragmentMod2<char> > >::push_back(StringFragmentMod2<char> const&) (stl_vector.h:917)
==10058==    by 0xBD06B1: _ZN6thrill3api8SortNodeI18StringFragmentMod2[...]FindAndSendSplittersERSt6vectorIS3_SaIS3_EEm (sort.hpp:143)
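One plausible hardening (an assumption, since the trace does not show the root cause) is to make splitter selection independent of the item count: always pick p - 1 splitters by scaled index into the sorted sample, which stays in bounds for any non-empty sample, and return none for an empty one. SelectSplitters is a hypothetical name, not SortNode::FindAndSendSplitters.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Picks p - 1 splitters for p workers from a sample, staying in bounds even
// when the sample has fewer elements than there are workers.
template <typename T>
std::vector<T> SelectSplitters(std::vector<T> sample, std::size_t p) {
    assert(p > 0);
    std::vector<T> splitters;
    if (sample.empty()) return splitters;  // nothing to split on
    std::sort(sample.begin(), sample.end());
    splitters.reserve(p - 1);
    for (std::size_t i = 1; i < p; ++i) {
        // i < p guarantees the index is < sample.size(); duplicate splitters
        // are allowed and simply leave some buckets empty.
        splitters.push_back(sample[i * sample.size() / p]);
    }
    return splitters;
}
```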

Suggestion: support Windows

Hi, I hope Thrill can support Windows. The benefit is not just supporting one more platform, but also better development efficiency with MSVC.
I think you could replace the current network library with a cross-platform network library, and then Thrill would support Windows.
I have developed an easy-to-use, high-performance RPC library in modern C++ that supports request/response and pub/sub modes.
Here is the introduction: https://github.com/topcpporg/rest_rpc/wiki/English
If you want to use RPC, I can help with this work.

Maybe you could consider cross-platform support

I think this project could be more universal and support both Windows and Linux.
The network layer could be implemented with Boost.Asio, which is cross-platform.

It would be very regrettable if modern C++ had no Big Data batch computation framework; however, Thrill appeared, and the future is bright.
I'm very interested in this project, and I want to join in.
