Introduction
In a previous post, I showed how to set up a simple Kafka and Elasticsearch integration, using a single Kafka topic to carry many different types of logs into Elasticsearch.
Have a read of that post if you want to catch up.
That approach had the undesired side effect of putting attributes into Elasticsearch that are not needed, wasting precious disk space.
Metadata
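However, there is a very simple and elegant way to fix this: Logstash metadata fields. Have a read of the description of metadata fields in the Logstash documentation. Anything stored under [@metadata] can be used in filters, conditionals and field references, but is not part of the event that an output writes.
The previous article suggested the approach below. It meant storing kafkatopic, myapp and myrotation in every single document that went through the pipeline.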
filter {
  mutate {
    copy => { "[@metadata][kafka][topic]" => "kafkatopic" }
  }
  if ![myapp]
  {
    mutate {
      add_field => { "myapp" => "default" }
    }
  }
  if ![myrotation]
  {
    mutate {
      add_field => { "myrotation" => "weekly" }
    }
  }
}
If we convert it to use metadata fields, it could look like this instead. kafkatopic, myapp and myrotation are no longer stored.
filter {
  if ![myapp]
  {
    mutate {
      add_field => { "myapp" => "default" }
    }
  }
  if ![myrotation]
  {
    mutate {
      add_field => { "myrotation" => "weekly" }
    }
  }
  # take advantage of metadata fields
  if [myapp]
  {
    mutate {
      rename => { "myapp" => "[@metadata][myapp]" }
    }
  }
  if [myrotation]
  {
    mutate {
      rename => { "myrotation" => "[@metadata][myrotation]" }
    }
  }
}
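A slightly more compact variation, offered only as a sketch, writes the default straight into @metadata instead of first creating the field on the event and then renaming it. It assumes the same myapp and myrotation field names and the same defaults as above, and it should behave the same way, but try it against your own pipeline before relying on it.

```
filter {
  # myapp: move an existing value into metadata, otherwise set the default there
  if [myapp] {
    mutate { rename => { "myapp" => "[@metadata][myapp]" } }
  } else {
    mutate { add_field => { "[@metadata][myapp]" => "default" } }
  }
  # myrotation: same idea, defaulting to weekly
  if [myrotation] {
    mutate { rename => { "myrotation" => "[@metadata][myrotation]" } }
  } else {
    mutate { add_field => { "[@metadata][myrotation]" => "weekly" } }
  }
}
```

Either way, the two values only ever live under [@metadata] by the time the event reaches the outputs.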
We can then use the new metadata fields in the output section.
output
{
  if [@metadata][myrotation] == "rollover"
  {
    elasticsearch {
      hosts => ["https://elastic01:9200" , "https://elastic02:9200"]
      manage_template => false
      index => "%{[@metadata][kafka][topic]}-%{[@metadata][myapp]}-active"
    }
  }
  if [@metadata][myrotation] == "daily"
  {
    elasticsearch {
      hosts => ["https://elastic01:9200" , "https://elastic02:9200"]
      manage_template => false
      index => "%{[@metadata][kafka][topic]}-%{[@metadata][myapp]}-%{+YYYY.MM.dd}"
    }
  }
  if [@metadata][myrotation] == "weekly"
  {
    elasticsearch {
      hosts => ["https://elastic01:9200" , "https://elastic02:9200"]
      manage_template => false
      # week-based index name: ISO week year and week number
      index => "%{[@metadata][kafka][topic]}-%{[@metadata][myapp]}-%{+xxxx.ww}"
    }
  }
}
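The index names above depend on [@metadata][kafka][topic], which the Kafka input only adds when event decoration is switched on. A minimal sketch of such an input follows. The broker addresses and the topic name are hypothetical, and the json codec is an assumption about how the logs are encoded.

```
input {
  kafka {
    # hypothetical brokers and topic, replace with your own
    bootstrap_servers => "kafka01:9092,kafka02:9092"
    topics => ["logs"]
    # assumption: the messages on the topic are JSON documents
    codec => json
    # adds [@metadata][kafka][topic] (and other Kafka details) to each event;
    # recent plugin versions document the values none, basic and extended,
    # so check the documentation for the version you run
    decorate_events => true
  }
}
```

Without decoration the sprintf reference is not resolved, and the literal text %{[@metadata][kafka][topic]} ends up in the index name.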
Debugging
All outputs automatically remove the @metadata object, so when you are debugging your conf file you need a simple trick to display its contents: turn on the metadata option of the rubydebug codec.
output
{
  # also show contents of metadata object
  stdout { codec => rubydebug { metadata => true } }
}
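To try this without a Kafka cluster, a small throwaway pipeline can feed one hand-written event through the renames and print the result. This is only a sketch: the sample event, the topic value "logs" and the use of the generator input are assumptions made for testing, not part of the real pipeline.

```
input {
  generator {
    # one hypothetical JSON log line, emitted once
    lines => ['{"message": "hello", "myapp": "billing", "myrotation": "daily"}']
    count => 1
    codec => json
  }
}
filter {
  # stand in for the value the Kafka input would normally provide
  mutate { add_field => { "[@metadata][kafka][topic]" => "logs" } }
  # move the routing fields into metadata, as in the real filter
  mutate {
    rename => {
      "myapp" => "[@metadata][myapp]"
      "myrotation" => "[@metadata][myrotation]"
    }
  }
}
output {
  # print the event including the @metadata object
  stdout { codec => rubydebug { metadata => true } }
}
```

Run it with bin/logstash -f pointing at the file. The printed event should show myapp, myrotation and the Kafka topic under @metadata, while message stays a normal top-level field.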
Conclusion
So by using this approach we are no longer storing kafkatopic, myapp and myrotation as attributes in every single document that passes through this pipeline.
We save disk space and processing time, and the documents are clean.