microsoft / fluent-plugin-azure-storage-append-blob
Fluent Plugin Azure Storage Append Blob
License: MIT License
Currently the plugin writes each entry in the format "${time} ${tag} ${record}". Is there any way to make it write the record only?
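For reference, Fluentd outputs that accept a <format> section can swap the formatter; a minimal sketch that would emit only the record as a JSON line, assuming this plugin honors <format> (a configuration further down this page does use one):

<match **>
  @type azure-storage-append-blob
  # ... storage settings ...
  <format>
    @type json   # emit the record only, without the time or tag prefix
  </format>
</match>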
According to https://docs.microsoft.com/en-us/rest/api/storageservices/append-block#avoiding-duplicate-or-delayed-appends, duplicate logs can be avoided by sending the x-ms-blob-condition-appendpos header: the append fails with HTTP 412 (Precondition Failed) if the blob's current length differs from the expected offset, so a retried append cannot write the same data twice. But the current implementation does not seem to support that option.
I am using this plugin to send logs to different containers depending on namespaces.
If, during operation, I remove a container from Azure or change the secret key, that output starts to raise errors while the other outputs keep working; see the error in the errorWhileWorking.txt file.
On the other hand, if some of the outputs are unavailable while the fluentd process is starting, it loops infinitely, as shown in the errorStarting.txt file, preventing the other destinations in the configuration from shipping their logs.
I conducted a few load tests against the plugin and noticed that logs were missing. I set up a job that deploys a container writing messages to stdout at a preconfigured rate, say 1000 messages per second, with the job running for 10 s. At the end of the job the container reports the total message count, which I compared against what landed in blob storage; sometimes there was a discrepancy:
fluentd-load-test-0-td8wm expected: 10001, actual: 10000 x
fluentd-load-test-0-vwtxc expected: 30001, actual: 29998 x
The Fluentd container log didn't report any warnings or errors during those load tests.
We are using a v1.11 Fluentd build, and I have the exact same setup in an AWS environment with a different output plugin (s3 output); I don't see problems there under the same load tests.
Are there parameters I need to tune differently for the azure-storage-append-blob plugin so that it does not drop messages?
Here are my configs:
---
apiVersion: v1
data:
  filter.conf: |
    <filter **>
      @type record_transformer
      <record>
        k8s_cluster_name "#{ENV['K8S_CLUSTER_NAME']}"
      </record>
    </filter>
kind: ConfigMap
metadata:
  name: filter-conf
  namespace: fluentd
---
apiVersion: v1
data:
  fluent.conf: |-
    @include "#{ENV['FLUENTD_SYSTEMD_CONF'] || 'systemd'}.conf"
    @include "#{ENV['FLUENTD_PROMETHEUS_CONF'] || 'prometheus'}.conf"
    @include filter.conf
    @include kubernetes.conf
    @include monitoring.conf
    @include conf.d/*.conf

    <system>
      log_level warn
    </system>

    <match **>
      @type azure-storage-append-blob
      azure_storage_connection_string "#{ENV['AZUREBLOB_CONNECTION_STRING']}"
      azure_container "#{ENV['AZUREBLOB_CONTAINER']}"
      auto_create_container true
      path "#{ENV['AZUREBLOB_LOG_PATH']}"
      azure_object_key_format "%{path}%{time_slice}#{ENV['K8S_CLUSTER_NAME']}_#{ENV['HOSTNAME']}_%{index}.log"
      time_slice_format "/%Y/%m/%d/%H/%M/"
      # if you want to use %{tag} or %Y/%m/%d/ like syntax in path / azure_blob_name_format,
      # need to specify tag for %{tag} and time for %Y/%m/%d in <buffer> argument.
      <buffer>
        @type file
        path /var/log/fluent/azurestorageappendblob
        timekey 30
        timekey_wait 0
        timekey_use_utc true
        chunk_limit_size 256m
        flush_at_shutdown true
      </buffer>
    </match>
kind: ConfigMap
metadata:
  name: fluentd-conf
  namespace: fluentd
---
apiVersion: v1
data:
  kubernetes.conf: |-
    <label @FLUENT_LOG>
      <match fluent.**>
        @type null
      </match>
    </label>

    <source>
      @type tail
      @id in_tail_container_logs
      path /var/log/containers/*.log
      pos_file /var/log/fluentd-containers.log.pos
      tag "#{ENV['FLUENT_CONTAINER_TAIL_TAG'] || 'kubernetes.*'}"
      exclude_path "#{ENV['FLUENT_CONTAINER_TAIL_EXCLUDE_PATH'] || use_default}"
      refresh_interval 5
      read_from_head true
      <parse>
        @type "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TYPE'] || 'json'}"
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_minion
      path /var/log/salt/minion
      pos_file /var/log/fluentd-salt.pos
      tag salt
      <parse>
        @type regexp
        expression /^(?<time>[^ ]* [^ ,]*)[^\[]*\[[^\]]*\]\[(?<severity>[^ \]]*) *\] (?<message>.*)$/
        time_format %Y-%m-%d %H:%M:%S
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_startupscript
      path /var/log/startupscript.log
      pos_file /var/log/fluentd-startupscript.log.pos
      tag startupscript
      <parse>
        @type syslog
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_docker
      path /var/log/docker.log
      pos_file /var/log/fluentd-docker.log.pos
      tag docker
      <parse>
        @type regexp
        expression /^time="(?<time>[^)]*)" level=(?<severity>[^ ]*) msg="(?<message>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=(?<status_code>\d+))?/
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_etcd
      path /var/log/etcd.log
      pos_file /var/log/fluentd-etcd.log.pos
      tag etcd
      <parse>
        @type none
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kubelet
      multiline_flush_interval 5s
      path /var/log/kubelet.log
      pos_file /var/log/fluentd-kubelet.log.pos
      tag kubelet
      <parse>
        @type kubernetes
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kube_proxy
      multiline_flush_interval 5s
      path /var/log/kube-proxy.log
      pos_file /var/log/fluentd-kube-proxy.log.pos
      tag kube-proxy
      <parse>
        @type kubernetes
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kube_apiserver
      multiline_flush_interval 5s
      path /var/log/kube-apiserver.log
      pos_file /var/log/fluentd-kube-apiserver.log.pos
      tag kube-apiserver
      <parse>
        @type kubernetes
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kube_controller_manager
      multiline_flush_interval 5s
      path /var/log/kube-controller-manager.log
      pos_file /var/log/fluentd-kube-controller-manager.log.pos
      tag kube-controller-manager
      <parse>
        @type kubernetes
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_kube_scheduler
      multiline_flush_interval 5s
      path /var/log/kube-scheduler.log
      pos_file /var/log/fluentd-kube-scheduler.log.pos
      tag kube-scheduler
      <parse>
        @type kubernetes
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_rescheduler
      multiline_flush_interval 5s
      path /var/log/rescheduler.log
      pos_file /var/log/fluentd-rescheduler.log.pos
      tag rescheduler
      <parse>
        @type kubernetes
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_glbc
      multiline_flush_interval 5s
      path /var/log/glbc.log
      pos_file /var/log/fluentd-glbc.log.pos
      tag glbc
      <parse>
        @type kubernetes
      </parse>
    </source>

    <source>
      @type tail
      @id in_tail_cluster_autoscaler
      multiline_flush_interval 5s
      path /var/log/cluster-autoscaler.log
      pos_file /var/log/fluentd-cluster-autoscaler.log.pos
      tag cluster-autoscaler
      <parse>
        @type kubernetes
      </parse>
    </source>

    # Example:
    # 2017-02-09T00:15:57.992775796Z AUDIT: id="90c73c7c-97d6-4b65-9461-f94606ff825f" ip="104.132.1.72" method="GET" user="kubecfg" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"
    # 2017-02-09T00:15:57.993528822Z AUDIT: id="90c73c7c-97d6-4b65-9461-f94606ff825f" response="200"
    <source>
      @type tail
      @id in_tail_kube_apiserver_audit
      multiline_flush_interval 5s
      path /var/log/kubernetes/kube-apiserver-audit.log
      pos_file /var/log/kube-apiserver-audit.log.pos
      tag kube-apiserver-audit
      <parse>
        @type multiline
        format_firstline /^\S+\s+AUDIT:/
        # Fields must be explicitly captured by name to be parsed into the record.
        # Fields may not always be present, and order may change, so this just looks
        # for a list of key="\"quoted\" value" pairs separated by spaces.
        # Unknown fields are ignored.
        # Note: We can't separate query/response lines as format1/format2 because
        # they don't always come one after the other for a given query.
        format1 /^(?<time>\S+) AUDIT:(?: (?:id="(?<id>(?:[^"\\]|\\.)*)"|ip="(?<ip>(?:[^"\\]|\\.)*)"|method="(?<method>(?:[^"\\]|\\.)*)"|user="(?<user>(?:[^"\\]|\\.)*)"|groups="(?<groups>(?:[^"\\]|\\.)*)"|as="(?<as>(?:[^"\\]|\\.)*)"|asgroups="(?<asgroups>(?:[^"\\]|\\.)*)"|namespace="(?<namespace>(?:[^"\\]|\\.)*)"|uri="(?<uri>(?:[^"\\]|\\.)*)"|response="(?<response>(?:[^"\\]|\\.)*)"|\w+="(?:[^"\\]|\\.)*"))*/
        time_format %Y-%m-%dT%T.%L%Z
      </parse>
    </source>

    <filter kubernetes.**>
      @type kubernetes_metadata
      @id filter_kube_metadata
      kubernetes_url "#{ENV['FLUENT_FILTER_KUBERNETES_URL'] || 'https://' + ENV.fetch('KUBERNETES_SERVICE_HOST') + ':' + ENV.fetch('KUBERNETES_SERVICE_PORT') + '/api'}"
      verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
      ca_file "#{ENV['KUBERNETES_CA_FILE']}"
      skip_labels "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_LABELS'] || 'false'}"
      skip_container_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_CONTAINER_METADATA'] || 'false'}"
      skip_master_url "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_MASTER_URL'] || 'false'}"
      skip_namespace_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_NAMESPACE_METADATA'] || 'false'}"
    </filter>

    <filter kubernetes.**>
      @type parser
      <parse>
        @type json
        json_parser json
      </parse>
      key_name log
      hash_value_field json_log
      replace_invalid_sequence true
      emit_invalid_record_to_error false
      remove_key_name_field true
      reserve_data true
    </filter>
kind: ConfigMap
metadata:
  name: kubernetes-conf
  namespace: fluentd
---
apiVersion: v1
data:
  prometheus.conf: |-
    # Prometheus Exporter Plugin
    # input plugin that exports metrics
    <source>
      @id prometheus
      @type prometheus
    </source>

    <source>
      @id monitor_agent
      @type monitor_agent
    </source>

    # input plugin that collects metrics from MonitorAgent
    <source>
      @id prometheus_monitor
      @type prometheus_monitor
      <labels>
        host ${hostname}
      </labels>
    </source>

    # input plugin that collects metrics for output plugin
    <source>
      @id prometheus_output_monitor
      @type prometheus_output_monitor
      <labels>
        host ${hostname}
      </labels>
    </source>
kind: ConfigMap
metadata:
  name: prometheus-conf
  namespace: fluentd
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  labels:
    k8s-app: fluentd-logging
    version: v1
  name: fluentd
  namespace: fluentd
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-logging
      version: v1
  template:
    metadata:
      labels:
        k8s-app: fluentd-logging
        version: v1
    spec:
      containers:
      - env:
        - name: AZUREBLOB_CONTAINER
          value: x
        - name: AZUREBLOB_LOG_PATH
          value: x
        - name: K8S_CLUSTER_NAME
          value: x
        - name: RESTART_TIME
          value: "1595966504"
        - name: AZUREBLOB_CONNECTION_STRING
          valueFrom:
            secretKeyRef:
              key: connection_string
              name: azblob
        - name: K8S_NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        image: x
        imagePullPolicy: Always
        name: fluentd
        ports:
        - containerPort: 24231
        resources:
          limits:
            memory: 800Mi
          requests:
            cpu: 200m
            memory: 400Mi
        volumeMounts:
        - mountPath: /var/log
          name: varlog
        - mountPath: /var/lib/docker/containers
          name: varlibdockercontainers
          readOnly: true
        - mountPath: /fluentd/etc/fluent.conf
          name: config-volume
          subPath: fluent.conf
        - mountPath: /fluentd/etc/filter.conf
          name: filterconfig-volume
          subPath: filter.conf
        - mountPath: /fluentd/etc/kubernetes.conf
          name: kubernetesconfig-volume
          subPath: kubernetes.conf
        - mountPath: /fluentd/etc/prometheus.conf
          name: prometheusconfig-volume
          subPath: prometheus.conf
      priorityClassName: cluster-addon-priority
      serviceAccountName: fluentd
      terminationGracePeriodSeconds: 30
      tolerations:
      - effect: NoSchedule
        operator: Exists
      volumes:
      - configMap:
          name: fluentd-conf
        name: config-volume
      - configMap:
          name: filter-conf
        name: filterconfig-volume
      - configMap:
          name: prometheus-conf
        name: prometheusconfig-volume
      - configMap:
          name: kubernetes-conf
        name: kubernetesconfig-volume
      - hostPath:
          path: /var/log
        name: varlog
      - hostPath:
          path: /var/lib/docker/containers
        name: varlibdockercontainers
In our environment, we do not want to generate "root level" SAS tokens.
Is there any way this plugin can be modified to support using a "bearer" token obtained from the OAuth endpoint?
Hi,
Storing large amounts of uncompressed data is expensive. Would it be possible to add an option to gzip files before they are uploaded to storage?
Thanks
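For concreteness, such an option might be modeled on the s3 output's compression setting; a hypothetical sketch (the compress parameter is an illustration only and does not exist in this plugin today):

<match **>
  @type azure-storage-append-blob
  # ... storage settings ...
  compress gzip   # hypothetical option: gzip each chunk before upload
</match>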
I have a fluentd setup on AKS that ships logs to blob storage. It is working fine. Currently logs are stored in the blob under container_name/logs/logfile.log; I want the structure to be container_name/logs/pod_name/logfile.log. I have used the kubernetes metadata filter plugin to extract the pod_name attribute. So the question is: how do I include the pod_name attribute in the azure_object_key_format field? Below is my current configuration (one possible approach is sketched after it).
<match fluent.**>
  @type null
</match>

<source>
  @type tail
  path /var/log/containers/*.log
  pos_file /var/log/td-agent/tmp/access.log.pos
  tag container.*
  format json
  time_key time
  time_format %Y-%m-%dT%H:%M:%S.%NZ
  read_from_head true
</source>

<match container.var.log.containers.**fluentd**.log>
  @type null
</match>

<filter container.**>
  @type kubernetes_metadata
</filter>

<match **>
  @type azure-storage-append-blob
  azure_storage_account mysaname
  azure_storage_access_key mysaaccesskey
  azure_container fluentdtest
  auto_create_container true
  path logs/
  append false
  azure_object_key_format %{path}%{tag}%{time_slice}_%{index}.log
  time_slice_format %Y%m%d-%H-%M
  # if you want to use %{tag} or %Y/%m/%d/ like syntax in path / azure_blob_name_format,
  # need to specify tag for %{tag} and time for %Y/%m/%d in <buffer> argument.
  <buffer tag,time,timekey>
    @type file
    path /var/log/fluent/azurestorageappendblob
    timekey 300s
    timekey_wait 10s
    timekey_use_utc true # use utc
    chunk_limit_size 5MB
    queued_chunks_limit_size 1
  </buffer>
</match>
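One possible approach, sketched below under the assumption that the plugin expands Fluentd buffer-chunk placeholders in path the same way it handles %{tag} and time: make the metadata field a buffer chunk key and reference it with a ${...} placeholder.

<match **>
  @type azure-storage-append-blob
  azure_storage_account mysaname
  azure_storage_access_key mysaaccesskey
  azure_container fluentdtest
  auto_create_container true
  # ${$.kubernetes.pod_name} is filled in from the record field added by the
  # kubernetes_metadata filter; it must also be listed as a chunk key below
  path logs/${$.kubernetes.pod_name}/
  azure_object_key_format %{path}%{time_slice}_%{index}.log
  time_slice_format %Y%m%d-%H-%M
  <buffer tag, time, $.kubernetes.pod_name>
    @type file
    path /var/log/fluent/azurestorageappendblob
    timekey 300s
    timekey_wait 10s
  </buffer>
</match>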
The current plugin doesn't support any type of compression. I am also using the s3 output, which supports compression, and that plugin provides better throughput. Is there a plan to add compression support to this plugin?
Hi,
Is it possible to add service principal authentication?
I have the following fields:
azure_oauth_tenant_id
azure_oauth_app_id
azure_oauth_secret
And I would like to use them to authenticate when using the plugin.
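For illustration, the proposed parameters in a match section might look like this; all three option names are the proposal above, not options the plugin implements today:

<match **>
  @type azure-storage-append-blob
  azure_container mycontainer
  # proposed service-principal credentials (hypothetical, not implemented):
  azure_oauth_tenant_id "#{ENV['AZURE_TENANT_ID']}"
  azure_oauth_app_id "#{ENV['AZURE_APP_ID']}"
  azure_oauth_secret "#{ENV['AZURE_APP_SECRET']}"
</match>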
There seems to be an off-by-one error when a buffer is sent in two append ops to storage.
To repro this, I made a large text file:
$ yes 'a' | head -n 5000000 > bytetest.txt
Then configured fluentd with this:
<source>
  @type tail
  path /var/log/bytetest
  pos_file /var/log/fluentd/bytetest.pos
  tag bytetest
  read_from_head true
  <parse>
    @type none
  </parse>
</source>

<match bytetest>
  @type azure-storage-append-blob
  azure_storage_account <account>
  azure_storage_access_key <key>
  azure_container mycontainer
  auto_create_container true
  path /
  azure_object_key_format bytetest.log
  time_slice_format %Y-%m-%d/%Y-%m-%dT%H:%M:00
  <format>
    @type single_value
  </format>
  <buffer tag,time>
    @type file
    path /var/log/fluentd/azblob.bytetest
    flush_mode interval
    flush_at_shutdown false
    timekey 60 # 1 minute
    timekey_wait 60
  </buffer>
</match>
Then cat these contents to the file to get fluentd to buffer the entire thing:
$ sudo touch /var/log/bytetest
$ cat bytetest.txt | sudo tee -a /var/log/bytetest > /dev/null
The resulting file has extra bytes. Note that both diff hunks below sit at 4 MiB multiples (2,097,152 lines of "a\n" is exactly 4 MiB), which points at the logic that splits a buffer chunk into multiple append operations.
aadmin@atf5f7ce0c04c-linux-1:~$ diff -u bytetest.txt bytetest2.txt
--- bytetest.txt 2020-10-07 13:06:30.144485029 +0000
+++ bytetest2.txt 2020-10-07 13:16:54.164703237 +0000
@@ -2097150,6 +2097150,7 @@
a
a
a
+
a
a
a
@@ -4194301,7 +4194302,7 @@
a
a
a
-a
+aa
a
a
a
Hi,
I would like to use this in Azure Government. How would I go about that? I am sure it's just some endpoints that need to be changed, since they differ from the Azure public cloud.
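One avenue worth checking, as a sketch only: storage connection strings accept an EndpointSuffix, and Azure Government uses core.usgovcloudapi.net in place of core.windows.net. Whether this plugin passes the suffix through to the storage client is an assumption to verify.

<match **>
  @type azure-storage-append-blob
  # EndpointSuffix pointed at the Azure Government cloud
  azure_storage_connection_string "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.usgovcloudapi.net"
  azure_container mycontainer
  path logs/
</match>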
The default result list for list_containers is limited to 5000 containers. This makes the container existence check unreliable when the plugin is used with a storage account that has more than 5000 containers.
See the Azure API for details: https://docs.microsoft.com/en-us/rest/api/storageservices/list-containers2#uri-parameters
It would be nice if this plugin somehow supported retention. Since it writes append blobs, lifecycle management in the storage account is not applicable.
The last published version is 0.1.1, which is quite old. Please release a new version with the latest fixes included.