When working with Logstash, chances are you will someday need to transform or replace data in some way before sending it to its final destination.
With that in mind, this post presents the translate filter plugin, a general search-and-replace tool that uses a configured hash and/or a file to determine replacement values.

Scenario – understanding the logstash-filter-translate plugin

To understand the translate plugin, consider this scenario:
Your Logstash pipeline is receiving logs over TCP in the following format:
{Datestamp} {ServiceCode} {Message}
The {ServiceCode} field contains a unique identifier (UID) for a particular service, such as “nginx” or “kafka”.
Log Sample:
03/07/2017 17:11:40 EZfAH1yD service stopped
03/08/2017 13:00:02 EZfAH1yD service started
03/08/2017 13:08:20 ddvgZOQL service restarted
Ideally, the sender would ship the actual service name instead of the UID, but for the sake of the example, the sender is only able to ship the log with its UID.
The sender has, however, provided us with a dictionary of every {ServiceCode} and its {ServiceName} representation, in the hope that we can perform some sort of real-time processing to add the {ServiceName} field to the stream.
Here is the provided dictionary:
EZfAH1yD => "nginx"
ddvgZOQL => "rsyslog"
InCCvabi => "firewalld"
9PAMjJBx => "zookeeper"
wGbeORZN => "kafka"
SujIotpo => "mysql"

Solution – using the logstash-filter-translate plugin

Given the scenario presented in the introduction, our job is to come up with a solution that makes use of the dictionary to inject the service name into each log event.
My question is: how will we do that? Any guess?
Yes, you guessed it right. For our requirement, we will make use of the logstash-filter-translate plugin.
So, let’s get our hands dirty.
Here is a Logstash configuration that receives and parses the original log format from the example above:
input {
    tcp {
        port => 9000
    }
}
filter {
    grok {
        match => {
            "message" => "%{WORD:service} %{GREEDYDATA}"
        }
    }
}
output {
    elasticsearch {
        index => "logstash-services-%{+YYYY-MM-dd}"
    }
}
In our example configuration, we receive logs over a TCP connection on port 9000.
We also have a simple grok filter that parses our log format, extracting the first word of the message into a service field.
Finally, we forward the events to the logstash-services index in Elasticsearch.
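While testing, it can also be handy to print each event to the console. Here is a minimal sketch of the output section with a temporary stdout output added alongside Elasticsearch (rubydebug is the standard pretty-printing codec):
output {
    elasticsearch {
        index => "logstash-services-%{+YYYY-MM-dd}"
    }
    # Temporary debugging output: pretty-prints every event to the console
    stdout {
        codec => rubydebug
    }
}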
So now let’s test our configuration:
1) Start Logstash.
service logstash start
This will make Logstash listen on port 9000 for incoming traffic.
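Alternatively, if you prefer to run Logstash in the foreground while testing, you can point the binary directly at a configuration file. A sketch (the path is only an assumption; adjust it to wherever your configuration lives):
bin/logstash -f /etc/logstash/conf.d/services.conf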
2) In another terminal, open a connection to Logstash with netcat and send an event:
netcat localhost 9000
EZfAH1yD service stopped
3) Now let’s query Elasticsearch to view the last event:
curl -H 'Content-Type: application/json' 'localhost:9200/logstash-services-*/_search?pretty' -d '{
    "size": 1,
    "sort": {"@timestamp": "desc"}
}'
The expected result is:

{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 20,
    "successful" : 20,
    "failed" : 0
  },
  "hits" : {
    "total" : 25,
    "max_score" : null,
    "hits" : [
      {
        "_index" : "logstash-services-2017-03-10",
        "_type" : "logs",
        "_id" : "AVq4JEuEVyTgmQZUBH-T",
        "_score" : null,
        "_source" : {
          "@timestamp" : "2017-03-10T12:14:30.685Z",
          "port" : 44150,
          "service" : "EZfAH1yD",
          "@version" : "1",
          "host" : "127.0.0.1",
          "message" : "EZfAH1yD service stopped"
        },
        "sort" : [
          1489148070685
        ]
      }
    ]
  }
}
Now, to achieve our goal of translating the {ServiceCode} into a {ServiceName} as soon as the event reaches our Logstash input, let’s install the logstash-filter-translate plugin.
This is required because the translate plugin does not ship with Logstash by default, so to install it, perform the following:
bin/logstash-plugin install logstash-filter-translate
After that, the logstash-plugin utility will download logstash-filter-translate from the plugin repository and install it on your Logstash instance.
To confirm the installation, query your installed plugins, searching for translate:
bin/logstash-plugin list | grep translate
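If the installation succeeded, the plugin name should show up in the output:
logstash-filter-translate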
Now, let’s alter our Logstash configuration to make use of the translate filter.
input {
    tcp {
        port => 9000
    }
}
filter {
    grok {
        match => {
            "message" => "%{WORD:service} %{GREEDYDATA}"
        }
    }
    translate {
        field => "service"
        destination => "service_name"
        override => false
        dictionary_path => "/etc/logstash/translate_dictionary.yml"
    }
}
output {
    elasticsearch {
        index => "logstash-services-%{+YYYY-MM-dd}"
    }
}
The difference here is the new translate block, composed of:
translate {
    field => "service"
    destination => "service_name"
    override => false
    dictionary_path => "/etc/logstash/translate_dictionary.yml"
}
field – the source field whose value we want to translate.
destination – the field in which to store the translated value.
override – whether an existing destination field may be overwritten. In our case, we want the translation to be added as a new field, thus the value false.
dictionary_path – the location of our dictionary file on the filesystem (for tiny dictionaries, an inline alternative is sketched after this list).
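For a handful of mappings, the plugin can also take the dictionary inline instead of from a file. A minimal sketch, assuming the 3.0.x series of the plugin (which takes the dictionary as a flat array of key/value pairs; more recent versions accept a hash syntax instead):
translate {
    field => "service"
    destination => "service_name"
    # Inline dictionary: flattened key, value, key, value, ... pairs
    dictionary => [ "EZfAH1yD", "nginx",
                    "ddvgZOQL", "rsyslog" ]
}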
Before testing our new configuration, one final piece of the puzzle is missing: the dictionary file.
In this example we will use a YML file, so, as defined in our translate block, create /etc/logstash/translate_dictionary.yml with the following content, based on our data:
"EZfAH1yD": "nginx"
"ddvgZOQL": "rsyslog"
"InCCvabi": "firewalld"
"9PAMjJBx": "zookeeper"
"wGbeORZN": "kafka"
"SujIotpo": "mysql"
Now let’s make our final test. After restarting Logstash, open a new connection with netcat and send a new event:
netcat localhost 9000
EZfAH1yD service started
Now, let’s query Elasticsearch again to check the result:
curl -H 'Content-Type: application/json' 'localhost:9200/logstash-services-*/_search?pretty' -d '{
    "size": 1,
    "sort": {"@timestamp": "desc"}
}'
The expected result is:
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 20,
    "successful" : 20,
    "failed" : 0
  },
  "hits" : {
    "total" : 26,
    "max_score" : null,
    "hits" : [
      {
        "_index" : "logstash-services-2017-03-10",
        "_type" : "logs",
        "_id" : "AVq4JXKEVyTgmQZUBH-V",
        "_score" : null,
        "_source" : {
          "@timestamp" : "2017-03-10T12:15:46.193Z",
          "port" : 44156,
          "service" : "EZfAH1yD",
          "service_name" : "nginx",
          "@version" : "1",
          "host" : "127.0.0.1",
          "message" : "EZfAH1yD service started"
        },
        "sort" : [
          1489148146193
        ]
      }
    ]
  }
}
Great! As you can see, we now have a new field called “service_name”, which holds the translation for our service.
From here, you can play around and extend this functionality. The plugin supports quite a few other configuration options, one of which is sketched below; all the details can be found in the plugin’s official documentation.
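For instance, here is a minimal sketch of the fallback option, which provides a default value for the destination field whenever the dictionary has no match (the "unknown-service" value is just an assumption for illustration):
translate {
    field => "service"
    destination => "service_name"
    dictionary_path => "/etc/logstash/translate_dictionary.yml"
    # Written to service_name when the UID is not found in the dictionary
    fallback => "unknown-service"
}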

Final considerations

As a general recommendation, the translate plugin pays off when you have a large list of dictionary values. For a small set of replacements, you might consider using the gsub option of the mutate filter instead, as sketched below.
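A minimal sketch of that alternative, reusing our running example (note that gsub rewrites the value in place rather than adding a new field):
filter {
    mutate {
        # gsub takes [field, pattern, replacement] triples
        gsub => [ "service", "EZfAH1yD", "nginx" ]
    }
}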
In this example, we used a YML file. Alternatively, the translate filter also supports the JSON and CSV file formats, which it selects based on the file extension; equivalent dictionaries are sketched below.
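Here is what the same dictionary could look like in those formats (a sketch; the file names translate_dictionary.json and translate_dictionary.csv are just assumptions):
{
  "EZfAH1yD": "nginx",
  "ddvgZOQL": "rsyslog",
  "InCCvabi": "firewalld"
}
And in CSV, one key/value pair per line:
EZfAH1yD,nginx
ddvgZOQL,rsyslog
InCCvabi,firewalld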
Finally, if your dictionary grows dynamically over time, you can make use of the refresh_interval attribute to make Logstash re-read the dictionary file more often. By default, the interval is set to 300 seconds.
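A sketch of the option in our translate block, assuming we want the dictionary checked every 60 seconds:
translate {
    field => "service"
    destination => "service_name"
    dictionary_path => "/etc/logstash/translate_dictionary.yml"
    # Check the dictionary file for changes every 60 seconds (default: 300)
    refresh_interval => 60
}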
I hope this article was useful; leave a comment if you have any thoughts on the subject =).
