scotty-window-processor's Introduction

Scotty: Efficient Window Aggregation for out-of-order Stream Processing

This repository provides Scotty, a framework for efficient window aggregation in out-of-order stream processing.

Features:

  • High performance window aggregation with stream slicing.
  • Scales to thousands of concurrent windows.
  • Support for Tumbling, Sliding, and Session Windows.
  • Initial support for Count-based windows.
  • Out-of-order processing.
  • Aggregate Sharing among all concurrent windows.
  • Connector for Apache Flink.
  • Connector for Apache Storm.
  • Connector for Apache Beam.
  • Connector for Apache Kafka.
  • Connector for Apache Spark.
  • Connector for Apache Samza.

Website

  • Our Website provides several blog posts about Scotty and its features.

Resources:

Flink Integration Example:

// Instantiate Scotty window operator
KeyedScottyWindowOperator<Tuple, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> windowOperator =
    new KeyedScottyWindowOperator<>(new SumWindowFunction());

// Add multiple windows to the same operator
windowOperator.addWindow(new TumblingWindow(WindowMeasure.Time, 1000));
windowOperator.addWindow(new SlidingWindow(WindowMeasure.Time, 1000, 5000));
windowOperator.addWindow(new SessionWindow(WindowMeasure.Time, 1000));

// Add operator to Flink job
stream.keyBy(0)
      .process(windowOperator);
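
For context, SumWindowFunction is an ordinary aggregate function. A minimal version could look like the following sketch, which assumes the ReduceAggregateFunction interface from Scotty's core package (the actual implementation ships in the demo folder):

import org.apache.flink.api.java.tuple.Tuple2;
import stream.scotty.core.windowFunction.ReduceAggregateFunction;

// Sums the value field (f1) of (key, value) tuples; Scotty calls combine
// to merge the partial aggregates of neighboring slices.
public class SumWindowFunction implements ReduceAggregateFunction<Tuple2<Integer, Integer>> {
    @Override
    public Tuple2<Integer, Integer> combine(Tuple2<Integer, Integer> partial1,
                                            Tuple2<Integer, Integer> partial2) {
        return new Tuple2<>(partial1.f0, partial1.f1 + partial2.f1);
    }
}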

More examples for Flink as well as demos for the other systems are provided in the demo folder.

Benchmark:

Throughput in comparison to the Flink standard window operator (Window Buckets) for Sliding Event-Time Windows:
We fix the window size to 60 seconds and vary the slide size. As the slide size gets smaller, Flink has to maintain a larger number of overlapping (concurrent) windows.

Throughput in comparison to Flink for concurrent Tumbling Windows:

Out-of-order Processing in Scotty:

A watermark with timestamp t indicates that no tuple with a timestamp lower than t will arrive. When a watermark arrives, Scotty outputs all window results of windows that ended before t.

However, some out-of-order tuples may arrive with a timestamp t' lower than the watermark t (t' <= t). For that case, the maxLateness indicates how long Scotty stores slices and their corresponding window aggregates. Scotty processes an out-of-order tuple as long as it is within the allowed lateness, i.e., its timestamp t' is greater than the watermark minus the maxLateness (t' > t - maxLateness). Scotty then outputs updated aggregates for the affected windows. The maxLateness can be adjusted with the setMaxLateness method of the SlicingWindowOperator.
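
For example, to keep slices for tuples that arrive up to ten seconds behind the watermark (a minimal sketch; how you obtain the SlicingWindowOperator instance depends on the connector you use):

// Allow out-of-order tuples up to 10 seconds (event time) behind the watermark.
slicingWindowOperator.setMaxLateness(10_000);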

If an out-of-order tuple arrives outside the allowed lateness (with a timestamp t' < t - maxLateness - maxFixedWindowSize), it is discarded. maxFixedWindowSize is the maximum size among the window types that have fixed sizes (e.g., tumbling and sliding windows). For context-aware window types, the maxFixedWindowSize is 0.

If no watermark has arrived yet, an out-of-order tuple is outside the allowed lateness when its timestamp is lower than the start timestamp of the first slice (t' < tsOfFirstSlice) or, in the case of context-aware windows, lower than the timestamp of the first tuple minus the maxLateness (t' < tsOfFirstTuple - maxLateness).
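
Taken together, the bounds can be summarized as follows (an illustrative sketch; the method names are ours, not Scotty's API):

// t' = tuple timestamp, t = current watermark.
boolean emitsUpdatedAggregate(long tPrime, long t, long maxLateness) {
    return tPrime > t - maxLateness;                      // within the allowed lateness
}

boolean isDiscarded(long tPrime, long t, long maxLateness, long maxFixedWindowSize) {
    return tPrime < t - maxLateness - maxFixedWindowSize; // slices no longer stored
}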

Roadmap:

We plan to extend our framework with the following features:

  • Support for User-Defined windows
  • User-defined window measures
  • Support for Refinements
  • Support of Flink Checkpoints and State Backends

Setup:

The Maven package is currently not publicly available. Therefore, you have to build it from source:

git clone git@github.com:TU-Berlin-DIMA/scotty-window-processor.git

mvn clean install

Then you can use the library in your maven project.

<dependency>
    <groupId>stream.scotty</groupId>
    <artifactId>flink-connector</artifactId>
    <version>0.4</version>
</dependency>

Efficient Window Aggregation with General Stream Slicing at EDBT 2019

General Stream Slicing received the Best Paper Award at the 22nd International Conference on Extending Database Technology in March 2019.

Abstract:
Window aggregation is a core operation in data stream processing. Existing aggregation techniques focus on reducing latency, eliminating redundant computations, and minimizing memory usage. However, each technique operates under different assumptions with respect to workload characteristics such as properties of aggregation functions (e.g., invertible, associative), window types (e.g., sliding, sessions), windowing measures (e.g., time- or count-based), and stream (dis)order. Violating the assumptions of a technique can deem it unusable or drastically reduce its performance. In this paper, we present the first general stream slicing technique for window aggregation. General stream slicing automatically adapts to workload characteristics to improve performance without sacrificing its general applicability. As a prerequisite, we identify workload characteristics which affect the performance and applicability of aggregation techniques. Our experiments show that general stream slicing outperforms alternative concepts by up to one order of magnitude.

@inproceedings{traub2019efficient,
  title={Efficient Window Aggregation with General Stream Slicing},
  author={Traub, Jonas and Grulich, Philipp M. and Cu{\'e}llar, Alejandro Rodr{\'\i}guez and Bre{\ss}, Sebastian and Katsifodimos, Asterios and Rabl, Tilmann and Markl, Volker},
  booktitle={22nd International Conference on Extending Database Technology (EDBT)},
  year={2019}
}

Acknowledgements: This work was funded by the EU project E2Data (780245), Delft Data Science, and the German Ministry for Education and Research as BBDC I (01IS14013A) and BBDC II (01IS18025A).

Scotty: Efficient Window Aggregation for out-of-order Stream Processing at ICDE 2018

Scotty was first published at the 34th IEEE International Conference on Data Engineering in April 2018.

Abstract:
Computing aggregates over windows is at the core of virtually every stream processing job. Typical stream processing applications involve overlapping windows and, therefore, cause redundant computations. Several techniques prevent this redundancy by sharing partial aggregates among windows. However, these techniques do not support out-of-order processing and session windows. Out-of-order processing is a key requirement to deal with delayed tuples in case of source failures such as temporary sensor outages. Session windows are widely used to separate different periods of user activity from each other. In this paper, we present Scotty, a high throughput operator for window discretization and aggregation. Scotty splits streams into non-overlapping slices and computes partial aggregates per slice. These partial aggregates are shared among all concurrent queries with arbitrary combinations of tumbling, sliding, and session windows. Scotty introduces the first slicing technique which (1) enables stream slicing for session windows in addition to tumbling and sliding windows and (2) processes out-of-order tuples efficiently. Our technique is generally applicable to a broad group of dataflow systems which use a unified batch and stream processing model. Our experiments show that we achieve a throughput an order of magnitude higher than alternative state-of-the-art solutions.

@inproceedings{traub2018scotty,
  title={Scotty: Efficient Window Aggregation for out-of-order Stream Processing},
  author={Traub, Jonas and Grulich, Philipp M. and Cuellar, Alejandro Rodríguez and Breß, Sebastian and Katsifodimos, Asterios and Rabl, Tilmann and Markl, Volker},
  booktitle={34th IEEE International Conference on Data Engineering (ICDE)},
  year={2018}
}

Acknowledgements: This work was supported by the EU projects Proteus (687691) and Streamline (688191), DFG Stratosphere (606902), and the German Ministry for Education and Research as BBDC (01IS14013A) and Software Campus (01IS12056).

Scotty: General and Efficient Open-source Window Aggregation for Stream Processing Systems at ACM TODS 2021

Abstract:
Window aggregation is a core operation in data stream processing. Existing aggregation techniques focus on reducing latency, eliminating redundant computations, or minimizing memory usage. However, each technique operates under different assumptions with respect to workload characteristics, such as properties of aggregation functions (e.g., invertible, associative), window types (e.g., sliding, sessions), windowing measures (e.g., time- or count-based), and stream (dis)order. In this article, we present Scotty, an efficient and general open-source operator for sliding-window aggregation in stream processing systems, such as Apache Flink, Apache Beam, Apache Samza, Apache Kafka, Apache Spark, and Apache Storm. One can easily extend Scotty with user-defined aggregation functions and window types. Scotty implements the concept of general stream slicing and derives workload characteristics from aggregation queries to improve performance without sacrificing its general applicability. We provide an in-depth view on the algorithms of the general stream slicing approach. Our experiments show that Scotty outperforms alternative solutions.

@article{traub2021scotty,
  title={Scotty: General and Efficient Open-source Window Aggregation for Stream Processing Systems},
  author={Traub, Jonas and Grulich, Philipp Marian and Cu{\'e}llar, Alejandro Rodr{\'\i}guez and Bre{\ss}, Sebastian and Katsifodimos, Asterios and Rabl, Tilmann and Markl, Volker},
  journal={ACM Transactions on Database Systems (TODS)},
  volume={46},
  year={2021},
  publisher={ACM New York, NY, USA}
}

Acknowledgements: This work was supported by the German Ministry for Education and Research as BIFOLD (01IS18025A and 01IS18037A), SFB 1404 FONDA, and the EU Horizon 2020 Opertus Mundi project (870228).

Benson et al.: Disco: Efficient Distributed Window Aggregation

Abstract:
Many business applications benefit from fast analysis of online data streams. Modern stream processing engines (SPEs) provide complex window types and user-defined aggregation functions to analyze streams. While SPEs run in central data centers, wireless sensor networks (WSNs) perform distributed aggregations close to the data sources, which is beneficial especially in modern IoT setups. However, WSNs support only basic aggregations and windows. To bridge the gap between complex central aggregations and simple distributed analysis, we propose Disco, a distributed complex window aggregation approach. Disco processes complex window types on multiple independent nodes while efficiently aggregating incoming data streams. Our evaluation shows that Disco’s throughput scales linearly with the number of nodes and that Disco already outperforms a centralized solution in a two-node setup. Furthermore, Disco reduces the network cost significantly compared to the centralized approach. Disco’s tree-like topology handles thousands of nodes per level and scales to support future data-intensive streaming applications.

@inproceedings{benson2020disco,
  title={Disco: Efficient Distributed Window Aggregation.},
  author={Benson, Lawrence and Grulich, Philipp M and Zeuch, Steffen and Markl, Volker and Rabl, Tilmann},
  booktitle={EDBT},
  volume={20},
  year={2020}
}

Verwiebe et al.: Survey of window types for aggregation in stream processing systems

Abstract:
In this paper, we present the first comprehensive survey of window types for stream processing systems which have been presented in research and commercial systems. We cover publications from the most relevant conferences, journals, and system whitepapers on stream processing, windowing, and window aggregation which have been published over the last 20 years. For each window type, we provide detailed specifications, formal notations, synonyms, and use-case examples. We classify each window type according to categories that have been proposed in literature and describe the out-of-order processing. In addition, we examine academic, commercial, and open-source systems with respect to the window types that they support. Our survey offers a comprehensive overview that may serve as a guideline for the development of stream processing systems, window aggregation techniques, and frameworks that support a variety of window types.

@article{verwiebe2023survey,
  title={Survey of window types for aggregation in stream processing systems},
  author={Verwiebe, Juliane and Grulich, Philipp M and Traub, Jonas and Markl, Volker},
  journal={The VLDB Journal},
  pages={1--27},
  year={2023},
  publisher={Springer}
}

Acknowledgements: This work was funded by German Research Foundation (MA4662-5 and 410830482), German Federal Ministry for Education and Research as BIFOLD - Berlin Institute for the Foundations of Learning and Data (ref. 01IS18025A and ref. 01IS18037A) and Software Campus (01IS17052).

Verwiebe et al.: Algorithms for Windowed Aggregations and Joins on Distributed Stream Processing Systems

Abstract:
Window aggregations and windowed joins are central operators of modern real-time analytic workloads and significantly impact the performance of stream processing systems. This paper gives an overview of state-of-the-art research in this area conducted by the Berlin Institute for the Foundations of Learning and Data (BIFOLD) and the Technische Universität Berlin. To this end, we present different algorithms for efficiently processing windowed operators and discuss techniques for distributed stream processing. Recently, several approaches have leveraged modern hardware for windowed stream processing, which we will also include in this overview. Additionally, we describe the integration of windowed operators into various stream processing systems and diverse applications that use specialized window operations.

@article{verwiebe2022algorithms,
  title={Algorithms for Windowed Aggregations and Joins on Distributed Stream Processing Systems},
  author={Verwiebe, Juliane and Grulich, Philipp M and Traub, Jonas and Markl, Volker},
  journal={Datenbank-Spektrum},
  volume={22},
  number={2},
  pages={99--107},
  year={2022},
  publisher={Springer}
}

scotty-window-processor's People

Contributors

andreejs, bvonheid, dependabot[bot], julianev, lawben, philippgrulich, powibol, tuterbatuhan

scotty-window-processor's Issues

[Bug] Incorrect Slices of Sliding Window depending on Slide Value

Scotty's StreamSlicer starts a new slice based on the window edge that is returned by the function assignNextWindowStart.

public long assignNextWindowStart(long recordStamp) {
    return recordStamp + getSlide() - (recordStamp) % getSlide();
}

This does not work for sliding windows whose size is not a multiple of the slide, for instance, a sliding window with size = 10 and slide = 3.
In these cases, the window start and window end timestamps do not coincide.
The function only returns the next window start, which is based on the slide value, but never the next window end.
Thus, the slices do not end when a window ends, leading to incorrect window results.
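
A fix would have to emit slice edges for both window starts and window ends, along these lines (an illustrative sketch, not the project's actual patch):

// The next slice edge is the earlier of the next window start and the next
// window end; the two differ when the size is not a multiple of the slide.
public long assignNextWindowEdge(long recordStamp, long size, long slide) {
    long nextStart = recordStamp + slide - Math.floorMod(recordStamp, slide);
    long nextEnd = recordStamp + slide - Math.floorMod(recordStamp - size, slide);
    return Math.min(nextStart, nextEnd);
}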

Possible bug or simple code duplication

Not sure if this is a bug because I haven't come up with an actual example to test this, but in the StreamSlicer in determineSlice() there seems to be a small mistake in the code.

if (min_next_edge_ts == te) {
    if (flex_count > 0) {
        sliceManager.appendSlice(te, new Slice.Fixed());
    } else {
        sliceManager.appendSlice(min_next_edge_ts, new Slice.Fixed());
    }
}

As min_next_edge_ts == te, lines 74 and 76 are identical. I believe line 74 should add new Slice.Flexible(flex_count), as the flex_count is > 0. If this is intended, the inner if-check can be removed.
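
Concretely, the first branch would then read (a sketch of the suggested change):

if (min_next_edge_ts == te) {
    if (flex_count > 0) {
        sliceManager.appendSlice(te, new Slice.Flexible(flex_count)); // instead of Slice.Fixed()
    } else {
        sliceManager.appendSlice(min_next_edge_ts, new Slice.Fixed());
    }
}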

Questions regarding to window states and out-of-order late tuples

Hi,
I have been working on Scotty for a while and it is improving my topologies extremely.
Yet, I have confronted some issues while I'm processing out-of-order (o-o-o) late tuples.
I will be happy if you can guide about the following two points.

  1. When an o-o-o tuple that belongs to an already ended window arrives, I cannot see a new output with the updated window state. Is there a mechanism in Scotty that handles these late tuples?

  2. I couldn't find how Scotty handles the states of the outputted windows. Is there a mechanism to remove the states once they are no longer needed?

Thank you in advance.

[Bug] Stream Slicer stuck in Loop

For a sliding window with size = 10 and slide = 4, the StreamSlicer does not get out of the while loop when processing the first tuple.

while (windowManager.hasFixedWindows() && te > min_next_edge_ts) {
    if (min_next_edge_ts >= 0)
        sliceManager.appendSlice(min_next_edge_ts, new Slice.Fixed());
    min_next_edge_ts = calculateNextFixedEdge(te);
}

Reason: On its first call, assignNextWindowStart receives Long.MAX_VALUE as t_c and, because of the slide value 4, the addition overflows and the method returns Long.MIN_VALUE. The variable min_next_edge_ts therefore remains Long.MIN_VALUE and is mapped back to Long.MAX_VALUE in the next iteration of the loop.

private long calculateNextFixedEdge(long te) {
    // next_edge will be the last edge
    long current_min_edge = min_next_edge_ts == Long.MIN_VALUE ? Long.MAX_VALUE : min_next_edge_ts;
    long t_c = Math.max(te - this.windowManager.getMaxLateness(), current_min_edge);
    long edge = Long.MAX_VALUE;
    for (ContextFreeWindow tw : this.windowManager.getContextFreeWindows()) {
        if (tw.getWindowMeasure() == WindowMeasure.Time) {
            //long newNextEdge = t_c + tw.getSize() - (t_c) % tw.getSize();
            long newNextEdge = tw.assignNextWindowStart(t_c);
            edge = Math.min(newNextEdge, edge);
        }
    }
    return edge;
}
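
The overflow is easy to reproduce in isolation (a minimal sketch of the arithmetic inside assignNextWindowStart):

long t_c = Long.MAX_VALUE;               // initial value before any edge exists
long slide = 4;
long next = t_c + slide - (t_c % slide); // t_c + 4 wraps around to a negative value
System.out.println(next);                // prints -9223372036854775808 (Long.MIN_VALUE)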

Adapt Slice Factory

The current implementation of the Slice Factory does not reflect the decision tree for storing individual tuples in the General Stream Slicing paper.

public Slice<InputType, ValueType> createSlice(long startTs, long maxValue, long startCount, long endCount, Slice.Type type) {
    if (!windowManager.hasCountMeasure()) {
        return new EagerSlice<>(stateFactory, windowManager, startTs, maxValue, startCount, endCount, type);
    }
    return new LazySlice<>(stateFactory, windowManager, startTs, maxValue, startCount, endCount, type);
}

Scotty Website

We should add a proper website for Scotty, with helpful documentation and examples.

[Bug] Out-of-order record handling

Environment:

SlidingWindow with the following configuration: SlidingWindow(WindowMeasure.Time, 5000, 1000)

Exception

When handling out-of-order records that have a timestamp smaller than currentWatermark - maxLateness - maxFinalWindowSize, the following exception is thrown.

java.lang.IndexOutOfBoundsException: Index -1 out of bounds for length 6
        at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
        at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
        at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
        at java.base/java.util.Objects.checkIndex(Objects.java:372)
        at java.base/java.util.ArrayList.get(ArrayList.java:458)
        at de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.getSlice(LazyAggregateStore.java:53)
        at de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.insertValueToSlice(LazyAggregateStore.java:64)
        at de.tub.dima.scotty.slicing.SliceManager.processElement(SliceManager.java:76)
        at de.tub.dima.scotty.slicing.SlicingWindowOperator.processElement(SlicingWindowOperator.java:43)
        at de.tub.dima.scotty.kafkastreamsconnector.KeyedScottyWindowOperator.process(KeyedScottyWindowOperator.java:47)
        at de.tub.dima.scotty.kafkastreamsconnector.KeyedScottyWindowTransformer.transform(KeyedScottyWindowTransformer.java:26)
        at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
        at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
        at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:64)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
        at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)

        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)

In the SliceManager class:

int indexOfSlice = this.aggregationStore.findSliceIndexByTimestamp(ts);
this.aggregationStore.insertValueToSlice(indexOfSlice, element, ts);

a -1 is assigned to indexOfSlice, and the exception is then triggered in the insertValueToSlice method.

Actually, no value needs to be added to a slice here: the element is out of order beyond currentWatermark - maxLateness - maxFinalWindowSize and should therefore be discarded rather than inserted.
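
A guard in SliceManager.processElement along these lines would avoid the exception (a sketch; whether silently dropping the element is the intended semantics is up to the maintainers):

int indexOfSlice = this.aggregationStore.findSliceIndexByTimestamp(ts);
if (indexOfSlice < 0) {
    // The slice for this timestamp was already removed because the element is
    // older than currentWatermark - maxLateness - maxFinalWindowSize: drop it.
    return;
}
this.aggregationStore.insertValueToSlice(indexOfSlice, element, ts);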


Readme and Setup enhancement

Hi,
while setting up Scotty I encountered a couple of obstacles:

  • The Readme suggests using a pom with version 0.3, which does not work since Scotty was updated to version 0.4.
  • Trying to run mvn clean install in IntelliJ would cause an error (maven version must be lower than 3.3.) if the Maven version is greater than 3.3, but running mvn clean install through the command line would succeed even then. Changing the version on line 145 of the pom according to my Maven version solved that for me:
    <requireMavenVersion>
        <!-- maven version must be lower than 3.3. See FLINK-3158 -->
        <version>(,3.3)</version>
    </requireMavenVersion>
  • The Flink Integration Example was a little confusing for me as a beginner, since it only shows how someone with experience would adapt their code to use Scotty. The FlinkSumDemo.java provided more information on how to get started with Scotty and Flink. I would suggest adding a link in the Readme that points to the demo folder, or maybe a more in-depth example.

Watermark processing fails if no more slices are present

WindowManager.processWatermark assumes that there is always at least one slice present, which does not have to be the case. I would suggest checking whether any slices are present and, if not, simply returning.
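
Something along these lines at the top of processWatermark (hypothetical names, sketch only):

// Hypothetical guard: without any slices there is nothing to emit yet.
if (this.aggregationStore.isEmpty()) {
    return new ArrayList<>();
}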

I'll add a PR for this in a moment.

Improve tests

Currently, unit tests depend on the result order.
This makes them very sensitive to internal library changes.
We should improve our tests and make them order-independent.

mvn install gives error on tests

Hi,

I found this repo through your excellent papers (Scotty, Cutty). I am trying to build the Flink adaptor and use it with the latest Flink distribution (1.14+).

However, I ran into an error with mvn install; I've attached the log here:

Results :

Tests in error: 
  twoWindowsTest(de.tub.dima.scotty.slicing.aggregationstore.test.windowTest.PunctuationWindowTupleTest): class java.lang.String cannot be cast to class java.lang.Integer (java.lang.String and java.lang.Integer are in module java.base of loader 'bootstrap')

Tests run: 64, Failures: 0, Errors: 1, Skipped: 0

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for Scotty Window Processor 0.4:
[INFO] 
[INFO] Scotty Window Processor ............................ SUCCESS [  0.144 s]
[INFO] core ............................................... SUCCESS [  0.752 s]
[INFO] state .............................................. SUCCESS [  0.068 s]
[INFO] slicing ............................................ FAILURE [  0.711 s]
[INFO] flink-connector .................................... SKIPPED
[INFO] benchmark .......................................... SKIPPED
[INFO] samza-connector .................................... SKIPPED
[INFO] spark-connector .................................... SKIPPED
[INFO] kafkaStreams-connector ............................. SKIPPED
[INFO] beam-connector ..................................... SKIPPED
[INFO] storm-connector .................................... SKIPPED
[INFO] demo ............................................... SKIPPED
[INFO] flink-demo ......................................... SKIPPED
[INFO] spark-demo ......................................... SKIPPED
[INFO] storm-demo ......................................... SKIPPED
[INFO] beam-demo .......................................... SKIPPED
[INFO] kafka-demo ......................................... SKIPPED
[INFO] samza-demo ......................................... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.751 s
[INFO] Finished at: 2022-06-27T10:48:58-07:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.12.4:test (default-test) on project slicing: There are test failures.
[ERROR] 
[ERROR] Please refer to /Users/yicong-huang/IdeaProjects/SWAT-RT/scotty-window-processor/slicing/target/surefire-reports for the individual test results.
[ERROR] -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
[ERROR] 
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <args> -rf :slicing

Could you please help me identify what's going wrong? Also, would it be possible to build with sbt in the near future (or simply publishing on Maven Central would help)?

Thanks in advance!

No data for singleton aggregations?

What is the expected behavior when only a single input occurs within a window? We're trying to run a simple sum aggregation over some events with a long tail of infrequent events (and a smaller number of high-frequency events). We boiled it down to a test to run in a test harness (the aggregation function simply sums the counts and takes the max of the timestamps; it is omitted for simplicity because the function itself doesn't appear to matter here).

ScottyWindowOperator<String, UserEventCounter, UserEventCounter> keyedScottyWindowOperator = ...

UserEventCounter element1 = new UserEventCounter();
element1.userKey = "user1";
element1.timestamp = 1;
element1.count = 4;

UserEventCounter element2 = new UserEventCounter();
element2.userKey = "user1";
element2.timestamp = 16;
element2.count = 5;

UserEventCounter element3 = new UserEventCounter();
element3.userKey = "user1";
element3.timestamp = 33;
element3.count = 6;

testHarness.processElement(element1, 1);
testHarness.processWatermark(11);
testHarness.processElement(element2, 16);
testHarness.processWatermark(25);
testHarness.processWatermark(30);
testHarness.processElement(element3, 33);
testHarness.processWatermark(35);
testHarness.processWatermark(40);
testHarness.processWatermark(45);
testHarness.processWatermark(50);

We modified ScottyWindowOperator to add some debugging prints (the value in processElement, and the AggregateWindow before filtering on aggregation.hasValue() in processWatermark) and see the following sequence:

processElement|K     user1 W@             1| = UserEventCounter(userKey=user1, timestamp=1, count=4)
processElement|K     user1 W@            16| = UserEventCounter(userKey=user1, timestamp=16, count=5)

processWatermark|user1 @           11| = WindowResult(Time,0-10,[ScottySumUserEventCounter->UserEventCounter(userKey=user1, timestamp=1, count=4)])

processElement|K     user1 W@            33| = UserEventCounter(userKey=user1, timestamp=33, count=6)

processWatermark|user1 @           25| = WindowResult(Time,15-25,[ScottySumUserEventCounter->])
processWatermark|user1 @           25| = WindowResult(Time,10-20,[ScottySumUserEventCounter->])
processWatermark|user1 @           25| = WindowResult(Time,5-15,[ScottySumUserEventCounter->])

Observations:

  • The only element that's actually emitted is WindowResult(Time,0-10,[ScottySumUserEventCounter->UserEventCounter(userKey=user1, timestamp=1, count=4)]).
  • The three windows at the end aren't actually emitted, but would presumably be the ones containing element2 above, if they were.
  • Notably, element3 never even shows up in processWatermark.

Does Scotty, as implemented, only work if you have a consistent (non-sporadic) stream of events? Is there any workaround for keys that only receive sporadic traffic like this?

java.lang.ArrayIndexOutOfBoundsException

Hi,

We're facing the following exception while trying to adopt Scotty on the latest Flink-1.14.2:

switched from RUNNING to FAILED with failure cause: java.lang.ArrayIndexOutOfBoundsException: -1
	at java.util.ArrayList.elementData(ArrayList.java:424)
	at java.util.ArrayList.get(ArrayList.java:437)
	at de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.getSlice(LazyAggregateStore.java:53)
	at de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.insertValueToSlice(LazyAggregateStore.java:64)
	at de.tub.dima.scotty.slicing.SliceManager.processElement(SliceManager.java:76)
	at de.tub.dima.scotty.slicing.SlicingWindowOperator.processElement(SlicingWindowOperator.java:43)
	at de.tub.dima.scotty.flinkconnector.KeyedScottyWindowOperator.processElement(KeyedScottyWindowOperator.java:62)
	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:748)

We create a window operator as follows:

import de.tub.dima.scotty.core.windowType.{SlidingWindow, WindowMeasure}
import de.tub.dima.scotty.flinkconnector.KeyedScottyWindowOperator
import de.tub.dima.scotty.flinkconnector.{_}

    val windowOp = new KeyedScottyWindowOperator[(java.lang.String, Long), NaviGpsProcessable, NaviTrafficUserResult](new NaviTrafficUserAggregationScotty())
    windowOp.addWindow(new SlidingWindow(WindowMeasure.Time, 600_000, 60_000))

    val userAggStream = stream
      .keyBy(el => (el.id, el.trafficId))
      .process(windowOp)
      .map(_.getAggValues.get(0));

Can I get any advice on this?

Best,

Dongwon

LazyEvaluation of the "Lift" function.

Hey,
"Lift" function is lazily evaluated by calling the getAggValues() function, as it can be seen here:

stream.keyBy(0)
      .process(windowOperator)
      .map(x -> x.getAggValues().get(0).f1)
      .print();

You may consider triggering the "Lift" function when a window ends, instead of adding another operator for lazy evaluation.

Create separate demo folder

We should create a separate demo folder that contains all demos and examples currently contained in the connectors.
