Comments (10)
Unfortunately this solution will only help you ingest log data from the time you spin up the stack. This is using a feature in CloudWatch Logs (CWL) called Subscriptions, which is a mechanism to stream real-time logs from CWL to another destination while the logs are getting ingested into CWL.
CWL has another feature to export archived data to S3, and in theory you could use it to populate your Elasticsearch cluster with the historical data in your log group. However, as far as I know, there aren't any tools to facilitate that setup at the moment. I think the easiest way to set this up would be to write something that consumes the exported data from S3 and then inserts it into the Kinesis stream (using the PutRecord API) in the same format that CWL uses. (The format is documented in the developer guide; see the Examples section.) Then this stack would consume those records and insert them into your Elasticsearch cluster normally.
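If you go that route, kicking off the export itself looks roughly like this with the AWS CLI. This is a sketch only: the log group, bucket, and prefix are placeholders, and the aws command is echoed rather than run:

```shell
# create-export-task takes its time range in epoch *milliseconds*
# (GNU date shown; on a Mac use `date -j -u -f ...` instead)
FROM=$(( $(date -u -d '2016-02-01' +%s) * 1000 ))
TO=$(( $(date -u -d '2016-02-17' +%s) * 1000 ))

# Placeholder log group / bucket names; the destination bucket also needs
# a policy that lets CloudWatch Logs write to it. Echoed here, not run:
echo aws logs create-export-task \
  --task-name replay-export \
  --log-group-name VPCFlowLogs \
  --from "$FROM" --to "$TO" \
  --destination my-export-bucket \
  --destination-prefix export1
```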
from cloudwatch-logs-subscription-consumer.
hi @dvassallo ok understood, so I've exported the data to S3 and synced it locally. I'm going to use the AWS CLI and gzcat to process these and put-record into Kinesis. I'm kind of stuck on what the partition-key should be. How do I find it, or is it arbitrary?
put-record --stream-name CWL-Elasticsearch-KinesisSubscriptionStream-1O1NMVORNI953 --partition-key 0 --data '2016-02-16T21:01:11.000Z 2 0123456789012 eni-abcd1234 5.6.7.8 1.2.3.4 35434 443 6 5 825 1455656471 1455656531 ACCEPT OK' --profile stg
I set it to 0 for better or worse for the moment.
This works though; I'll dump in the whole set of .gz files and report back
Your choice of the partition key values is not important for this to function correctly, but if you have more than 1 Kinesis shard you may want to use a random value (ideally) for each PutRecord API call. Data sent with a particular partition key will end up going to a single Kinesis shard, and since Kinesis shards have limited bandwidth you may exceed the shard's capacity, especially if you start replaying data very quickly (which would result in ProvisionedThroughputExceededException on PutRecord calls).
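As a sketch, the put-record call from above with a random partition key per record might look like this. The stream name is copied from the earlier command, the aws call is echoed rather than run, and the key generation (eight random bytes, hex-encoded) is just one easy way to get a random string:

```shell
STREAM="CWL-Elasticsearch-KinesisSubscriptionStream-1O1NMVORNI953"

# A fresh random partition key per PutRecord spreads replayed records
# across shards instead of funneling everything into one.
KEY=$(od -An -N8 -tx1 /dev/urandom | tr -d ' \n')

# Echoed for illustration, not run:
echo aws kinesis put-record \
  --stream-name "$STREAM" \
  --partition-key "$KEY" \
  --data '<gzipped envelope>' --profile stg
```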
BTW - If you think that whatever you end up scripting may benefit other users of this project, we would welcome a pull request for it! :) I think something like what you're doing with the AWS CLI could go in a "scripts" directory in this repo. No obligation obviously.
It's interesting you mention the throughput on PutRecord calls; my one-liner worked, but it was really slow, so I killed it. I was trying to do this quickly via...
dump records
gzcat -r ./data/*.gz >file.txt
see how many records there are
ruby -e 'arr1=ARGF.read;p arr1.split("\n").length' file.txt
845798
process them
ruby -e 'arr1=ARGF.read;arr1.split("\n").each {|line| cmd=`aws kinesis put-record --stream-name #{streamname} --partition-key 0 --data "#{line}" --profile stg`;puts cmd }' file.txt
I might have to look at the put-records command to do this in batch.
This is very slow; I'll reply back or create a pull request for a decent script should it work :)
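put-records does take up to 500 records per call, so batching the dump first helps. A hedged sketch (the stream name is copied from earlier in the thread, the counts are stand-ins, and the aws call is echoed rather than run; note each Data blob should really carry the gzipped CWL JSON envelope, not a bare log line):

```shell
cd "$(mktemp -d)"
STREAM="CWL-Elasticsearch-KinesisSubscriptionStream-1O1NMVORNI953"

# PutRecords accepts at most 500 records per call, so batch the dump first.
seq 1 1200 > file.txt            # stand-in for the real dump
split -l 500 file.txt batch-     # -> batch-aa (500), batch-ab (500), batch-ac (200)

# Shape of one batched call, echoed for illustration:
echo aws kinesis put-records \
  --stream-name "$STREAM" \
  --records 'Data=<blob-1>,PartitionKey=k1' 'Data=<blob-2>,PartitionKey=k2'
```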
Note that you can put more than one log event in a single Kinesis.PutRecord (singular) call. The "data" payload is already expected to contain an array of log events. Here's an example of how 3 log events would get serialized into a single Kinesis record (posted via the data attribute of a single PutRecord call):
{
"owner": "123456789012",
"logGroup": "VPCFlowLogs",
"logStream": "eni-12345678",
"subscriptionFilters": [
"FlowLogs"
],
"messageType": "DATA_MESSAGE",
"logEvents": [
{
"id": "31953106606966983378809025079804211143289615424298221568",
"timestamp": 1432826855000,
"message": "record 1"
},
{
"id": "31953106606966983378809025079804211143289615424298221569",
"timestamp": 1432826855000,
"message": "record 2"
},
{
"id": "31953106606966983378809025079804211143289615424298221570",
"timestamp": 1432826855000,
"message": "record 3"
}
]
}
You may want to check out Step 8 of Example 1 in the developer guide for an example of how the data should look in Kinesis. That example shows you how to use the AWS CLI to deserialize the Kinesis record into the above JSON structure with basic Unix tools. (Note that the data attribute is expected to be gzipped.) This project expects the Kinesis data to be in that exact format for it to be able to read from Kinesis and send to Elasticsearch. This is where the parsing happens: CloudWatchLogsSubscriptionTransformer.java L50-L101
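As a sketch of producing one such record from the flow-log line earlier in this thread: wrap it in the envelope, gzip and base64 it, then decode it back the same way. The owner/logGroup/logStream/id values here are illustrative:

```shell
LINE='2016-02-16T21:01:11.000Z 2 0123456789012 eni-abcd1234 5.6.7.8 1.2.3.4 35434 443 6 5 825 1455656471 1455656531 ACCEPT OK'

# Wrap the raw log line in the CWL subscription envelope (field values
# are illustrative), then gzip it -- the data attribute must be gzipped.
ENVELOPE=$(printf '{"owner":"123456789012","logGroup":"VPCFlowLogs","logStream":"eni-abcd1234-all","subscriptionFilters":["FlowLogs"],"messageType":"DATA_MESSAGE","logEvents":[{"id":"1000000","timestamp":1455656471000,"message":"%s"}]}' "$LINE")
DATA=$(printf '%s' "$ENVELOPE" | gzip -c | base64 | tr -d '\n')

# Round-trip check, mirroring the developer-guide decode (use -D on a Mac):
printf '%s' "$DATA" | base64 -d | gzip -dc
```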
thanks @dvassallo, that's good information, I have bookmarked it. We don't know how often we'll have to do this, so my evolving quick-and-dirty solution is now to pipe the output to GNU parallel.
Is put-records not a decent solution? I assume the AWS CLI is doing that behind the scenes for the raw API calls anyway, right?
gzcat -r ./export1/74dad32f-97af-4987-8df8-20ee2ea4fb10/eni-abcd1234-all/*.gz | parallel -j 10 aws kinesis put-record --stream-name CWL-ElasticsearcKinesisSubscriptionStream-1O1NMVORNI953 --partition-key 0 --data "{}" --profile stg
parallel, while not elegant, allowed me to get up to over 1000 put requests per period using 10 concurrent jobs
Yes, PutRecords would work as well.
Your one-liner solution is quite impressive actually!
... though I just realized from re-reading your second comment that you initially attempted to pass a sample log event directly to the Kinesis.PutRecord "data" attribute:
--data '2016-02-16T21:01:11.000Z 2 0123456789012 eni-abcd1234 5.6.7.8 1.2.3.4 35434 443 6 5 825 1455656471 1455656531 ACCEPT OK'
But what the "data" attribute should have contained is the following JSON blob, all gzipped:
{
"owner": "<Your AWS Account Id>",
"logGroup": "<Your Log Group Name>",
"logStream": "<Your Log Stream Name>",
"subscriptionFilters": [
"<Not Important In This Case>"
],
"messageType": "DATA_MESSAGE",
"logEvents": [
{
"id": "1000000",
"timestamp": 1455656471000,
"message": "2016-02-16T21:01:11.000Z 2 0123456789012 eni-abcd1234 5.6.7.8 1.2.3.4 35434 443 6 5 825 1455656471 1455656531 ACCEPT OK"
}
]
}
The "id" attribute for log events is just a unique ID that Elasticsearch would use to de-duplicate. You shouldn't use the same id string for different events.
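A minimal sketch of that: a plain incrementing counter (the starting value is arbitrary) gives each replayed event its own id:

```shell
# Each log event needs a unique id, since Elasticsearch de-duplicates on it.
# A plain incrementing counter is enough for a one-off replay.
EVENTS=$(
  ID=1000000
  for MSG in 'record 1' 'record 2' 'record 3'; do
    printf '{"id":"%s","timestamp":1455656471000,"message":"%s"}\n' "$ID" "$MSG"
    ID=$((ID + 1))
  done
)
echo "$EVENTS"
```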
Ah yes, I'm passing garbage as --data then; that explains why I'm not seeing events show up in the Kibana dashboard. I'll have to script this a bit to generate the id and timestamp and format the message properly.
One note on the shard iterator and validating test data: on a Mac, base64 uses -D to decode, since -d is debug. This was very helpful in pointing me in the right direction:
echo -n "H4sIAAAAAAAAADWOwQqCQBRFf2WYdUSRiriLMBdZQgYtImLSlz7SGZk3FiH+e6PW8nAv956O10AkCjh9GuAB3ySH0zGJb/swTddRyGdcvSXoIalUm7+FycpYFWSDShWRVm1js4lSo0HUE1J7p0xjY1DJLVYGNPHgch174QukGbDjmE91g1bDiNqOLR3X9RzPcxYr35/99QaBc8x+euynF7BNCdkTZcFKEJUpmXqw3C6hFMMz26EEQmI0qs15f+2/+3ON6vIAAAA=" | \
base64 -D | zcat
{"messageType":"CONTROL_MESSAGE","owner":"CloudwatchLogs","logGroup":"","logStream":"","subscriptionFilters":[],"logEvents":[{"id":"","timestamp":1455646640388,"message":"CWL CONTROL MESSAGE: Checking health of destination Kinesis stream."}]}
will reply back later when I have something working