Simple Kafka and Elasticsearch integration

Kafka is an awesome platform for moving data around. It is often used in front of an Elasticsearch cluster, buffering data before it gets ingested into Elasticsearch.

Kafka organizes data into topics, each typically carrying one specific kind of data. Imagine having topics for DNS, DHCP, firewall and so on. You can quickly end up with a high number of topics, right?

So in this blog post, I will present a way to utilize a single Kafka topic to carry many kinds of data, while still being able to ingest into different Elasticsearch indices. It also enables you to specify the rotation of each index: rollover, weekly, daily or whatever your needs may be.

The approach works by creating a few simple properties alongside your data:

  • myapp
  • myrotation
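
To make this concrete, an event on the Kafka topic could end up looking roughly like this. The message and timestamp are made-up values purely for illustration:

{
  "@timestamp": "2019-04-02T10:15:00.000Z",
  "message": "GET /posts/42 200",
  "myapp": "blog-hits",
  "myrotation": "weekly"
}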

Let's use this scenario: you have some kind of log file that contains log data for your app “blog-hits”. The app is low volume in terms of logging, and you just need a weekly index.

You install Filebeat and add these entries to your Filebeat configuration:

  fields:
    myapp: blog-hits
    myrotation: weekly
  fields_under_root: true
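
If you are wondering where those entries go, here is a minimal filebeat.yml sketch. The log path and the Logstash host are hypothetical placeholders for your own setup:

  filebeat.inputs:
    - type: log
      paths:
        - /var/log/blog-hits/*.log    # hypothetical path to your app's log file
      fields:
        myapp: blog-hits
        myrotation: weekly
      fields_under_root: true

  output.logstash:
    hosts: ["logstash01:5044"]        # hypothetical Logstash host doing the parsing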

You would then configure Filebeat to send this to Logstash for further parsing. After parsing, Logstash sends the events to Kafka on a topic called “application-logs”, which you have configured on your Kafka servers.
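That parsing tier could look something like the sketch below. The Beats port and the empty filter are assumptions; fill in your own parsing. The Kafka output uses the json codec so that myapp and myrotation survive the trip through the topic:

input {
  beats {
    port => 5044
  }
}

filter {
  # your parsing goes here, e.g. grok or dissect for the blog-hits log format
}

output {
  kafka {
    bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
    topic_id => "application-logs"
    codec => "json"
  }
}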

If you prefer, you can also add the myapp and myrotation fields in the Logstash pipeline that parses your data. It is just a matter of preference.
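In that case, a mutate filter in the parsing pipeline does the same job as the Filebeat entries above; the values are the ones from our blog-hits scenario:

filter {
  mutate {
    add_field => {
      "myapp"      => "blog-hits"
      "myrotation" => "weekly"
    }
  }
}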

You will then have a Logstash consumer of the topic “application-logs” in a pipeline like this:

input {
  kafka {
    bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
    topics => [ "application-logs" ]
    codec => "json"
    group_id => "tier1"
    decorate_events => true
  }
}

Please notice that I used decorate_events. This is important for the rest of the pipeline, because it makes the Kafka metadata, such as the topic name, available to Logstash under [@metadata][kafka].

Next we will define the filter section:

filter {
  mutate {
    copy => { "[@metadata][kafka][topic]" => "kafkatopic" }
  }

  if ![myapp] {
    mutate {
      add_field => { "myapp" => "default" }
    }
  }

  if ![myrotation] {
    mutate {
      add_field => { "myrotation" => "weekly" }
    }
  }
}

In the filter, we copy the Kafka topic name onto the event and make sure that we have default values for myapp and myrotation. Now we get to the interesting output section:


output {
  if [myrotation] == "rollover" {
    elasticsearch {
      hosts => ["https://elastic01:9200", "https://elastic02:9200"]
      manage_template => false
      index => "%{[kafkatopic]}-%{[myapp]}-active"
    }
  }

  if [myrotation] == "daily" {
    elasticsearch {
      hosts => ["https://elastic01:9200", "https://elastic02:9200"]
      manage_template => false
      index => "%{[kafkatopic]}-%{[myapp]}-%{+YYYY.MM.dd}"
    }
  }

  if [myrotation] == "weekly" {
    elasticsearch {
      hosts => ["https://elastic01:9200", "https://elastic02:9200"]
      manage_template => false
      index => "%{[kafkatopic]}-%{[myapp]}-%{+xxxx.ww}"
    }
  }
}

In the output section, we use the information carried in myapp and myrotation to ingest our logs into an application-specific index with the right rotation. This consumer pipeline is only used to route the data to the correct index.

In this case, data will get stored in a weekly index such as “application-logs-blog-hits-2019.14”.

You can use this simple approach to carry many different kinds of data in a single Kafka topic, while still ingesting each kind into a separate index in Elasticsearch.
