Simplifying Logstash by adding complexity

Background

A lot of the logs that go into Logstash arrive over the Beats protocol, so you will typically have a pipeline in Logstash listening for Beats on port 5044. This could be data coming from Filebeat, Winlogbeat, Metricbeat or Heartbeat.

In your Logstash filter section, you will over time end up with a huge mess, trying to add the relevant parsing of logs inside a bunch of if statements. In the output section, you could see the same mess again, where you output the different types of logs inside another bunch of if statements.

If you have done something like this, your configuration becomes increasingly difficult to read and debug. Not to mention the problems you will face if multiple people need to be able to contribute to the Logstash configuration, or if you need to move the parsing of a specific type to another Logstash node: then you have to grab the relevant parts by copy/paste, which is error-prone.

input {
  beats {
    port => 5044
  }
}

filter {
  if [type] == "winlogbeat" {
    #enrich winlogbeat
    ....
  }
  if [type] == "heartbeat" {
    #enrich heartbeat
    ....
  }
  if [type] == "mylogfile" {
    #enrich mylogfile
    ....
  }
  if [type] == "dns" {
    #enrich dns
    ....
  }
  if [type] == "dhcp" {
    #enrich dhcp
    ....
  }
}

output {
  if [type] == "winlogbeat" {
    #output winlogbeat
    ....
  }
  if [type] == "heartbeat" {
    #output heartbeat
    ....
  }
  if [type] == "mylogfile" {
    #output mylogfile
    ....
  }
  if [type] == "dns" {
    #output dns
    ....
  }
  if [type] == "dhcp" {
    #output dhcp
    ....
  }
}

Simplifying

So what can you do about this problem, you may ask. Earlier, people handled this with named conf files that Logstash would pick up and merge into one large configuration. However, we want to be modern and use the new features made available by Elastic.

Pipeline to pipeline

I read about the pipeline-to-pipeline feature in Logstash a long time ago. There is an excellent article about the options here. The feature is now generally available as of 7.4.

It’s actually very simple to implement. You create a pipeline to receive the Beats input and then distribute the events to small, tailor-made pipelines.

input {
  beats {
    port => 5044
  }
}

filter {
}

output {
  if [type] == "dns" {
    pipeline { send_to => dns }
  } else if [type] == "dhcp" {
    pipeline { send_to => dhcp }
  } else if [type] == "mylogfile" {
    pipeline { send_to => mylogfile }
  } else {
    pipeline { send_to => fallback }
  }
}

Then create a new pipeline to handle each specific log type. The pipeline below is restricted to parsing DNS logs.

input {
  pipeline { address => dns }
}

filter {
   # do only your parsing of DNS logs
}

output {
  # output dns
}

You must remember to add all your pipelines to your pipelines.yml file, and remember to think about whether you need an in-memory queue or a persisted queue per pipeline.

- pipeline.id: beats-input
  path.config: "/etc/path/to/beats-input.config"
  pipeline.workers: 3
- pipeline.id: dns
  path.config: "/etc/different/path/dns.cfg"
  queue.type: persisted
  queue.max_bytes: 4gb
- pipeline.id: dhcp
  path.config: "/etc/different/path/dhcp.cfg"
  queue.type: persisted
  queue.max_bytes: 1gb
- pipeline.id: mylogfile
  path.config: "/etc/different/path/mylogfile.cfg"
  queue.type: persisted
  queue.max_bytes: 2gb
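
For completeness, the fallback address used in the distributor also needs a pipeline of its own, plus an entry in pipelines.yml. A minimal sketch, where the path and queue sizing are placeholders you should adapt:

input {
  pipeline { address => fallback }
}

filter {
  # no parsing here; this only catches types we have not built a dedicated pipeline for yet
}

output {
  # output fallback, e.g. to a catch-all index
}

And the matching pipelines.yml entry:

- pipeline.id: fallback
  path.config: "/etc/different/path/fallback.cfg"
  queue.type: persisted
  queue.max_bytes: 1gb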

Conclusion

We have started using this approach and will continue with it going forward. We get a much simpler way of handling many different types inside Logstash, and we are able to distribute the work to more people.

On top of this, we are seeing better latency in Logstash. I suggest reading this article while you are at it; with this approach you are effectively using parallel pipelines, as the article suggests.

As always, use this approach if you find it applicable to your use case.

Watching for no data

Introduction

So you are sending data to your Elasticsearch cluster with some beat, e.g. Filebeat. But as everyone knows, things go wrong, stuff breaks. You are trying to be proactive and watch for things breaking, so why not let Elasticsearch monitor for missing data with a watcher? You go searching for some examples and, pretty sure, you will end up at this repo: https://github.com/elastic/examples

The examples repo

This repo provides examples of how to do various things with your shiny Elasticsearch setup. And if you look in the alerting category, you will find a recipe called system fails to provide data. Oh yeah…

Looks pretty useful. Basically you are setting up a watcher that searches an index for the hosts seen in the last 24 hours and for the hosts seen in the last hour. However, there is a catch: the sample doesn’t provide any example of how to compute the delta. You just end up with two lists that you have little use for 😉
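
To make the rest of this post concrete, here is a minimal sketch of what such a watcher input could look like. This is not the exact code from the repo; the index pattern (metricbeat-*), the host.name field and the time ranges are assumptions you will need to adapt. The important part is that the payload ends up with a periods aggregation containing a history bucket and a last_period bucket, each holding a hosts terms aggregation.

{
  "trigger": { "schedule": { "interval": "1h" } },
  "input": {
    "search": {
      "request": {
        "indices": [ "metricbeat-*" ],
        "body": {
          "size": 0,
          "query": { "range": { "@timestamp": { "gte": "now-24h" } } },
          "aggs": {
            "periods": {
              "filters": {
                "filters": {
                  "history":     { "range": { "@timestamp": { "gte": "now-24h", "lt": "now-1h" } } },
                  "last_period": { "range": { "@timestamp": { "gte": "now-1h" } } }
                }
              },
              "aggs": {
                "hosts": { "terms": { "field": "host.name", "size": 1000 } }
              }
            }
          }
        }
      }
    }
  }
}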

The revised sample

Every chance I get, when I talk to my friends at Elastic, I tell them the Watcher is too hard to use. Make it simpler, please. And they smile and say, “we know” 🙂

So back to the problem.

You have to do some very funky-looking Painless scripting to find the delta of the two lists we started out with. You do this by means of a transform.

This is how the transform section looks in the repo. It is basically empty, so there will be no transform going on.

  "actions": {
    "log": {
      "transform": {
      "script": {
        "id":"transform"
      }
    },
      "logging": {
        "text": "Systems not responding in the last {{ctx.metadata.last_period}} minutes:{{#ctx.payload._value}}{{.}}:{{/ctx.payload._value}}"
      }
    }
  }

So this is my attempt at fixing the problem. Don’t get scared, it is not as bad as it looks. Just add it to the watcher.

  "transform": {
    "script": {
      "source": "def last_period = ctx.payload.aggregations.periods.buckets.last_period.hosts.buckets.stream().map(p -> p.key ).collect(Collectors.toList());def history = ctx.payload.aggregations.periods.buckets.history.hosts.buckets.stream().map(e -> e.key ).filter(p -> !last_period.contains(p)).map(p -> [ 'hostname':   p]).collect(Collectors.toList());return  history;",
      "lang": "painless"
    }
  }

The source code laid out in a more readable format. Multiline Painless scripts in the Watcher UI, please, Elastic 😀

def last_period = ctx.payload.aggregations.periods.buckets.last_period.hosts.buckets
  .stream()
  .map(p -> p.key)
  .collect(Collectors.toList());

def history = ctx.payload.aggregations.periods.buckets.history.hosts.buckets
  .stream()
  .map(e -> e.key)
  .filter(p -> !last_period.contains(p))
  .map(p -> ['hostname': p])
  .collect(Collectors.toList());

return history;

That code will produce a nice list of the hosts that haven’t delivered data in the last period.

To use the list in the actions section, you do something like this. Notice the condition in there as well, to prevent the watcher from going off and sending emails when everything is working:

  "actions": {
    "log": {
      "condition": {
        "compare": {
          "ctx.payload._value.0": {
            "not_eq": null
          }
        }
      },
      "email": {
        "profile": "standard",
        "to": [
          "whoever@whatever.com",
        ],
        "subject": "oh no , data missing",
        "body": {
          "html": "<h1>Systems not delivering data in the last {{ctx.metadata.last_period}} perid</h1>  <ul> {{#ctx.payload._value}}<li>{{hostname}}</li>{{/ctx.payload._value}}</ul>"
        }
      }
    }
  },

Conclusion

As usual, there are more ways to achieve the same thing. You could probably do an extremely complex search as well. But if you add these two sections to your watcher, you are good to go.

Fault tolerant Elasticsearch

Introduction

By default your Elasticsearch cluster is pretty robust. Typically you would go for a design with one primary shard and one replica shard per index. You could have multiple datacenters with low network latency and have the cluster operating in both centers at once. You could also have two racks with nodes.

But what happens if you lose one datacenter or one rack? Your cluster will likely go RED if you don’t plan for it up front.

Shard allocation awareness

There are multiple ways to design around a disaster. But one thing you surely need to be aware of is a feature called Shard Allocation Awareness.

You can read the documentation from Elastic here.

Basically this feature lets your Elasticsearch cluster know about your physical topology. This enables Elasticsearch to be smart enough to put your primary shards and replica shards into two different zones. A zone can be a datacenter or a rack, as mentioned before.

You tell Elasticsearch this by adding node attributes to your config file. In this example we will add a node attribute called datacenter to our elasticsearch.yml file. It will have two possible values: dc1 and dc2.

node.attr.datacenter: dc1

Once you have added this attribute to all your nodes, you need to perform a rolling cluster restart for the attribute value to be read.

Afterwards you need to enable the feature.

PUT _cluster/settings
{
  "persistent" : {
   "cluster.routing.allocation.awareness.attributes": "datacenter"
  }
}

Shortly thereafter you will notice some shard activity going on in the cluster, as the master rearranges your shards according to your topology. When the dust settles, you can rest assured that your indices are present in both datacenters.
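
A quick way to check is to use the _cat APIs from Kibana Dev Tools. The index name below is just a placeholder for one of your own indices:

# list the attribute value each node has picked up
GET _cat/nodeattrs?v

# check that primaries and replicas ended up in different datacenters
GET _cat/shards/dns-2019.10.01?v&h=index,shard,prirep,state,node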

Forced shard allocation awareness

This all sounds good, but there is a problem. Suppose you lose a datacenter (dc1) now. The cluster will do its best to recover, so it will begin promoting all replica shards in DC2 to primaries and then start to create new replicas, also in DC2. This means that you need to have double the disk space available in either datacenter.

If you don’t have the luxury of double the disk space everywhere, then you should be aware of forced shard allocation awareness.

Here is how you enable it; notice that you now also specify the possible values of the datacenter attribute.

PUT _cluster/settings
{
  "persistent" : {
    "cluster.routing.allocation.awareness.attributes": "datacenter",
    "cluster.routing.allocation.awareness.force.datacenter.values": "dc1,dc2"
  }
}

When you do this, Elasticsearch knows that you intend to have your indices available on nodes tagged with these values. So when you lose all nodes in DC1, Elasticsearch is not going to try to recover everything into DC2. When this happens, you will see the cluster go yellow with 50% of your shards missing, but the cluster will be available and operate as before. When DC1 becomes available again, Elasticsearch will start to recover as normal.

Additional benefits

This feature will do more for you than just help out in case of disaster. It can also help you when you need to do a rolling cluster restart, a rolling cluster upgrade or simple base OS patching.

Normally when you do a rolling upgrade, you need to do it node by node. This is cumbersome and takes time. With forced shard allocation awareness, you can take e.g. 50% of your warm nodes out of service, patch them or change their config and bring them back online. So you get much faster maintenance on your cluster.

Summary

This setup is not for everyone. If you are really paranoid and have enough resources, you could also make your clusters available in multiple places and use cross-cluster replication (CCR) as your recovery plan. Examine your options and choose what fits you best.

Index Lifecycle Management – ILM

Introduction

When you are operating Elasticsearch clusters, you will probably need some way of managing the lifetime of your indices. You may also need a way of handling migration from hot to warm nodes. Previously this was typically handled by a separate tool from Elastic called Curator.

We have been running with Curator handling our indices for a long time, but we feel handling indices should be built-in functionality. We have had some scripts running in three steps as a cron job:

  • rollover – runs every 15 min
  • migration – runs every hour
  • deletion – runs once per day

The configuration files for Curator could prove to be a little bit of a pain.

But when Elastic released 6.7.0, the new Index Lifecycle Management became a built-in feature, making our daily administration simpler. No longer do you need an external cron job running Curator; all ILM tasks can be done from within Kibana.

Migration from curator

So this is all good. But what do you actually need to do in order to migrate away from Curator and start using ILM?

You don’t have to migrate everything in a big-bang approach; just move one thing at a time. Most of our big indices use the rollover approach, so that will be the focus here.

We came up with these steps in order to do the migration for a specific group of indices.

  • Create ILM Policies
    • Create one for rollover and one for already rolled over indices
  • Apply policy for already rolled over indices
  • Attach rollover policy to template
  • Perform rollover
  • Remove curator support for current group of indices

Create ILM Policies

It’s straightforward to create a policy: just fire up Kibana, navigate to Management, Index Lifecycle Policies, and click Create policy.

So in this example we create two policies:

  • dns-standard
  • dns-standard-rollover

We create a set of two policies per group of indices. If everything you have uses the same periods for hot/warm/deletion, you only need two policies for everything. But in order to allow for customization per group of indices, we create policies per group.

The dns-standard policy is used for existing indices, so it will not contain a rollover phase.

Edit the dns-standard policy again, click the save as new option at the top, enable the rollover phase and save it as dns-standard-rollover.
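
If you prefer the API over the UI, the rollover policy can also be created from Kibana Dev Tools. The sketch below is only an illustration; the rollover thresholds, the warm/delete timings and the warm node attribute are placeholders you need to replace with the values from your old Curator jobs:

PUT _ilm/policy/dns-standard-rollover
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": { "max_size": "50gb", "max_age": "1d" }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "allocate": { "require": { "data": "warm" } }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": { "delete": {} }
      }
    }
  }
}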

Now please verify that the settings in the ILM policies match the configuration from your old Curator jobs 😉

Apply policy

In order to apply a policy to a set of existing indices, you need to go to Kibana Dev Tools and set the index.lifecycle.name setting on those indices, along the lines of the sketch below.
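
A minimal sketch, assuming your existing indices match the pattern dns-* (adjust the pattern and the policy name to your own naming):

PUT dns-*/_settings
{
  "index.lifecycle.name": "dns-standard"
}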

Once you run that, your current DNS indices will be handled by ILM.

Attach rollover policy

In order to attach the rollover policy to a template, click Actions for the rollover policy in Kibana. You will be asked to select a template name, and you need to enter the write alias for the rollover index.
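
The end result on the template looks roughly like the sketch below; the template name, index pattern and write alias (dns-write) are just examples:

PUT _template/dns
{
  "index_patterns": [ "dns-*" ],
  "settings": {
    "index.lifecycle.name": "dns-standard-rollover",
    "index.lifecycle.rollover_alias": "dns-write"
  }
}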

Perform rollover

The next step is to do a manual rollover of your DNS alias. You do that from Kibana Dev Tools as well, along the lines of the sketch below.
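
Again assuming the write alias is called dns-write:

POST dns-write/_rollover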

Now all DNS indices will be handled by ILM.

Remove curator support

Go to your Curator server and remove all references to the current set of indices.

Verify your work

Once you have completed the steps, you can see in ILM that everything is working. You can see how many indices are handled by each policy. If you monitor this on a daily basis, you will eventually see when the dns-standard policy is no longer needed (linked-policies=0). This will happen according to your retention settings.
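
You can also check a single group of indices from Dev Tools with the ILM explain API, again assuming the dns-* pattern:

GET dns-*/_ilm/explain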

Conclusion

Please try out ILM in your test environment first to get familiar with the steps. Or learn the hard way 😉

The latest version of Curator is ILM aware, so it will not touch indices that are managed by ILM.

One thing to be aware of is that ILM doesn’t offer all the advanced features of Curator. However, it delivers the needed functionality for most basic use cases.

We encourage you to try it out.

Parsing Linux DHCP logs

Introduction

So we are back at parsing DHCP logs. This time we are taking a look at DHCP logs from Linux systems. They are quite simple and easy to parse. So while this is easy, let’s add some complication to the puzzle, namely ECS.

ECS

Elastic Common Schema. This is a fairly new convention introduced by Elastic to help out with naming the fields in your logs. The main goal is to achieve consistent naming across your various log sources. This lets your users know that an IP address is always parsed as [source][ip] and not src_ip, src_ipv4 and so on.

If you follow ECS, you will be able to index various kinds of firewall logs in the same index and visualize the data in Kibana, regardless of whether the data comes from Check Point, ASA or whatever.

The main problem with ECS is that it doesn’t cover all sorts of naming yet. So you will find yourself in situations where you simply can’t find anything in ECS that covers your particular use case. But don’t worry about it, just use what you can and be consistent.

DHCP messages

In your Linux DHCP logs, you should find lines that look similar to this:

Apr 29 03:48:58 hostname dhcpd: DHCPACK to ip (mac) via router
Apr 29 03:48:58 hostname dhcpd: DHCPINFORM from ip via router
Apr 29 03:48:59 hostname dhcpd: DHCPDISCOVER from mac via router: unknown network segment
Apr 29 03:49:05 hostname dhcpd: DHCPREQUEST for ip from mac (hostname) via router

So we will build a Logstash pipeline to parse this and convert it to ECS naming, whenever possible.

Logstash pipeline

We will jump right in with some Logstash code. This pipeline reads input from stdin, parses the logs and outputs the parsed result to the console in a nice JSON format.

You will need Filebeat to ship your logs to Logstash, and you will need to modify the pipeline to read a Beats input instead. You will also need to modify the output section so it outputs to your desired location, whether that is Kafka or Elasticsearch directly.

input {
  stdin {
    id => "my_plugin_id"
  }
}

filter {
  if [message] =~ /dhcpd:/ {
    grok {
      match => {
        "message" => "%{SYSLOGBASE} %{GREEDYDATA:params}"
      }
    }
    if "_grokparsefailure" not in [tags] {
      grok {
        match => {
          "params" => "%{WORD:[event][action]} %{GREEDYDATA:params2}"
        }
      }
      if "_grokparsefailure" not in [tags] {
        grok {
          match => {
            "params2" => [
              "from %{IP:[source][ip]} via %{IPORHOST:interface}",
              "to %{IP:[source][ip]} \(%{MAC:[source][mac]}\) via %{IPORHOST:interface}",
              "from %{MAC:[source][mac]} (\(%{DATA:[host][hostname]}\) )?via %{IPORHOST:interface}(: %{GREEDYDATA:[error][message]})?",
              "on %{IP:[source][ip]} to %{MAC:[source][mac]} \(%{DATA:[host][hostname]}\) via %{IPORHOST:interface}",
              "pool %{WORD:[pool][id]} %{DATA:[pool][subnet]}  total %{INT:[pool][total]}  free %{INT:[pool][free]}  backup %{INT:[pool][backup]}  lts %{INT:[pool][lts]}  max-own \(\+\/\-\)%{INT:[pool][maxown]}",
              "pool %{WORD:[pool][id]} %{DATA:[pool][subnet]}  total %{INT:[pool][total]}  free %{INT:[pool][free]}  backup %{INT:[pool][backup]}  lts %{INT:[pool][lts]}  max-misbal %{INT:[pool][maxmisbal]}"
            ]
          }
        }
      }
    }
    if "_grokparsefailure" not in [tags] {
      if [source][mac] {
        mutate {
          # strip the colons from the MAC address
          gsub => [
            "[source][mac]", "[\:]", ""
          ]
        }
      }
      if [source][ip] {
        mutate {
          copy => { "[source][ip]" => "[related][ip]" }
        }
      }

      date {
        match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
      }
      mutate {
        remove_field => [ "message", "params2", "params", "timestamp" ]
        add_field    => { "[ecs][version]" => "ecs-1.0.0" }
        add_field    => { "[organization][id]" => "xx" }
        add_field    => { "[observer][vendor]" => "linux" }
        rename       => { "logsource" => "[observer][host]" }
      }
    }
  }
}

output {
  stdout { codec => rubydebug }
}

Github

In order to make our lives easier, we have posted the code in our GitHub repo.

Conclusion

This was a short example of parsing another log into Elasticsearch. I highly encourage you to familiarize yourself with ECS. It will make a lot of things easier and more shareable for everyone.

Simple Kafka and Elasticsearch integration

Kafka is an awesome platform for moving data around. It is often used together with an Elasticsearch cluster to hold data before it gets ingested into Elasticsearch.

Kafka deals with topics to carry some specific kind of data around. Imagine having topics for dns, dhcp, firewall and so on. You can quickly end up with a high number of topics, right?

So in this blog post I will present a way for you to utilize a single Kafka topic to carry many kinds of data, while still being able to ingest into different Elasticsearch indices. It also enables you to specify the rotation of the indices: rollover, weekly, daily or whatever your needs may be.

The approach works by creating a few simple properties alongside your data:

  • myapp
  • myrotation

Let’s use this scenario: you have some kind of logfile that contains log data for your app “blog-hits”. Your app is low volume in terms of logs and you just need a weekly index.

You install Filebeat and add these entries to your Filebeat configuration.

  fields:
    myapp: blog-hits
    myrotation: weekly
  fields_under_root: true

You would then configure Filebeat to send this to Logstash for further parsing. After parsing, Logstash sends the events to Kafka on a topic called “application-logs”, which you have configured on your Kafka servers.

If you prefer, you can also add the myapp and myrotation fields in the Logstash pipeline parsing your data; it is just a matter of preference.
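
A minimal sketch of that variant, reusing the blog-hits example values from above:

filter {
  # tag the events so the output section can route them to the right index
  mutate {
    add_field => {
      "myapp"      => "blog-hits"
      "myrotation" => "weekly"
    }
  }
}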

You will have a Logstash consumer of the topic “application-logs” in a pipeline like this:

input {
  kafka {
    bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
    topics            => [ "application-logs" ]
    codec             => "json"
    group_id          => "tier1"
    decorate_events   => true
  }
}

Please notice that I used decorate_events. This is important for the rest of the pipeline, because it makes the Kafka metadata (such as the topic name) available under [@metadata][kafka].

Next we will define the filter section:

filter {
    mutate {
        copy => { "[@metadata][kafka][topic]" => "kafkatopic" }
    }

   if ![myapp]
   {
     mutate {
       add_field => { "myapp" => "default" }
     }
   }

   if ![myrotation]
   {
     mutate {
       add_field => { "myrotation" => "weekly" }
     }
   }
}

In the filter, we make sure that we have default values for myapp and myrotation. Now we get to the interesting output section:


output {
  if [myrotation] == "rollover" {
    elasticsearch {
      hosts           => ["https://elastic01:9200", "https://elastic02:9200"]
      manage_template => false
      index           => "%{[kafkatopic]}-%{[myapp]}-active"
    }
  }

  if [myrotation] == "daily" {
    elasticsearch {
      hosts           => ["https://elastic01:9200", "https://elastic02:9200"]
      manage_template => false
      index           => "%{[kafkatopic]}-%{[myapp]}-%{+YYYY.MM.dd}"
    }
  }

  if [myrotation] == "weekly" {
    elasticsearch {
      hosts           => ["https://elastic01:9200", "https://elastic02:9200"]
      manage_template => false
      index           => "%{[kafkatopic]}-%{[myapp]}-%{+xxxx.ww}"
    }
  }
}

In the output section, we use the information from myapp and myrotation to ingest our logs into an application-specific index. So this pipeline is just being used to route the data to the correct index.

In this case, data will get stored in “application-logs-blog-hits-2019.14”.

You can use this simple approach to carry many different kinds of data in a single Kafka topic, while still ingesting each kind into a separate index in Elasticsearch.

Scaleable syslog pipeline

If you are receiving syslog data from a variety of network devices, you need a design that will allow you to receive and process syslog messages before you ingest them into your Elasticsearch cluster.

Processing syslog messages can be quite heavy in terms of CPU usage if you are doing a lot of grok statements.

As always, this can be done in many different ways, but in this blog post I will show the basics of a Kafka based architecture.

The initial approach, which will work in many use cases: just put some kind of load balancer in front and use that to receive your syslog messages and ship them to some Logstash instances for processing.

This approach will be fine for a small to medium sized setup. But how will you scale it? Well, deploy one more Logstash server and edit your load balancer configuration to use the new Logstash server. But there is a smarter way.

I suggest that you have your load balancer forward to 2-3 Logstash servers running an extremely simple syslog input pipeline. In this syslog input pipeline, do absolutely nothing but forward the data to your Kafka cluster.

input {
  tcp {
    port => 514
    type => syslog
  }
  udp {
    port => 514
    type => syslog
  }
}

filter {
}

output{
  kafka { 
    bootstrap_servers => "kafka-01"
    topic_id =>  "raw-syslog"
  }
}

Of course, since this is syslog, be sure that this pipeline is backed by a persistent queue in Logstash, as syslog is send-and-forget.
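
A minimal pipelines.yml sketch for that, where the pipeline id, path and queue size are placeholders:

- pipeline.id: syslog-input
  path.config: "/etc/logstash/conf.d/syslog-input.conf"
  queue.type: persisted
  queue.max_bytes: 2gb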

The boxes to run this pipeline can be quite small as there will be no processing going on.

If you are running rsyslog, you could even configure rsyslog to send directly to Kafka, and then you won’t need this Logstash input pipeline.

But right now you have just raw syslog messages living in your Kafka cluster. You need to process them. They could be ASA firewall messages, for example, that you need to parse.

So you create an additional Logstash pipeline that pulls the raw syslog data from Kafka and parses it. This pipeline should run on other boxes than the ones that received the data, preferably some quite beefy Logstash nodes. Once parsed, you can send the data back to Kafka if you need to, or ingest it into your Elasticsearch cluster at this point.

input {
  kafka {
    group_id          => "raw-syslog-group"
    topics            => ["raw-syslog"]
    bootstrap_servers => "kafka-01:<port>"
    codec             => json
  }
}

filter {
  if "%ASA-" in [message] {
    grok {
      match => [
        "message", "<%{POSINT:syslog_pri}>%{CISCOTIMESTAMP:timestamp} %{SYSLOGHOST:sysloghost} %%{CISCOTAG:cisco_tag}: %{GREEDYDATA:cisco_message}"
      ]
    }
    syslog_pri { }
    ....
    ....
  }
}

output {
  elasticsearch {
    hosts => ["elastic-01:9200"]
    index => "syslog-%{+YYYY.MM.dd}"
  }
}

The trick is the consumer group feature of Kafka. In the example I specified group_id => “raw-syslog-group”. So no matter how many Logstash instances have this pipeline running, they will work as a unit with regard to Kafka.

If you find that you need more processing power, deploy an additional Logstash node and deploy this pipeline to it. You don’t have to change your load balancer configuration at all.

This setup also makes your life easier if you can centralize your Logstash processing on a few beefy Logstash nodes. That comes in handy if you are thinking of using Memcached for lookups of malicious IPs or domain names in all your syslog messages. Hey, that sounds like a topic for a complete blog post of its own ;)

Processors in Winlogbeat

There is a probably little-known feature hidden in the Beats: something called processors.

By using processors you can do some lightweight filtering of data before the data leaves your endpoint.

We have used this feature to help decode event 2889 from the Directory Service log on domain controllers. An event 2889 will appear in your Directory Service log if someone is binding to your domain controllers with clear-text LDAP passwords. You really don’t want that going on these days, right?

The event contains a field called event_data.param1. This looks like <ip>:<port>. But in most cases you really don’t care about the port; you only need the IP the traffic is coming from, so you can visualize it in Kibana.

So you can define a section in your winlogbeat.yml to fix this:

processors:
  - drop_fields:
      fields: ["host"]
  - dissect:
      when:
        equals:
          event_id: 2889
      tokenizer: "%{host}:%{port}"
      field: "event_data.param1"
      target_prefix: "event_data_param1_split"
  - dns:
      when:
        equals:
          event_id: 2889
      type: reverse
      fields:
        event_data_param1_split.host: event_data_param1_split.hostname
      success_cache:
        capacity.initial: 1000
        capacity.max: 10000
      failure_cache:
        capacity.initial: 1000
        capacity.max: 10000
        ttl: 1m
      nameservers: ['10.1.2.3', '10.2.3.4']
      timeout: 500ms
      tag_on_failure: [_dns_reverse_lookup_failed]

This example uses three processors.

The first section uses the drop_fields processor. This will drop the field named “host” from the events. We do this because of a mapping conflict between data from old and new Winlogbeat versions.

  - drop_fields:
      fields: ["host"]

The next processor is the dissect processor. This is the one that parses our <ip>:<port> and splits the information up into two distinct fields. Notice we have a when condition, so we only do this when event_id equals 2889.

  - dissect:
      when:
        equals:
          event_id: 2889
      tokenizer: "%{host}:%{port}"
      field: "event_data.param1"
      target_prefix: "event_data_param1_split"

Finally we use the dns processor to try to resolve the IP address to a proper DNS name.

  - dns:
      when:
        equals:
          event_id: 2889
      type: reverse
      fields:
        event_data_param1_split.host: event_data_param1_split.hostname
      success_cache:
        capacity.initial: 1000
        capacity.max: 10000
      failure_cache:
        capacity.initial: 1000
        capacity.max: 10000
        ttl: 1m
      nameservers: ['10.1.2.3', '10.2.3.4']
      timeout: 500ms
      tag_on_failure: [_dns_reverse_lookup_failed]

As usual, all this could be done in other ways. If you have your Winlogbeat data flowing through Logstash, you can do the parsing there instead. This is just a short example of what can be done with the processors feature.

Elastic 7.0.0 Beta 1 Released

Elastic is moving fast as usual in preparation for the upcoming version 7.0.0. They have just released 7.0.0 beta 1, so it’s out of alpha mode. It is important to keep an eye on this, so you will not be surprised by some of the breaking changes.

As usual it is packed with new features:

  • TLS 1.3 Support
  • ZEN2 – Cluster coordination layer.
  • Nanosecond precision
  • New Maps application
  • New Heartbeat Application
  • New Visual Layout in Kibana
  • Query Speed Improvements
  • Elastic Common Schema
  • The list goes on

Read the official news here:

https://www.elastic.co/blog/elasticsearch-7-0-0-beta1-released