Using Logstash @metadata

Introduction

In a previous post, I showed a simple Kafka and Elasticsearch integration, using a single Kafka topic to carry many different types of logs into Elasticsearch.

Have a read if you haven’t already, or want to catch up.

That approach had an undesired side effect: it put attributes into Elasticsearch that are not needed, wasting precious disk space.

Metadata

However, there is a very simple and elegant way to fix this. Have a read of the description of the Logstash @metadata field here.
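To see the behaviour in isolation, here is a minimal sketch (my own illustration, not from the previous article) using the generator input: anything placed under [@metadata] is available to filters and outputs, but is never part of the event that outputs actually ship.

input {
  generator {
    count => 1
    message => "hello"
  }
}

filter {
  mutate {
    # visible throughout the pipeline, but never shipped by outputs
    add_field => { "[@metadata][myapp]" => "default" }
  }
}

output {
  # prints the event; the @metadata object is absent by default
  stdout { codec => rubydebug }
}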

The previous article suggested the approach below. It meant storing kafkatopic, myapp and myrotation in every single document that went through the pipeline.

filter {
    mutate {
        copy => { "[@metadata][kafka][topic]" => "kafkatopic" }
    }

    if ![myapp] {
        mutate {
            add_field => { "myapp" => "default" }
        }
    }

    if ![myrotation] {
        mutate {
            add_field => { "myrotation" => "weekly" }
        }
    }
}

If we convert to using metadata fields, it could look like this instead. No more kafkatopic, myapp or myrotation being stored.

filter {
    if ![myapp] {
        mutate {
            add_field => { "myapp" => "default" }
        }
    }

    if ![myrotation] {
        mutate {
            add_field => { "myrotation" => "weekly" }
        }
    }

    # take advantage of metadata fields
    if [myapp] {
        mutate {
            rename => { "myapp" => "[@metadata][myapp]" }
        }
    }

    if [myrotation] {
        mutate {
            rename => { "myrotation" => "[@metadata][myrotation]" }
        }
    }
}

We can then use these metadata fields in the output section:

output {
    if [@metadata][myrotation] == "rollover" {
        elasticsearch {
            hosts => ["https://elastic01:9200", "https://elastic02:9200"]
            manage_template => false
            index => "%{[@metadata][kafka][topic]}-%{[@metadata][myapp]}-active"
        }
    }

    if [@metadata][myrotation] == "daily" {
        elasticsearch {
            hosts => ["https://elastic01:9200", "https://elastic02:9200"]
            manage_template => false
            index => "%{[@metadata][kafka][topic]}-%{[@metadata][myapp]}-%{+YYYY.MM.dd}"
        }
    }

    if [@metadata][myrotation] == "weekly" {
        elasticsearch {
            hosts => ["https://elastic01:9200", "https://elastic02:9200"]
            manage_template => false
            index => "%{[@metadata][kafka][topic]}-%{[@metadata][myapp]}-%{+xxxx.ww}"
        }
    }
}

Debugging

All outputs automatically remove the @metadata object, so if you are trying to debug your conf file, you need a simple trick to display its contents.

output
{
  # also show contents of metadata object
  stdout { codec => rubydebug { metadata => true } }
}
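With metadata => true, the event printed to stdout now includes the @metadata object, so you can verify that the rename filters worked. The output looks roughly like this (illustrative only; the exact fields and values depend on your pipeline):

{
       "message" => "hello",
     "@metadata" => {
             "myapp" => "default",
        "myrotation" => "weekly"
    }
}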

Conclusion

So by using this approach, we are no longer storing kafkatopic, myapp and myrotation as attributes in every single document that passes through this pipeline.

We save disk space and processing time, and the documents stay clean.
