
kinesis-aggregation's Introduction

Kinesis Record Aggregation & Deaggregation Modules for AWS Lambda

The Amazon Kinesis Producer Library (KPL) gives you the ability to write data to Amazon Kinesis with a highly efficient, asynchronous delivery model that can improve performance. The KPL is extremely powerful, but is currently only available as a Java API wrapper around a C++ executable, which may not be suitable for all deployment environments. Similarly, the Kinesis Client Library (KCL) provides automatic deaggregation of KPL aggregated records, but not all Kinesis consumer applications, such as those running on AWS Lambda, are currently capable of leveraging this deaggregation capability.

KPL Message Format (figure): How the Kinesis Producer Library Publishes Data

The components in this project give you the ability to process and create KPL-compatible serialised data within AWS Lambda, in Java, Node.js, Python and Go. These components can also be used as part of a multi-lang Kinesis Client Library (KCL) application. However, this project has several limitations:

  • It only generates data and doesn't deliver it to the stream like the KPL does. You still have to call PutRecord(s) to push the data to Kinesis Data Streams.
  • It doesn't manage data across multiple streams like the KPL does; the interface assumes that all data is sent to a single stream.

Aggregation

One of the main advantages of the KPL is its ability to use record aggregation to increase payload size and improve throughput. While this project is not a replacement for the full KPL, it does give you the ability to easily aggregate multiple user records into larger aggregated records that make more efficient use of available bandwidth and reduce cost. The aggregated data is encoded using Google Protocol Buffers and returned to the calling function for subsequent use. You can then publish it to Kinesis, and the data remains compatible with consumers using the KCL or the deaggregation modules from this project.
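For illustration, here is a minimal Python sketch of iterative aggregation followed by publication with PutRecord. It assumes the aws_kinesis_agg package and boto3, and that AggRecord.get_contents() returns the partition key, explicit hash key and serialised data; the stream name is a placeholder:

# Minimal aggregation sketch (assumes aws_kinesis_agg and boto3 are installed).
import json
import uuid
import boto3
from aws_kinesis_agg.aggregator import RecordAggregator

kinesis = boto3.client('kinesis')
aggregator = RecordAggregator()
STREAM_NAME = 'my-stream'  # placeholder stream name

def send_aggregated(agg_record):
    # get_contents() is assumed to return (partition_key, explicit_hash_key, data).
    pk, ehk, data = agg_record.get_contents()
    kwargs = dict(StreamName=STREAM_NAME, Data=data, PartitionKey=pk)
    if ehk:
        kwargs['ExplicitHashKey'] = ehk
    kinesis.put_record(**kwargs)

for i in range(1000):
    payload = json.dumps({'sequence': i})
    # add_user_record returns a full AggRecord when no more user records fit,
    # and None while the current aggregate still has room.
    full_record = aggregator.add_user_record(str(uuid.uuid4()), payload)
    if full_record:
        send_aggregated(full_record)

# Flush whatever is left over at the end (behaviour on an empty aggregator may vary).
final_record = aggregator.clear_and_get()
if final_record:
    send_aggregated(final_record)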

Processing Model

Caution - you should only use Kinesis aggregation outside of the Kinesis Producer Library for low-value messages, where the loss of a small number of messages is not critical. Aggregation ties messages to specific shards, and if the stream is being resharded while aggregation is taking place, messages can be rejected by the PutRecords API. DATA LOSS CAN OCCUR. Kinesis deaggregation is compatible with all KPL-published data and can be used safely.

Deaggregation

The components in this library allow you to efficiently deaggregate protocol buffer encoded aggregated records in any application, including AWS Lambda.
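For illustration, a minimal Python Lambda handler sketch using the deaggregation module. The deaggregate_records entry point and the event['Records'] / ['kinesis']['data'] shapes also appear in the issues later in this document; check the exact field handling against the package version you install:

# Minimal Lambda deaggregation sketch (assumes the aws_kinesis_agg Python package).
import base64
from aws_kinesis_agg.deaggregator import deaggregate_records

def lambda_handler(event, context):
    # deaggregate_records expands KPL-aggregated records into plain Kinesis
    # records that keep the same event shape, so each user record is read the
    # same way a non-aggregated record would be.
    user_records = deaggregate_records(event['Records'])
    for record in user_records:
        payload = base64.b64decode(record['kinesis']['data'])
        # ... process the individual user record payload here ...
    return 'Processed {0} user records'.format(len(user_records))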

Processing Model

Language Specific Implementations

AWS Lambda supports Java, Node.js, Python and Go as programming languages. We have included support for those languages so that you can create and process UserRecords via standalone modules. Documentation is provided for each language:

Language             | Location
Java                 | java
Node.js (JavaScript) | node.js
Python               | python
Go                   | go

Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

kinesis-aggregation's People

Contributors

a3lk4i, ajorg-aws, akursar, andrejleitner, armaseg, arostamian-truecar, brentwritescode, cb-xav, cosmincatalin, cspotcode, cstrydom, davidrosenstark, dependabot[bot], geronimo-iia, hyandell, ianmeyers, jbarrus, jeremywall, jodevsa, joebowen, jojinkb, jumoel, komazarari, plopezlpz, shimont, shyykoserhiy, sportrang, stevegoossens, takesthebiscuit, toltar


kinesis-aggregation's Issues

Deaggregating an `AggRecord` object (python)

I would like to be able to aggregate and deaggregate a record within the same application. I tried the following:

from aws_kinesis_agg.aggregator import RecordAggregator
from aws_kinesis_agg.deaggregator import deaggregate_records
import uuid
import json

aggregator = RecordAggregator()

result = None
while result is None:
    record = {"key": "value"}
    rec = json.dumps(record)
    result = aggregator.add_user_record(str(uuid.uuid4()), rec)

raw_record = result.get_contents()[-1]
records = deaggregate_records(raw_record)

But it fails with an exception:

Traceback (most recent call last):
  File "example.py", line 15, in <module>
    records = deaggregate_records(raw_record)
  File "/home/alexeshoo/repo/env/lib/python3.6/site-packages/aws_kinesis_agg/deaggregator.py", line 111, in deaggregate_records
    return_records.extend(iter_deaggregate_records(records))        
  File "/home/alexeshoo/repo/env/lib/python3.6/site-packages/aws_kinesis_agg/deaggregator.py", line 133, in iter_deaggregate_records
    raw_data = r['kinesis']['data']
TypeError: 'int' object is not subscriptable

Is this possible somehow?
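One possible workaround sketch, continuing the snippet above (untested; it assumes get_contents() returns the serialised payload as its last element and that deaggregate_records expects Lambda-event-shaped dictionaries with base64-encoded data, as the traceback suggests):

import base64

# Wrap the locally produced aggregate so it looks like a Lambda Kinesis event record.
raw_data = result.get_contents()[-1]
wrapped = [{
    'kinesis': {
        # deaggregate_records base64-decodes this field, so encode the raw bytes first.
        'data': base64.b64encode(raw_data).decode('utf-8')
        # Depending on the library version, the deaggregator may also read
        # metadata fields such as 'partitionKey' and 'sequenceNumber'; add
        # placeholder values here if it complains about missing keys.
    }
}]
records = deaggregate_records(wrapped)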

Cannot read property 'build' of null - aws-kinesis-agg/lib/common.js

Hello,

I'm having issues using this library in a TypeScript project that is being webpacked.
The code runs fine in the TS package (blacksmith-kinesis-mapper) where I use the aws-kinesis-agg library, and also when I run my integration tests before packaging the Node Lambda (blacksmith-foundry-dynamo-listener) that pulls in the TS package.

The issue arises when I package the Lambda code up using webpack.
I get the error both in AWS and when I run the code locally with lambda-local.

error: {
	"errorMessage": "Cannot read property 'build' of null",
	"errorType": "TypeError",
	"stackTrace": [
		"loadBuilder (/Users/chstrydom/Documents/GitHub/blacksmith-foundry-dynamo-listener/build/index.js:5779:17)",
		"Object.<anonymous> (/Users/chstrydom/Documents/GitHub/blacksmith-foundry-dynamo-listener/build/index.js:5787:26)",
		"Object../node_modules/aws-kinesis-agg/lib/common.js (/Users/chstrydom/Documents/GitHub/blacksmith-foundry-dynamo-listener/build/index.js:5797:30)",
		"__webpack_require__ (/Users/chstrydom/Documents/GitHub/blacksmith-foundry-dynamo-listener/build/index.js:20:30)",
		"Object../node_modules/aws-kinesis-agg/lib/kpl-deagg.js (/Users/chstrydom/Documents/GitHub/blacksmith-foundry-dynamo-listener/build/index.js:6316:16)",
		"__webpack_require__ (/Users/chstrydom/Documents/GitHub/blacksmith-foundry-dynamo-listener/build/index.js:20:30)",
		"Object../node_modules/aws-kinesis-agg/index.js (/Users/chstrydom/Documents/GitHub/blacksmith-foundry-dynamo-listener/build/index.js:5719:34)",
		"__webpack_require__ (/Users/chstrydom/Documents/GitHub/blacksmith-foundry-dynamo-listener/build/index.js:20:30)",
		"Object../node_modules/blacksmith-kinesis-mapper/lib/deaggregate.js (/Users/chstrydom/Documents/GitHub/blacksmith-foundry-dynamo-listener/build/index.js:23714:43)",
		"__webpack_require__ (/Users/chstrydom/Documents/GitHub/blacksmith-foundry-dynamo-listener/build/index.js:20:30)"
	]
}

The stack trace seems to point to https://github.com/awslabs/kinesis-aggregation/blob/master/node/lib/common.js#L43

I'm not sure if it's an issue with my project, or if you need to do something to prevent webpack from tree-shaking needed files.

  • webpack: 4.43.0
  • typescript: 3.7.5
  • serverless-webpack: 5.3.2

RecordAggregator could generate aggregated records that are larger than 1MB

These records cannot be added to a stream; attempting to do so throws the following error:

com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation error detected: 
Value 'java.nio.HeapByteBuffer[pos=0 lim=1048954 cap=1048954]' at 'data' failed to satisfy 
constraint: Member must have length less than or equal to 1048576 (Service: AmazonKinesis; Status 
Code: 400; Error Code: ValidationException; Request ID: f114615a-9256-5a12-a33c-39f52eb5e9c3)

Please contact me for data to reproduce the problem.

Add support for Kinesis Analytics

Try as I might, this would not work with Kinesis Analytics, so I had to modify the object as follows:

    for key, record in enumerate(event['records']):

        decode = record
        decode.update(decode['kinesisStreamRecordMetadata'])
        decode['kinesisSchemaVersion'] = '1.0'
        decode = {'kinesis': record}

        decode = deaggregate_records(decode)

Once the object was correct it worked; however, I am still having an issue splitting records up, but that's a topic for another support forum.

My suggestion is to add support for this internally so the caller doesn't have to do it, since Kinesis Analytics is still Kinesis...

Unable to get the Node Aggregation code to work

I have tried many permutations of require statements, but I cannot get the RecordAggregator to work in my code.

The README says:
var agg = require('aws-kinesis-agg');
var aggregator = new RecordAggregator();

but that gives me error: "RecordAggregator is not defined"

Looking at the package.json, its main is "./kpl-deagg.js", which has nothing to do with the aggregation code. Looking around, I can only find examples of people using this module for the deaggregation code.

It would be great to have a working aggregation sample that uses the npm-installed aws-kinesis-agg module.

Question about the type in deaggregate function

Hi,

Our team is currently using this library with TypeScript. I'm casting the type of the Kinesis records, since the deaggregation code actually works with KinesisStreamRecordPayload but requires Kinesis.Record in its signature.

Would you be able to check this signature? I cannot find other usages across GitHub, and I have only found a few examples in my company's code repositories, so I'm not sure about this.

Thanks,
Peter

KinesisAggregator might have issues with full Stream Records

Hi,

I suspect that a Stream Record that is full or almost full with respect to its size limit will get silently dropped by either the Kinesis service or the KCL (the KCL is more likely, though).
I have experimented with the Aggregator for a while and have started to notice that some records actually don't show up on the other end of the stream. I have nailed it down to the size of the Stream Record.

If I manually handle the size of the Stream Record and intentionally make it slightly smaller, say 977 KB, all the records show up correctly in the KCL on the other side of the stream.

If, however, I rely on addUserRecord to return a valid record when it is full, sometimes that record will not show up on the other side of the stream (more precisely, its corresponding User Records will not show up).

An experiment with many small User Records that fill up a Stream Record would probably be useful, but unfortunately I have not had the time to put together one worth sharing.

Best regards,
Cosmin

calculateRecordSize() mutates record.data through pass by reference

Hello everyone,

I am quite concerned about the separation of concerns here. After writing unit tests around this library while trying to use it and running into issues, I have discovered that this line changes the referenced value of the record passed in:

record.data = Buffer.from(record.data); // default utf8

Could there be a better way to calculate the record data size without changing the original record? Is there any reason why the library does this?

Thanks,
Matt

Node test is broken

The node test kinesis-aggregation/node/node_modules/aws-kinesis-agg/test/testRawAggregation.js is broken. Additionally, the module is missing the dev dependencies necessary to run the tests.

#46 fixes these.

typescript: type definition for `UserRecord.data` says `Buffer`, but actual property is a `string`

Title pretty much says it all, the type definitions for the returned UserRecord are incorrect.

Here we can see they define the data property as a Buffer:
https://github.com/awslabs/kinesis-aggregation/blob/master/node/index.d.ts#L9

But in the actual code, it does not return data as a Buffer, but a base64 string:
https://github.com/awslabs/kinesis-aggregation/blob/master/node/lib/kpl-deagg.js#L97

We can see the example code showing that it is expecting a string object here:
https://github.com/awslabs/kinesis-aggregation/blob/master/node/example/sample-deaggregation.js#L141


Not sure which is the "correct" way, but I imagine the types should just be updated to reflect the actual code (to prevent a breaking change to existing consumer code).

Maven downloads every single AWS SDK version

Hello,

I have added the deaggregator dependency to my project

<dependency>
       <groupId>com.amazonaws</groupId>
       <artifactId>amazon-kinesis-deaggregator</artifactId>
       <version>1.0.3</version>
 </dependency>

But now Maven is downloading every single aws-sdk-* library, for every single version?

Downloading from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.260/aws-java-sdk-dynamodb-1.11.260.pom
Downloaded from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.260/aws-java-sdk-dynamodb-1.11.260.pom (4.4 kB at 6.7 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.261/aws-java-sdk-dynamodb-1.11.261.pom
Downloaded from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.261/aws-java-sdk-dynamodb-1.11.261.pom (4.4 kB at 247 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.262/aws-java-sdk-dynamodb-1.11.262.pom
Downloaded from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.262/aws-java-sdk-dynamodb-1.11.262.pom (4.4 kB at 23 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.263/aws-java-sdk-dynamodb-1.11.263.pom
Downloaded from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.263/aws-java-sdk-dynamodb-1.11.263.pom (4.4 kB at 21 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.264/aws-java-sdk-dynamodb-1.11.264.pom
Downloaded from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.264/aws-java-sdk-dynamodb-1.11.264.pom (4.4 kB at 20 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.265/aws-java-sdk-dynamodb-1.11.265.pom
Downloaded from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.265/aws-java-sdk-dynamodb-1.11.265.pom (4.4 kB at 8.4 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.266/aws-java-sdk-dynamodb-1.11.266.pom
Downloaded from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.266/aws-java-sdk-dynamodb-1.11.266.pom (4.4 kB at 23 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.267/aws-java-sdk-dynamodb-1.11.267.pom
Downloaded from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.267/aws-java-sdk-dynamodb-1.11.267.pom (4.4 kB at 20 kB/s)
Downloading from central: https://repo.maven.apache.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.268/aws-java-sdk-dynamodb-1.11.268.pom

How do I avoid doing this?

Thanks!

JS deaggregation error due to reuse of hash

In the JS code there's an MD5 hash that's used during deaggregation. It's reused on line 70 here https://github.com/awslabs/kinesis-deaggregation/blob/master/node/node_modules/kpl-deagg/kpl-deagg.js#L70

But crypto objects are not reusable and when the update method is called the second time it fails with a "HashUpdate fail" error message.

It's a simple fix to just instantiate the md5 crypto object before each call to update. I submitted a pull request with the fix here #1

Can this fix be incorporated and rolled out to NPM?

When there are no explicit hash keys, deaggregation throws an IndexError

PR here: #4

If there are no explicit hash keys, attempting to index the ehks object fails with an IndexError:

Traceback (most recent call last):
  File "/var/task/function.py", line 38, in lambda_handler
    records = data_pipeline.extract_records_from_kinesis_event(lambda_event)
  File "/var/task/data_pipeline/utils.py", line 29, in extract_records_from_kinesis_event
    ret = [base64.b64decode(x['kinesis']['data']) for x in iter_deaggregate_records(event['Records'])]
  File "/var/task/aws_kpl_deagg/deaggregator.py", line 178, in iter_deaggregate_records
    error_string = _get_error_string(r, message_data, ehks, pks, ar)
  File "/var/task/aws_kpl_deagg/deaggregator.py", line 88, in _get_error_string
    (ehks[mr.explicit_hash_key_index] is not None,
  File "/var/task/google/protobuf/internal/containers.py", line 64, in __getitem__
    return self._values[key]
IndexError: list index out of range

Node version of lib includes large dependency jshint

JSHINT files in the /dist directory were > 2 MB

This meant any lambda functions using this node lib were minimum 2MB without anything else included.

I searched the codebase for jshint and found a single reference to it; IMO this is a dev dependency by definition.

clear_and_get() throws intermittent error in a multi-threaded environment

We are using kinesis_aggregator in a multi-threaded production environment to push records to kinesis and intermittently see this error:
Message AggregatedRecord is missing required fields: records[3].partition_key_index

  • At first I thought the error was related to the partition key, so I used a UUID as the partition key (and made sure it was never null), but the error still occurred
  • I was able to replicate the error, and after some debugging I reached the following conclusion:
    It is not a data-related issue; I tested with JMeter using the same record, and the error still occurs intermittently.
    I think the error occurs because the Kinesis aggregator may not be thread-safe. We have 4 processes (with 30 threads each) on each machine. Looking at the implementation of clear_and_get, which resets the aggregated record (hence the partition_key_index will be blank), I suspect that if multiple threads call send_record(kinesis_agg.clear_and_get()) at the same time, a thread whose aggregated record has just been reset by another thread fails.

How can I fix this error?
How can I use kinesis aggregator in a multi-threaded environment?
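One possible mitigation sketch for the race described above, assuming the RecordAggregator instance is simply not safe to share across threads (this is not a documented fix; it just serialises access with a lock):

import threading
from aws_kinesis_agg.aggregator import RecordAggregator

lock = threading.Lock()
aggregator = RecordAggregator()

def add_user_record_safely(partition_key, data):
    # Serialise the add-and-reset sequence so one thread cannot reset the
    # aggregator while another is still adding to it or reading from it.
    with lock:
        return aggregator.add_user_record(partition_key, data)

def flush_safely():
    with lock:
        return aggregator.clear_and_get()

# Alternatively, keep one RecordAggregator per thread (e.g. in threading.local())
# so clear_and_get() never races with another thread.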

Add support for deaggregation in Firehose (python)

Issue:
Deaggregation fails in a Kinesis Firehose transform Lambda (Python) because the record metadata is under kinesisRecordMetadata rather than kinesisStreamRecordMetadata (as it is in the Kinesis Analytics preprocessing Lambda case).

Solution:
Handle this as a special case, just as Kinesis Analytics records are handled.

PR:
#83
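A rough sketch of the special-case handling being requested, mirroring the Kinesis Analytics workaround shown earlier in this document (the Firehose event shape and field names here are assumptions; PR #83 is the authoritative change):

from aws_kinesis_agg.deaggregator import deaggregate_records

def lambda_handler(event, context):
    reshaped = []
    for record in event['records']:
        wrapped = dict(record)
        # Copy the Firehose metadata fields up one level (mirroring the Kinesis
        # Analytics workaround earlier in this document), then nest everything
        # under 'kinesis' so the deaggregator finds record['kinesis']['data'].
        wrapped.update(wrapped.get('kinesisRecordMetadata', {}))
        wrapped['kinesisSchemaVersion'] = '1.0'
        reshaped.append({'kinesis': wrapped})

    user_records = deaggregate_records(reshaped)
    # ... build and return the Firehose transformation response from user_records ...
    return {'records': []}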

KinesisAggregator and KCL don't play nice together

Hi,

After a lot of struggling with making the Java version of the KinesisAggregator play nice with the KCL, I have finally narrowed down the issue.

Issue

On one side, I have an app that uses the KinesisAggregator to write aggregated Stream Records on a Kinesis Stream. I use PutRecord as the persist API method. This is working fine.
The records produced with the help of the KinesisAggregator end up on the stream. You can try this and you will notice no exceptions and all the CloudWatch metrics will show what you would expect.

On the other side of the stream, I have a Spark app that under the hood uses the KCL to ingest data from Kinesis. The cool thing about the KCL is that it should handle aggregated Stream Records transparently to the user, as if there is no aggregation.
When running the said app, however, I can see in CloudWatch that data is being read (from all the shards), but in Spark there is apparently no data coming in.

Cause

After going deep into the KCL code, I found where the Kinesis User Records go to die:

if (effectiveHashKey.compareTo(startingHashKey) < 0
        || effectiveHashKey.compareTo(endingHashKey) > 0) {
    for (int toRemove = 0; toRemove < recordsInCurrRecord; ++toRemove) {
        result.remove(result.size() - 1);
    }
    break;
}

What this essentially does is that, for each of the User Records already extracted from a Stream Record, there is a conditional check verifying that the attached Explicit Hash Key corresponds to the Explicit Hash Key of the Stream Record from which it was extracted.
In other words, User Records need to look as if they come from the shard they actually arrived on (as part of a Stream Record). When this check fails for one User Record, all of the User Records from that Stream Record are dropped. I guess this is a good thing, since otherwise I would probably have missed the issue 😀.

Now, the root cause of the problem is that the KinesisAggregator neither checks/enforces nor documents the fact that when one produces an aggregated Stream Record destined for a specific shard (as indicated by its corresponding Explicit Hash Key), all of the User Records it contains must also be assigned Explicit Hash Keys that correspond to that Stream Record's shard.
The example provided in the SampleAggregatorProducer.java file goes something like this:

private static void sendViaBatch(AmazonKinesis producer, String streamName, RecordAggregator aggregator) {
    System.out.println("Creating " + ProducerConfig.RECORDS_TO_TRANSMIT + " records...");
    for (int i = 1; i <= ProducerConfig.RECORDS_TO_TRANSMIT; i++) {
        String pk = ProducerUtils.randomPartitionKey();
        String ehk = ProducerUtils.randomExplicitHashKey(); // <---- NEW EXPLICIT HASH KEY FOR EVERY RECORD
        byte[] data = ProducerUtils.randomData(i, ProducerConfig.RECORD_SIZE_BYTES);

        // addUserRecord returns non-null when a full record is ready to
        // transmit
        AggRecord aggRecord = aggregator.addUserRecord(pk, ehk, data);
        if (aggRecord != null) {
            ForkJoinPool.commonPool().execute(() -> {
                sendRecord(producer, streamName, aggRecord);
            });
        }
    }
}

You can see how each new User Record receives a new Explicit Hash Key that does not necessarily correspond to the first one in the Stream Record (which is the one that is actually used by the Stream Record). If just one of the aggregated User Records references an Explicit Hash Key from a different shard, all the records die in the KCL 💀

Possible resolution

There are many ways in which this problem could be solved (these are just some suggestions off the top of my head):

  • Document the issue. Leave the code be, it is valid after all, you just need to know how to use it.
  • Modify the validateExplicitHashKey method here so that it actually checks the Explicit Hash Key in relation to the shard the User Record is supposed to go to. That is to say, make it compatible with the KCL.
  • Modify the methods so that the same Explicit Hash Key is actually persisted to all of the User Records inside a Stream Record transparently.

Best regards,
Cosmin

Individual record acks (java)

Hi,

What can be done to provide individual record acknowledgement in the Java module, similar to what is provided in Node.js? What about checkpointing? Thanks.

AggRecord.java

Change `org.apache.commons.lang.StringUtils` to `org.apache.commons.lang3.StringUtils`.

PIP3 version is not consistent with the code in the repo

Hi there,

I used this code for a Kinesis Data Analytics pre-processing Lambda function.
I installed it with "pip3 install aws_kinesis_agg" and hit a bug where recordId is missing; after looking into the source code, I found that the code in the pip3 package differs from the repo.
Could you please fix the issue?

Thanks,

Documentation for NodeJS is outdated

Hello,

Reading the doc, it seems that there are some differences from the code. For example:

In the doc:
deaggregateSync(kinesisRecord, afterRecordCallback(err, UserRecord[]);

In the code:
deaggregateSync = function(kinesisRecord, computeChecksums, afterRecordCallback)

Or:

In the doc:
deaggregate(kinesisRecord, perRecordCallback(err, UserRecord), afterRecordCallback(err, errorKinesisRecord));

In the code:
deaggregate = function(kinesisRecord, computeChecksums, perRecordCallback, afterRecordCallback)

Cheers,

Missing index.d.ts within files option of package.json

Hi guys,

I was recently working with this library in node/typescript but could not import the types. I noticed you added the index.d.ts file within #57, but did not add it to the files option in package.json. I would be glad to open a PR to fix this.

Thanks,
Matt

Iterative Aggregation documentation is incorrect

The Iterative Agg documentation reads as follows:

Iterative Aggregation

The iterative aggregation method involves adding records one at a time to the RecordAggregator and checking the response to determine when a full aggregated record is available. The add_user_record method returns None when there is room for more records in the existing aggregated record and returns an AggRecord object when a full aggregated record is available for transmission.

for rec in records:
    result = kinesis_aggregator.add_user_record(rec.PartitionKey, rec.Data, rec.ExplicitHashKey)
    if result:
        #Send the result to Kinesis

In reality, the iterative aggregation method add_user_record returns True or False and then we need to call get_contents to get the record.

Support for Golang?

Is there a plan to add support for Golang? We have a use case where we send MySQL binlog data from Maxwell to Kinesis, then consume the data using AWS Lambda, and everything is built on Go. Right now we have to disable aggregation (which apparently impacts performance).

Incorrect typings for deaggregate and deaggregateSync methods

The deaggregate and deaggregateSync functions use an incorrect type for kinesisRecord. All properties of Kinesis.Types.Record are in PascalCase (for example, Data), while kinesis-aggregation uses camelCase (data).
From this method (https://github.com/awslabs/kinesis-aggregation/blob/master/node/example/sample-deaggregation.js#L33) it's clear that Kinesis.Types.Record is not the correct type for the parameter; something like KinesisStreamRecordPayload from @types/aws-lambda should be used instead.

I can make a PR if you are OK with using @types/aws-lambda as a dependency.

Feature request: add support for splitting

Sometimes user records are larger than 1MB. In such cases the records should be split into multiple smaller kinesis records and then added to a stream.

The consumer then should concatenate these kinesis records to recreate the original user record.

I think this feature should be added to kinesis-aggregation (as well as KPL).

Add support for python deaggregation of Java KPL aggregated records

This project focuses mainly on supporting Kinesis (de)aggregation in the context of AWS Lambda. Would it be possible to make the deaggregation code more generic so that it can be used to deaggregate records produced by the Java KPL?

We have a use case where we produce using the Java KPL and consume from Python via the boto3 client's get_records. There is no built-in support for processing these aggregated records from Python: https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-integration.html. The code in deaggregator.py, iter_deaggregate_records(records), is coupled to the Kinesis/Lambda event format, but with some refactoring I think it could satisfy my use case.
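A rough adapter sketch for this use case (the boto3 GetRecords response keys Data, PartitionKey and SequenceNumber are standard, but the wrapping below is an assumption about what iter_deaggregate_records expects, not a tested recipe, and the wrapper function name is just for illustration):

import base64
import boto3
from aws_kinesis_agg.deaggregator import iter_deaggregate_records

kinesis = boto3.client('kinesis')

def iter_user_record_payloads(get_records_response):
    # Wrap each boto3 GetRecords record in the Lambda-event shape that the
    # existing deaggregation code expects: record['kinesis']['data'], base64-encoded.
    wrapped = [{
        'kinesis': {
            'data': base64.b64encode(rec['Data']).decode('utf-8'),
            'partitionKey': rec['PartitionKey'],
            'sequenceNumber': rec['SequenceNumber'],
        }
    } for rec in get_records_response['Records']]
    for user_record in iter_deaggregate_records(wrapped):
        yield base64.b64decode(user_record['kinesis']['data'])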

Is the Node version of this library maintained?

I noticed the node version hasn't been updated since ~July 2019 and is currently referencing a version of a library (protobufjs) that has an active CVE open against it. #58 calls out the need for an update but no action has been taken.

Should the node module be considered unmaintained? And if yes, is there a maintained successor library we should be migrating to or should we fork it and perform the update ourselves?

Errors when using KinesisLambdaForwarder

I've set up the KinesisLambdaForwarder to forward from one Kinesis stream to another across account boundaries. I eventually figured out the IAM pieces to the point where it no longer errors about permissions, but now in the CloudWatch logs I often see this:

Received <n> raw Kinesis records.
Received <n> deaggregated Kinesis records.
Lambda function encountered fatal error: Invalid explicitHashKey, must be greater or equal to zero and less than or equal to (2^128 - 1), got -130

Is this related to #11 somehow?

constants.js sets undeclared globals

Inside node/node_modules/aws-kinesis-agg/constants.js, a series of global variables is set by assigning to undeclared names. This causes two problems: setting global variables is considered bad practice in JavaScript, and assigning to undeclared variables is disallowed when the use strict directive is in effect, which bundlers like webpack add automatically.

The fix is easy: just export the constants as an object and use that instead. I'll submit a PR for this shortly.

createExplicitHashKey could create keys that fail validation with validateExplicitHashKey

If we don't pass an explicit hash key to addUserRecord it will try to generate one from the partition key:

    public boolean addUserRecord(String partitionKey, String explicitHashKey, byte[] data) {
        validatePartitionKey(partitionKey);
        partitionKey = partitionKey.trim();

        explicitHashKey = explicitHashKey != null ? explicitHashKey.trim() : createExplicitHashKey(partitionKey);

For some partition keys, the generated hash key could be invalid. For example if my partition key is:

String pk = new String(new byte[] {
        66, 69, 84, 0, 0, 0, 0, 46, 69, -17, -65, -67, -17, -65, -67
});

Then createExplicitHashKey(pk) will generate: -123. To reproduce:

import java.security.NoSuchAlgorithmException;

import com.amazonaws.kinesis.agg.AggRecord;

public class Main {

    public static void main(String[] args) throws NoSuchAlgorithmException {
        String pk = new String(new byte[] {
                66, 69, 84, 0, 0, 0, 0, 46, 69, -17, -65, -67, -17, -65, -67
        });

        AggRecord record = new AggRecord();
        record.addUserRecord(pk, null, new byte[] { 1, 2, 3 });
    }

}

which throws:

java.lang.IllegalArgumentException: Invalid explicitHashKey, must be greater or equal to zero and less than or equal to (2^128 - 1), got -123
	at com.amazonaws.kinesis.agg.AggRecord.validateExplicitHashKey(AggRecord.java:464)
	at com.amazonaws.kinesis.agg.AggRecord.addUserRecord(AggRecord.java:329)

Node aggregation emits invalid records when user records do not include the ExplicitHashKey property

The ExplicitHashKey property is an optional string property which, if provided, must be a stringified number adhering to specific constraints. However, if not explicitly provided when using the node RecordAggregator, the resulting aggregated record will include this property with the javascript undefined object as its value. This in turn is encoded in the protobuf blob as the string "undefined", which is not a valid value for the ExplicitHashKey property.

We have a producer producing aggregated records in node, and a consumer in java. The java consumer fails to deaggregate the protobuf blob because it tries to parse the value of ExplicitHashKey as a BigInteger (as it should).

Obviously there are a number of issues here, but preventing the node library from spuriously inserting ExplicitHashKey keys into the user records would solve the immediate issue.

The size calculation logic fails when the explicit hash key is not specified (undefined) in the stream records.

The logic for record aggregation is as follows:

  • For each input stream record, estimate the size of the aggregated record that would result after the new input record is added to it.
  • If the estimated size is less than 1 MB, add the new record to the putRecord list.
  • If the estimated size is greater than or equal to 1 MB, do not add the new record to the putRecord list; instead, aggregate the records already in the list and flush the aggregated record.

The size estimation logic fails when the explicit hash key is not specified in the input, so aggregated records larger than 1 MB are flushed to the Kinesis stream, which then throws a ValidationError.
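For orientation, a minimal Python sketch of the intended size check (the 1048576-byte limit matches the PutRecord error quoted earlier in this document; estimated_size_with and flush_aggregated are hypothetical stand-ins for the library's internal size estimate and flush step, which is reportedly where the missing explicit hash key breaks things):

MAX_AGGREGATE_BYTES = 1048576  # the PutRecord data limit quoted in the error above

pending = []
for record in input_records:
    # estimated_size_with() stands in for the library's internal size estimate;
    # the report above says this estimate is wrong when the explicit hash key
    # is undefined, so over-sized aggregates slip through.
    if estimated_size_with(pending, record) < MAX_AGGREGATE_BYTES:
        pending.append(record)
    else:
        flush_aggregated(pending)  # emit one aggregated record to the stream
        pending = [record]
if pending:
    flush_aggregated(pending)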

Necessity of deaggregation

I am sending data with the KPL, and I can see that aggregation is happening, as shown in the output below comparing the KPL metric to the Kinesis metric:

get-metric-statistics --dimensions Name=StreamName,Value=MQTTStream2 --start-time 2017-08-02T12:20:00 --end-time 2017-08-02T12:25:00 --period 60 --namespace KinesisProducerLibrary --statistics Sum --metric-name UserRecordsPut
{
    "Datapoints": [
        {
            "Timestamp": "2017-08-02T12:24:00Z",
            "Sum": 20,
            "Unit": "Count"
        }
    ],
    "Label": "UserRecordsPut"
}

and the Kinesis metric for the same time window:
get-metric-statistics --namespace AWS/Kinesis --dimensions Name=StreamName,Value=MQTTStream2 --period 60 --start-time 2017-08-02T12:20:00 --end-time 2017-08-02T12:25:00 --statistics Sum --metric-name PutRecords.Success
{
    "Datapoints": [
        {
            "Timestamp": "2017-08-02T12:24:00Z",
            "Sum": 4.0,
            "Unit": "Count"
        }
    ],
    "Label": "PutRecords.Success"
}

Yet when I look at the logs I added, even before deaggregation, the number I see from userRecords.size() is 20. Is automatic deaggregation of the records occurring? Or does the size indicate the total even though the records are not deaggregated?

Thanks

Error deaggregating messages with Lambda

I am getting the following error. It appears that 'google' is not defined where the exception is handled.

Traceback (most recent call last):
  File "/var/task/lambda-firehose.py", line 65, in lambda_handler
    for record in deaggregate_records(event['Records']):
  File "/var/task/aws_kpl_deagg/deaggregator.py", line 108, in deaggregate_records
    return_records.extend(iter_deaggregate_records(records))
  File "/var/task/aws_kpl_deagg/deaggregator.py", line 180, in iter_deaggregate_records
    except google.protobuf.message.DecodeError as de:
NameError: global name 'google' is not defined

Changing the code to...

except:
    is_aggregated = False

...bypasses the error.

Feature request: better KCL integration

Hi,
We are currently looking for a Java alternative to the KPL, and this aggregation library seems like a good start. However, the current aggregation solution doesn't seem to be fully compatible with the KCL (#11). Can we add more feature support for KCL integration, including:

  1. shard level aggregation and retry logic:
    Similar to #11, the KCL validates whether the aggregated data belongs to the correct shard and will skip processing batches that fail the validation. To support KCL integration, we need to parse the partition key of each record at the producer level, map it to a specific shard, and aggregate records per shard.

  2. split/merge shards support:
    Since the producer data is aggregated per shard, if the stream's shards are split or merged, the producer needs to detect the shard changes and re-shuffle the records based on the new shard-to-hash-key mappings. This is a bonus feature that could be built on top of 1.

  3. shard level rate limiting:
    Aggregation is a great way of reducing throughput exceptions. However, with high throughput and multiple Kinesis producers, it would be ideal to have producer-side rate limiting that monitors the write traffic for each shard. This could improve producer performance and let a producer request fail at an early stage rather than at the sending stage.

Thanks!

Implement a Java version based on the AWS Java SDK 2.0

The AWS Java SDK 2.0 doesn't have any support for aggregate records in Kinesis, but this library uses the Kinesis message models from the 1.11.x AWS Java SDK.

If this were updated to use the 2.0 models, it would resolve a functionality gap in the 2.0 SDK.

NodeJS Convert Aggregate to Promise

I'd like to use kinesis-aggregation in our existing NodeJS project, which uses promises everywhere. Is there a way to convert the aggregate function to a promise? Here's a simple stub. It doesn't work; the await never completes. I've tried various permutations of this, but none have worked.

const aggregatePromise = (records, onReady) => {
  return new Promise((resolve, reject) => {
    aggregate(records, (aggregatedRecord, callback) => {
      onReady(aggregatedRecord, callback);
    }, () => resolve(), (err) => reject(err));
  });
};

const onReady= (aggregatedRecord, callback) => {
   // do stuff including kinesis.putRecord
};

await aggregatePromise(records, onReady);

Potential incompatibility with KPL

As @pfifer has mentioned in amazon-kinesis-producer#128 (emphasis mine):

In addition to this the KPL publishes to a stream, and not a specific shard. If two user records are destined for different shards they will never be aggregated together.

However, kinesis-aggregation seems to aggregate user records into an AggRecord and then a PutRecordRequest even if they must end up in different shards. This could break KPL-compatible clients (e.g. those using the KCL).

Failed to forward Kinesis records to destination stream

I cannot write aggregated data to my Kinesis stream. My Lambda function and Kinesis stream are in the same account. My Lambda can read data from a-stream but cannot write to b-stream.
I'm getting this error:
ERROR: Failed to forward Kinesis records to destination stream: Unable to execute HTTP request: Connect to kinesis.us-east-1.amazonaws.com:443 [kinesis.us-east-1.amazonaws.com/54.239.22.11] failed: connect timed out
