
Recently I was involved in migrating an architecture based on Logstash & Kafka to fluentd, where the goal was to take advantage of Docker's built-in fluentd logging driver and move away from a centralized Logstash instance (and, as a consequence, from Kafka).
One requirement for adopting fluentd was having buffering capabilities that could take over Kafka's role in the original architecture. This was critical, as buffering is key to ensuring that log events are not lost when the destination is unavailable.
The following diagram illustrates an architecture using Logstash and Kafka, where we write to and read from Kafka using the Logstash Kafka output and input plugins, respectively:

The following diagram illustrates the use of fluentd as an alternative to the previous architecture, where we aim to use fluentd as both log processor and buffer:

Scenario

To demonstrate our use-case, let’s assume a scenario in which we have fluentd collecting logs from containers running on the host and forwarding them into an instance of Elasticsearch.
This would be achieved by a fluentd configuration file similar to:
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<match [docker]*>
  @type copy
  <store>
    @type elasticsearch
    host 127.0.0.1
    port 9200
    scheme http
    logstash_format true
    logstash_prefix fluentd-containers
    logstash_dateformat %Y%m%d
    include_tag_key true
    type_name access_log
    tag_key @log_name
    flush_interval 1s
  </store>
  <store>
    @type stdout
  </store>
</match>
The above configuration simply opens a forward input on port 24224, which is where docker will connect to ship container logs.
A match block then captures the docker logs and forwards them to both Elasticsearch and stdout.
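For reference, this is roughly how a container could be pointed at that forward input using Docker's fluentd logging driver. The address, tag and image below are simplified, hypothetical values for illustration; in particular, make sure the tag you pick is actually caught by your <match> pattern:
# Hypothetical example: ship a container's logs to the forward input above.
# Adjust fluentd-address, tag and image to your environment.
docker run -d \
  --log-driver=fluentd \
  --log-opt fluentd-address=127.0.0.1:24224 \
  --log-opt tag=docker \
  nginx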
With the service running, if we inspect the fluentd logs we can see that a connection to Elasticsearch is opened and container logs are forwarded to it.
[kelson@localhost td-agent]# tail -f /var/log/td-agent/td-agent.log
2018-02-26 15:50:15 +0000 [info]: #0 Connection opened to Elasticsearch cluster => {:host=>"127.0.0.1", :port=>9200, :scheme=>"http"}
2018-02-26 15:50:43.000000000 +0000 [docker][26e21001ecee][elksinglecompose_kibana_1][elksinglecompose_kibana]: {"container_id":"26e21001eceedef506bf7f7e5e4bff915ccd5f6875945cceec617fddd216d98e","container_name":"/elksinglecompose_kibana_1"
2018-02-26 15:50:44.000000000 +0000 [docker][26e21001ecee][elksinglecompose_kibana_1][elksinglecompose_kibana]: {"container_id":"26e21001eceedef506bf7f7e5e4bff915ccd5f6875945cceec617fddd216d98e","container_name":"/elksinglecompose_kibana_1",
2018-02-26 15:50:47.000000000 +0000 [docker][26e21001ecee][elksinglecompose_kibana_1][elksinglecompose_kibana]: {"log":"{\"type\":\"response\",\"@timestamp\":\"2018-02-26T15:50:47Z\",\"tags\":[],\"pid\":1,\"method\":\"post\",\"statusCode\":200,\"req\":{\"url\":\"/elasticsearch/_msearch\",\"method\":\"post\",\"headers\":{\"host\":\"127.0.0.1:5601\
2018-02-26 15:50:57.000000000 +0000 [docker][26e21001ecee][elksinglecompose_kibana_1][elksinglecompose_kibana]: {"container_id":"26e21001eceedef506bf7f7e5e4bff915ccd5f6875945cceec617fddd216d98e","container_name":"/elksinglecompose_kibana_1","source":"stdout","log":"{\"type\":\"response\",
2018-02-26 15:50:57.000000000 +0000 [docker][26e21001ecee][elksinglecompose_kibana_1][elksinglecompose_kibana]: {"container_id":"26e21001eceedef506bf7f7e5e4bff915ccd5f6875945cceec617fddd216d98e","container_name":"/elksinglecompose_kibana_1","source":"stdout","log":"{\"type\":\"response\",
Now, let’s assume that our Elasticsearch destination becomes unavailable. Inspecting the fluentd logs while the destination is unreachable, we notice the following:
[root@localhost]# tail -f /var/log/td-agent/td-agent.log
2018-02-26 15:56:39 +0000 [warn]: #0 failed to flush the buffer. retry_time=3 next_retry_seconds=2018-02-26 15:56:39 +0000 chunk="5661f8d9db5d24b92abd8efa7da123c9" error_class=Fluent::Plugin::ElasticsearchOutput::ConnectionFailure error="Can not reach Elasticsearch cluster ({:host=>\"127.0.0.1\", :port=>9200, :scheme=>\"http\"})!"
2018-02-26 15:56:39 +0000 [warn]: #0 suppressed same stacktrace
2018-02-26 15:56:46 +0000 [warn]: #0 failed to flush the buffer. retry_time=4 next_retry_seconds=2018-02-26 15:56:46 +0000 chunk="5661f8d9db5d24b92abd8efa7da123c9" error_class=Fluent::Plugin::ElasticsearchOutput::ConnectionFailure error="Can not reach Elasticsearch cluster ({:host=>\"127.0.0.1\", :port=>9200, :scheme=>\"http\"})!"
Note that, as expected, fluentd’s connection to Elasticsearch fails and log events start to be buffered in memory while fluentd constantly retries to restore the connection.
This behavior can be dangerous: if the connection is not restored quickly enough, we may start to lose log events as they get dropped from the memory buffer.

Enabling fluentd file buffering

Let’s now see how fluentd’s file buffering capabilities can help in our scenario, where, to avoid incidents, we want full control over how much data to buffer and for how long.
In a nutshell, the fluentd buffer can be used by any output plugin and, as the official documentation describes, it is essentially a set of “chunks” of data, each composed of a collection of records concatenated into a single blob. These chunks are constantly flushed into the output queue and finally to the destination, which in our scenario is Elasticsearch.
Let’s examine a buffer configuration that fits our example, line by line:
buffer_type file
buffer_path /var/log/td-agent/buffer/docker-containers/
buffer_chunk_limit 16m
buffer_queue_limit 512
buffer_queue_full_action drop_oldest_chunk
retry_wait 15.0
First, we define the service to buffer to file (memory is the default option).
Next, we define a path for our buffer. Here it is important to understand that each output match block requires its own buffer configuration, and each buffer_path must be unique.
Next, we define the chunk limit. As previously mentioned, events are grouped together into chunks, and a new chunk is created once the size defined here is reached. This parameter defaults to 8MB.
Next, we define the queue limit, which is the maximum number of chunks that are allowed to be queued. This parameter is important, as letting the queue grow indefinitely may cause disk usage to get out of control.
Next, we define the queue full action, which is the action to take once the queue limit set previously is exceeded. The drop_oldest_chunk action drops the oldest chunks to make room for new ones.
Finally, we set retry_wait, which is the amount of time fluentd will wait before retrying to re-establish a connection with the destination (Elasticsearch in our case). It is important to note that the retry interval is doubled after each unsuccessful attempt.
As a final piece of information: since we defined each chunk to hold 16MB and allowed a limit of 512 chunks, we are talking about storing up to 8GB worth of data before fluentd starts to drop log events. You can tune these parameters to best suit your requirements while taking into consideration the resources at your disposal.

Based on our buffer configuration, the full configuration file snippet would be the following:

<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<match [docker]*>
  @type copy
  <store>
    @type elasticsearch
    host 127.0.0.1
    port 9200
    scheme http
    logstash_format true
    logstash_prefix fluentd-containers
    logstash_dateformat %Y%m%d
    include_tag_key true
    type_name access_log
    tag_key @log_name
    flush_interval 1s

    buffer_type file
    buffer_path /var/log/td-agent/buffer/docker-containers/
    buffer_chunk_limit 16m
    buffer_queue_limit 512
    buffer_queue_full_action drop_oldest_chunk
    retry_wait 15.0
  </store>
  <store>
    @type stdout
  </store>
</match>
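As a side note, the snippet above uses the older flat buffer parameters (td-agent 2 / fluentd v0.12 style). If you run fluentd v1.x (td-agent 3 or newer), the same intent is expressed with a nested <buffer> section inside the elasticsearch <store>; a rough, untested equivalent of our settings would look something like:
# v1.x-style buffer section (sketch only), replacing the flat buffer_* parameters
<buffer>
  @type file
  path /var/log/td-agent/buffer/docker-containers
  chunk_limit_size 16m
  queue_limit_length 512
  overflow_action drop_oldest_chunk
  flush_interval 1s
  retry_wait 15s
</buffer>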

Testing fluentd’s file buffer configuration

Now that we have successfully defined our buffer configuration, let’s test it.
First, let’s restart the td-agent service.
[kelson@localhost td-agent]# service td-agent restart
As soon as fluentd is restarted, let’s examine the buffer_path directory, which by now should have been created by the service.
[kelson@localhost td-agent]# ls -l /var/log/td-agent/
total 40
drwxr-xr-x. 4 td-agent td-agent    41 Feb 27 13:48 buffer
-rw-r-----. 1 td-agent td-agent  4738 Feb 27 13:48 td-agent.log
Note that, as expected, we now have an empty buffer directory, which the service created to host our chunks of data.
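To put the buffer to work, we need to take the destination away. How you do that depends on how Elasticsearch runs in your environment; in this setup it appears to run under docker-compose, so stopping its container is one way to simulate the outage (the container name below is a guess, check docker ps for the real one):
# Hypothetical: stop the Elasticsearch container to simulate an outage.
docker stop elksinglecompose_elasticsearch_1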
Now, as soon as the Elasticsearch destination becomes unreachable, our buffer kicks into action. This can be seen by tailing the fluentd service log.
[kelson@localhost td-agent]# tail -f td-agent.log
2018-02-27 13:57:10 +0000 [warn]: #0 failed to flush the buffer. retry_time=6 next_retry_seconds=2018-02-27 13:57:10 +0000 chunk="56631fcdf8e911fb27d657beb1ca37fb" error_class=Fluent::Plugin::ElasticsearchOutput::ConnectionFailure error="Can not reach Elasticsearch cluster ({:host=>\"127.0.0.1\", :port=>9200, :scheme=>\"http\"})!"
2018-02-27 13:57:10 +0000 [warn]: #0 suppressed same stacktrace
2018-02-27 13:58:14 +0000 [warn]: #0 failed to flush the buffer. retry_time=7 next_retry_seconds=2018-02-27 13:58:14 +0000 chunk="56631fcdf8e911fb27d657beb1ca37fb" error_class=Fluent::Plugin::ElasticsearchOutput::ConnectionFailure error="Can not reach Elasticsearch cluster ({:host=>\"127.0.0.1\", :port=>9200, :scheme=>\"http\"})!"

We can see that fluentd is again buffering the events while continuously retrying to reestablish the connection. This time, however, we can also inspect our buffer path, where we expect to find the buffered events on disk, since we are no longer buffering to memory (the default behavior).

[kelson@localhost td-agent]# ls -l /var/log/td-agent/buffer/docker-containers/
total 1576
-rw-r--r--. 1 td-agent td-agent 1076 Feb 27 13:56 buffer.q56631fcdf8e911fb27d657beb1ca37fb.log
-rw-r--r--. 1 td-agent td-agent  157 Feb 27 13:56 buffer.q56631fcdf8e911fb27d657beb1ca37fb.log.meta
-rw-r--r--. 1 td-agent td-agent 2904 Feb 27 13:56 buffer.q56631fd04b32f65712eb122b7bd00302.log
-rw-r--r--. 1 td-agent td-agent  139 Feb 27 13:56 buffer.q56631fd04b32f65712eb122b7bd00302.log.meta
-rw-r--r--. 1 td-agent td-agent  666 Feb 27 13:56 buffer.q56631fd316dad854c4bc983ac803372d.log
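Once we are done testing, we can bring the destination back (again using the hypothetical container name from before):
# Hypothetical: start Elasticsearch again so fluentd can flush its buffered chunks.
docker start elksinglecompose_elasticsearch_1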
Finally, as soon as the destination becomes available, the service will reestablish the connection and forward all buffered events, freeing our buffer_path of any content.
This can be confirmed by inspecting both the fluentd log and the buffer_path directory.
[kelson@localhost elk_single_compose]# tail -f /var/log/td-agent/td-agent.log
2018-02-27 14:14:07 +0000 [info]: #0 Connection opened to Elasticsearch cluster => {:host=>"127.0.0.1", :port=>9200, :scheme=>"http"}
2018-02-27 14:14:07 +0000 [warn]: #0 retry succeeded. chunk_id="56631fcdf8e911fb27d657beb1ca37fb"
Note that all our buffered events were sent and our buffer_path directory was emptied by the service.
[kelson@localhost elk_single_compose]# ls -l /var/log/td-agent/buffer/docker-containers/
total 0

Conclusion

If you work with fluentd or any other data collection service, buffering is something that definitely has to be explored, as ensuring that all collected events reach their destination is a crucial requirement for any logging architecture. That being said, this post aimed to provide a simple scenario where fluentd’s file buffering feature is used to hold our precious data while the end destination is unavailable.
For more details on fluentd’s buffering capabilities, visit the official documentation for the feature.
Stay tuned.

