Simplifying Logstash by adding complexity

Background

A lot of the logs that go into Logstash arrive via the Beats protocol, so you will typically have a pipeline in Logstash listening for Beats connections on port 5044. This could be data coming from Filebeat, Winlogbeat, Metricbeat or Heartbeat.

In your Logstash filter section, you will over time end up with a huge mess, trying to add the relevant parsing for each log type inside a growing pile of if statements. In the output section you will see the same mess again, where you route the different types of logs inside another bunch of if statements.

If you have built something like this, your configuration will become increasingly difficult to read and debug. Not to mention the problems you will face if multiple people need to contribute to the Logstash configuration, or if you need to move the parsing of a specific type to another Logstash node: you then have to grab the relevant parts by copy/paste, which is error-prone.

input {
  beats {
    port => 5044
  }
}

filter {
  if [type] == "winlogbeat" {
    #enrich winlogbeat
    ....
  }
  if [type] == "heartbeat" {
    #enrich heartbeat
    ....
  }
  if [type] == "mylogfile" {
    #enrich mylogfile
    ....
  }
  if [type] == "dns" {
    #enrich dns
    ....
  }
  if [type] == "dhcp" {
    #enrich dhcp
    ....
  }
}

output {
  if [type] == "winlogbeat" {
    #output winlogbeat
    ....
  }
  if [type] == "heartbeat" {
    #output heartbeat
    ....
  }
  if [type] == "mylogfile" {
    #output mylogfile
    ....
  }
  if [type] == "dns" {
    #output dns
    ....
  }
  if [type] == "dhcp" {
    #output dhcp
    ....
  }
}

Simplifying

So what can you do about this problem, you may ask. Earlier, people solved it with named conf files that would be picked up by Logstash and concatenated into one large configuration. However, we want to be modern and use the newer features made available by Elastic.

Pipeline to pipeline

I read about the pipeline-to-pipeline feature in Logstash a long time ago. There is an excellent article about the options here. The feature is now generally available as of Logstash 7.4.

It’s actually very simple to implement. You create a pipeline file that receives the beats input and then distributes the events to small, tailor-made pipelines.

input {
  beats {
    port => 5044
  }
}

filter {
}

output {
  if [type] == "dns" {
    pipeline { send_to => dns }
  } else if [type] == "dhcp" {
    pipeline { send_to => dhcp }
  } else if [type] == "mylogfile" {
    pipeline { send_to => mylogfile }
  } else {
    pipeline { send_to => fallback }
  }
}

Then create a new pipeline to handle each specific log type. This one is restricted to parsing DNS logs.

input {
  pipeline { address => dns }
}

filter {
   # do only your parsing of DNS logs
}

output {
  # output dns
}

You must remember to add all your pipelines to your pipelines.yml file, and remember to think about whether you need an in-memory queue or a persisted queue per pipeline.

- pipeline.id: beats-input
  path.config: "/etc/path/to/beats-input.config"
  pipeline.workers: 3
- pipeline.id: dns
  path.config: "/etc/different/path/dns.cfg"
  queue.type: persisted
  queue.max_bytes: 4gb
- pipeline.id: dhcp
  path.config: "/etc/different/path/dhcp.cfg"
  queue.type: persisted
  queue.max_bytes: 1gb
- pipeline.id: mylogfile
  path.config: "/etc/different/path/mylogfile.cfg"
  queue.type: persisted
  queue.max_bytes: 2gb
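
Note that the distributor output above also sends unmatched events to a fallback address, so pipelines.yml needs a matching entry for it too; a minimal sketch with a hypothetical path:

- pipeline.id: fallback
  path.config: "/etc/different/path/fallback.cfg"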

Conclusion

We have started using this approach and will continue with it going forward. We get a much simpler way of handling many different log types inside Logstash, and we are able to distribute the work to more people.

On top of this, we are seeing better latency in Logstash. I suggest reading this article while you are at it: with this approach you are effectively using parallel pipelines, just as the article suggests.

As always, use this approach if you find it applicable to your use case.

Enriching ElasticSearch With Threat Data – Part 3 – Logstash

In our previous post in this series, we prepared MISP and its API, set up memcached, and created the Python script we need to pull data from MISP and push it into our memcached application. In this next blog post, we will cover how to use Logstash to look up the data stored within Memcached, and how to enrich ElasticSearch when we get a hit!

A quick mention before we go much deeper: this enrichment setup is capable of ultra-fast lookups and of working with huge numbers of IoCs. Without giving away too much, I know of a very large production setup which is running this with close to 120,000 events per second and multiple feeds enabled within MISP…. It will do enrichment in realtime as the logs are being written to ElasticSearch!

Part 1:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-1-misp/

Part 2:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-2-memcached-and-python/


Logstash – Brief Intro

Logstash is the powerhouse behind our enrichment setup… Since you should hopefully already be familiar with the ELK stack, we won’t touch too much on Logstash and how it works, but we will focus on parts of it…

Logstash is essentially split up into 3 sections: input, filter and output.

The input section is where we define the source of the logging data we want to work with.

The filter section is where we then work with the logging data. This could be parsing, normalizing, transforming or multiple other methods of preparing the data for sending out to ElasticSearch…

The output section is where we define how to send the data out of Logstash. This could be sending directly to ElasticSearch, to Kafka or to many other output options.

Our blog will focus much more on the filter section in the future, in particular on how we can map all logs to the Elastic Common Schema via grok parsing. But right now, in this example, we will keep it simple and assume you already have some sort of parsing in place for the logging source you want to enrich.

Logstash – Memcached filter

The Logstash memcached filter has recently been made into a fully supported release, which we are very happy about over at Security Distractions. It comes installed by default with Logstash 7.0…

https://www.elastic.co/guide/en/logstash/current/plugins-filters-memcached.html

This means all we need to do within our Logstash configuration to enable the memcached plugin is to write the filter in as shown below.

The placement of the memcached section is quite important… It should come after your grok parsing and transforming sections, preferably as the last function within the filter section.

memcached {
  hosts => ["127.0.0.1:11211"]
  get => { "domain-%{[destination][domain]}" => "[misp_src]" }
}

A quick breakdown of this function: “hosts” is where we specify the location and port of our memcached application.

The “get” is used to tell Logstash which field within the logs it needs to look up against memcached; the result of a match is then written to a new field, “misp_src”.

Using the example from our previous blog post, we will use securitydistractions.com as the value within the destination.domain field.

Logstash will prepend “domain-” to “securitydistractions.com”, resulting in “domain-securitydistractions.com”. It will then make a get request against the memcached application….

“domain-securitydistractions.com” is populated within the memcached data store with the value “Feed-RansomwareTracker”, so we get a hit, and this value is written to the new field “misp_src”.

When Logstash does a lookup for a value which is not within the memcached data store, it will not return a value into misp_src. So, just for the sake of good practice, we will add a conditional within Logstash that populates the misp_src field with the value “none” if there is no match.

if ![misp_src] {
  mutate {
    add_field => { "[misp_src]" => "none" }
  }
}

Since this setup leverages your already existing ELK stack, you only need to handle the new field “misp_src” via visualisations or whatever other fancy way you want to display it.

In my lab, I use a locally running instance of pihole to generate logs for testing the enrichment setup….

When I get round to it, I will make a part 4… featuring extensions to the integration. You can run with as many feeds as your heart desires… Your only limit is your imagination for tagging/feed names!
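
As a hedged sketch of what multiple feeds could look like, assuming the misppull function from part 2 is adapted to take the MISP tag name as a parameter (the extra tag names here are hypothetical):

if __name__ == '__main__':
    # hypothetical tag names; use whatever tags exist in your own MISP
    for feed in ["Feed-RansomwareTracker", "Feed-URLHaus"]:
        response = misppull(feed)
        for domain in response.text.splitlines():
            client.set("domain-" + domain, feed, 70)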

It is possible to integrate MISP and ELK even further by using the http filter plugin. Once the misp_src field is populated, you could take this result and make an HTTP call back to MISP for further enrichment.
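
As a rough illustration of that idea, here is a hedged sketch using the logstash-filter-http plugin; the MISP URL, API key placeholder and target field are assumptions rather than a tested configuration:

if [misp_src] != "none" {
  http {
    # hypothetical MISP instance and API key
    url => "https://misp.example.org/attributes/restSearch"
    verb => "POST"
    headers => {
      "Authorization" => "INSERT YOUR OWN MISP API KEY"
      "Accept" => "application/json"
      "Content-type" => "application/json"
    }
    body => '{"returnFormat":"json","value":"%{[destination][domain]}"}'
    # store the full MISP response in its own field for later use
    target_body => "[misp_response]"
  }
}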

Enriching ElasticSearch With Threat Data – Part 2 – Memcached and Python

In our previous post we covered MISP and some of the preparation work needed to integrate MISP and ElasticSearch. With MISP now set up and prepped, we can focus on Python and Memcached.

Part 1:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-1-misp/


Background

First, a little background on why we chose to use Memcached for our ElasticSearch integration…..

Threat data feeds are dynamic by nature; they are constantly being updated, often multiple times a day. The updates contain both additions to the feeds and deletions. This means our enrichment engine needs to be dynamic too…. To explain this better, we will use Ransomware Tracker as an example.

Let’s say a new IP is published to the Ransomware Tracker feed. This would be easy to manage in an enrichment engine, as we could simply add the new IP to our list. But what if an IP is removed from Ransomware Tracker? Now we have to monitor the feed to find the deletion, check our own list to see if we have this IP, and then delete it from our list. This can very quickly get complex to handle…

Another way to handle it could be to monitor the Ransomware Tracker feed for changes and, when a change is made, clear our list completely and pull the latest feed instead….. This would solve part of the problem, but it can result in a small period where the enrichment engine is empty, and it also increases complexity as we would have to delete the list each time, which is definitely not what we wanted!

We decided to look into a way of simply assigning a TTL to each IoC on the feed, and then aging out the IoCs which are no longer present on the feed. We would set up our script to pull the feed at a given time interval and then push it into our enrichment engine store. Simple, yet incredibly effective… This method also had to be supported by ElasticSearch, and how lucky we were that Logstash has a filter plugin for memcached. So that is what we settled on using to store the feed data for enrichment.

Memcached – Preparation

Memcached meets our requirements of being simple and handling aging of IoCs, and it is also supported by ElasticSearch/Logstash, which makes it perfect for this task. It also comes with the huge additional benefit of storing the data in memory, so lookups from Logstash to the data will be ultra fast.

https://memcached.org/

The Memcached application is a very simple key-value store running in memory; you can telnet into the application, which runs by default on port 11211.

The application exposes only a few commands. The ones we need here are the “get” and “set” commands, both of which are quite self-explanatory….

The set command will be used by our Python script, to set the data into the store.

The get command will be used by the Logstash filter plugin, to query the store for a specific IoC and return the result back to Logstash.

The only thing we need to do is decide the structure of the data within the key-value store. Since we are going to be working with multiple data types (domain names, IP addresses etc.), we will make our key a combination of the data type and the IoC. So, in the example where securitydistractions.com is on the Ransomware Tracker feed, it will be represented as: “domain-securitydistractions.com”.

Using the key as the combination of the data type and the IoC will be easier to understand later when we look at the Logstash configuration.

The value will be the feed name, so in this example “Feed-RansomwareTracker”.

The TTL can be set to whatever suits your organisation; in our example we will use 70 seconds. This is because we are going to run our Python script for pulling the feed from MISP every 30 seconds, which allows us to miss one pull without aging out all IoCs within the memcached store.

So the set command for memcached with our example data will be as follows:- “domain-securitydistractions.com”, “Feed-RansomwareTracker”, “70”.
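
For illustration, this is what that exchange looks like in a raw telnet session against memcached; in the text protocol, set takes the key, a flags value, the TTL in seconds and the value’s length in bytes (22 for “Feed-RansomwareTracker”):

telnet 127.0.0.1 11211
set domain-securitydistractions.com 0 70 22
Feed-RansomwareTracker
STORED
get domain-securitydistractions.com
VALUE domain-securitydistractions.com 0 22
Feed-RansomwareTracker
END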

It is highly recommended that you run Memcached on the same machine as Logstash, for latency purposes. In our lab we are running everything on a Debian VM, and there are Debian packages available for Memcached…..

Python – Memcache/MISP integration

Caveat: I am not a developer, and my programming skills are limited… The script here only had to be very simple, so it suited my skill level. There will be multiple ways to improve it in the future… But this is what we are running with here, and it works!

As ever, any form of integration between tools is probably going to require some form of scripting. In our case we knew we needed a script that would handle pulling the data from our MISP platform API and then pushing this data into Memcached. The full script can be found at the bottom of the page….

The first part is our interaction with the MISP API….

def misppull():
    headers = {
            'Authorization': 'INSERT YOUR OWN MISP API KEY',
            'Accept': 'application/json',
            'Content-type': 'application/json',
             }

    data = '{"returnFormat":"text","type":"domain","tags":"Feed-RansomwareTracker","to_ids":"yes"}'

    response = requests.post('https://*INSERTYOUROWNMISPHERE*/attributes/restSearch', headers=headers, data=data, verify=False) #Call to MISP API

    return response

Remember to change the “Authorization” section within the header to your own API key.

The data variable is used to tell the MISP API which IoCs we want to retrieve. In this example we are asking for all domain names that are tagged with “Feed-RansomwareTracker” and where the “to_ids” setting is set to yes. This will be returned as plaintext…

Remember also to change the URL within the response variable to reflect the domain name or IP address of your own MISP instance. I have also disabled SSL verification, as this is running within my lab; it is not recommended to keep this setting if you are running in production.
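
If your MISP instance has a proper certificate, a hedged alternative is to point requests at a CA bundle instead of disabling verification (the bundle path here is hypothetical):

response = requests.post('https://*INSERTYOUROWNMISPHERE*/attributes/restSearch',
                         headers=headers, data=data,
                         verify='/etc/ssl/certs/misp-ca.pem')  # hypothetical CA bundle path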

As always, there are multiple reliable Python libraries for interacting with the Memcached application. We settled on the first one we found, “pymemcache”.

https://pypi.org/project/pymemcache/

if __name__ == '__main__':
    response = misppull()
    domains = response.text.splitlines()
    for domain in domains:
        client.set("domain-" + domain, "Feed-RansomwareTracker", 70)

Using the structure we settled on earlier in this blog post, this is how it is reflected when using pymemcache: we use the client.set command to push the IoCs we retrieved via the “misppull” function into the memcached application.

Full script:-

When I get round to it, this will be uploaded to our GitHub; it is released under the MIT license.

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from pymemcache.client.base import Client
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

client = Client(('127.0.0.1', 11211))  # Location of memcached application

def misppull():
    headers = {
            'Authorization': 'INSERT YOUR OWN API KEY HERE',
            'Accept': 'application/json',
            'Content-type': 'application/json',
             }

    data = '{"returnFormat":"text","type":"domain","tags":"Feed-RansomwareTracker","to_ids":"yes"}'  # Setting up the data format we require from MISP

    response = requests.post('https://*INSERTYOUROWNMISPHERE*/attributes/restSearch', headers=headers, data=data, verify=False)  # Call to MISP API
    return response


if __name__ == '__main__':
    response = misppull()
    domains = response.text.splitlines()
    for domain in domains:
        client.set("domain-" + domain, "Feed-RansomwareTracker", 70)  # 70 second TTL, matching our 30 second pull interval

Next in the post series, we will be covering the last step… Integrating it all together using Logstash!

Part 3:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-3-logstash/

Parsing Linux DHCP logs

Introduction

So we are back at parsing DHCP logs. This time, we are taking a look at DHCP logs from Linux systems. They are quite simple and easy to parse. So, while this is easy, let’s add some complication to the puzzle. Namely ECS.

ECS

Elastic Common Schema. This is a fairly new convention introduced by Elastic to help out with naming the fields in your logs. The main goal is to achieve consistent naming across your various log sources. This enables your users to know that an IP address is always parsed as [source][ip] and not src_ip, src_ipv4 and so on.

If you follow ECS, you will be able to index various kinds of firewall logs in the same index and visualize the data in Kibana, regardless of whether the data comes from Check Point, ASA or whatever.

The main problem with ECS is that it doesn’t cover all sorts of naming yet. So you will find yourself in situations where you simply can’t find something in ECS that covers your particular use case. But don’t worry about it: just use what you can and be consistent.
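
Converging on ECS names is often just a matter of renaming at parse time; here is a minimal mutate sketch, assuming hypothetical legacy field names on the left:

filter {
  mutate {
    # hypothetical legacy names mapped to their ECS equivalents
    rename => { "src_ip" => "[source][ip]" }
    rename => { "dst_ip" => "[destination][ip]" }
    rename => { "src_port" => "[source][port]" }
  }
}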

DHCP messages

In your Linux DHCP logs, you should find lines that look similar to these:

Apr 29 03:48:58 hostname dhcpd: DHCPACK to ip (mac) via router
Apr 29 03:48:58 hostname dhcpd: DHCPINFORM from ip via router
Apr 29 03:48:59 hostname dhcpd: DHCPDISCOVER from mac via router: unknown network segment
Apr 29 03:49:05 hostname dhcpd: DHCPREQUEST for ip from mac (hostname) via router

So we will build a Logstash pipeline to parse this and convert it to ECS naming whenever possible.

Logstash pipeline

We will jump right in with some Logstash code. This pipeline will read input from stdin, parse the logs and output the parsed events to the console in a nice JSON format.

You will need Filebeat to ship your logs to Logstash, in which case you will need to modify the pipeline to read from a Beats input instead. You will also need to modify the output section so it outputs to your desired location, whether that is Kafka or Elasticsearch directly.

input {
  stdin {
    id => "my_plugin_id"
  }
}

filter {
  if [message] =~ /dhcpd:/ {
    grok {
      match => {
        "message" => "%{SYSLOGBASE} %{GREEDYDATA:params}"
      }
    }
    if "_grokparsefailure" not in [tags] {
      grok {
        match => {
          "params" => "%{WORD:[event][action]} %{GREEDYDATA:params2}"
        }
      }
      if "_grokparsefailure" not in [tags] {
        grok {
          match => {
            "params2" => [
              "from %{IP:[source][ip]} via %{IPORHOST:interface}",
              "to %{IP:[source][ip]} \(%{MAC:[source][mac]}\) via %{IPORHOST:interface}",
              "from %{MAC:[source][mac]} (\(%{DATA:[host][hostname]}\) )?via %{IPORHOST:interface}(: %{GREEDYDATA:[error][message]})?",
              "on %{IP:[source][ip]} to %{MAC:[source][mac]} \(%{DATA:[host][hostname]}\) via %{IPORHOST:interface}",
              "pool %{WORD:[pool][id]} %{DATA:[pool][subnet]}  total %{INT:[pool][total]}  free %{INT:[pool][free]}  backup %{INT:[pool][backup]}  lts %{INT:[pool][lts]}  max-own \(\+\/\-\)%{INT:[pool][maxown]}",
              "pool %{WORD:[pool][id]} %{DATA:[pool][subnet]}  total %{INT:[pool][total]}  free %{INT:[pool][free]}  backup %{INT:[pool][backup]}  lts %{INT:[pool][lts]}  max-misbal %{INT:[pool][maxmisbal]}"
            ]
          }
        }
      }
    }
    if "_grokparsefailure" not in [tags] {
      if [source][mac] {
        mutate {
          gsub => [
            # strip the colon separators from the MAC address
            "[source][mac]", "[\:]", ""
          ]
        }
      }
      if [source][ip] {
        mutate {
          copy => { "[source][ip]" => "[related][ip]" }
        }
      }
      date {
        match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
      }
      mutate {
        remove_field => [ "message", "params2", "params", "timestamp" ]
        add_field => { "[ecs][version]" => "ecs-1.0.0" }
        add_field => { "[organization][id]" => "xx" }
        add_field => { "[observer][vendor]" => "linux" }
        rename => { "logsource" => "[observer][host]" }
      }
    }
  }
}

output {
  stdout { codec => rubydebug }
}

Github

In order to make our lives easier, we have posted the code in our Github repo.

Conclusion

This was a short example of parsing another log source into Elastic. I highly encourage you to get yourself familiar with ECS. It will make a lot of things easier and more shareable for everyone.

Enriching ElasticSearch With Threat Data – Intro

Since my last blog post back in January, I have been seriously distracted! I promised blog posts relating to my lab but have not had the time…. But to keep you guys going until then… I am going to open source my enrichment-at-scale setup, combining ElasticSearch, MISP, Logstash and memcached into one seriously powerful platform.

Have you ever wanted to check your entire logging estate against a threat feed? Multiple threat feeds? If so, you have probably seen that many of the big SIEM providers charge a premium for this service.

What I will demonstrate over the next few posts, is how to accomplish this for free! Well not quite for free, since you need time but you know…..

Let’s talk about the diagram above… For my threat data source, I have chosen MISP. My logging sources are Squid Proxy and PiHole. These are choices you can make yourself; the rest of the setup is required.

Instead of choosing MISP, you could simply use a single threat data feed. Ransomware Tracker could be a good place to start, as they offer an open source feed via CSV which you could quickly parse. The important thing is that you have the right data structure to put the feed into memcached. But we will go over this in further blog posts….
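
To make the CSV idea concrete, here is a hedged Python sketch; the feed URL and column layout are assumptions, while the key/value structure matches what the later posts in this series use:

# hedged sketch: pull a CSV blocklist and load it into memcached
import csv
import io

import requests
from pymemcache.client.base import Client

client = Client(('127.0.0.1', 11211))

resp = requests.get('https://feed.example.org/blocklist.csv')  # hypothetical feed URL
for row in csv.reader(io.StringIO(resp.text)):
    if not row or row[0].startswith('#'):
        continue  # skip comment and blank lines
    client.set('domain-' + row[0].strip(), 'Feed-ExampleCSV', 70)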

Across the next blog posts, I will talk about the various pieces of the puzzle and how to put them all together… The result is a very scalable, powerful enrichment engine that can ingest and enrich logs in realtime without delaying the log process.


Scalable syslog pipeline

If you are receiving syslog data from a variety of network devices, you need a design that will allow you to receive and process syslog messages before you ingest them into your Elasticsearch cluster.

Processing syslog messages can be quite heavy in terms of CPU usage if you are doing a lot of grok statements.

As always, this can be done in many different ways, but in this blog post I will show the basics of a Kafka based architecture.

The initial approach, which will work in many use cases, is to just put some kind of load balancer in front and use that to receive your syslog messages and ship them to some Logstash instances for processing.

This approach will be fine for a small to medium sized setup. But how will you scale it? Well, deploy one more Logstash server and edit your load balancer configuration to use the new Logstash server. But there is a smarter way.

I suggest that you have your load balancer forward to 2-3 Logstash servers running an extremely simple syslog input pipeline. In this pipeline, do absolutely nothing but forward the data to your Kafka cluster.

input {
  tcp {
    port => 514
    type => syslog
  }
  udp {
    port => 514
    type => syslog
  }
}

filter {
}

output {
  kafka {
    bootstrap_servers => "kafka-01"
    topic_id => "raw-syslog"
    # encode the events as JSON so the processing pipeline can decode them
    codec => json
  }
}

Of course, since syslog is send-and-forget, be sure that this pipeline is backed by a persistent queue in Logstash.
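
A minimal pipelines.yml sketch for such an input pipeline, with a hypothetical path and queue size:

- pipeline.id: syslog-input
  path.config: "/etc/logstash/conf.d/syslog-input.conf"
  queue.type: persisted
  queue.max_bytes: 2gb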

The boxes running this pipeline can be quite small, as there will be no processing going on.

If you are running RSyslog, you could even configure RSyslog to send directly to Kafka, and you won’t need this Logstash input pipeline at all.
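
A hedged sketch of that RSyslog configuration, using the omkafka output module (the broker name and template choice are assumptions):

module(load="omkafka")
action(
  type="omkafka"
  broker=["kafka-01:9092"]
  topic="raw-syslog"
  template="RSYSLOG_SyslogProtocol23Format"
)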

But right now you just have raw syslog messages living in your Kafka cluster; you still need to process them. They could for instance be ASA firewall messages that you need to parse.

So you create an additional Logstash pipeline that pulls the raw syslog data from Kafka and parses it. This pipeline should run on other boxes than the ones that received the data, preferably some quite beefy Logstash nodes. Once parsed, you can send the data back to Kafka if you need to, or you can ingest it into your Elasticsearch cluster at this point.

input {
  kafka {
    group_id => "raw-syslog-group"
    topics => ["raw-syslog"]
    bootstrap_servers => "kafka-01:<port>"
    codec => json
  }
}

filter {
  if "%ASA-" in [message] {
    grok {
      match => [
        "message", "<%{POSINT:syslog_pri}>%{CISCOTIMESTAMP:timestamp} %{SYSLOGHOST:sysloghost} %%{CISCOTAG:cisco_tag}: %{GREEDYDATA:cisco_message}"
      ]
    }
    syslog_pri { }
    ....
    ....
  }
}

output {
  elasticsearch {
    hosts => ["elastic-01:9200"]
    index => "syslog-%{+YYYY.MM.dd}"
  }
}

The trick is the consumer group feature of Kafka. In the example I specified group_id => “raw-syslog-group”. So no matter how many Logstash instances have this pipeline running, they will work as a unit with regard to Kafka: each message on the topic is consumed by only one member of the group.

If you find that you need more processing power, deploy an additional Logstash node with this pipeline. You don’t have to change your load balancer configuration at all.

This setup also makes your life easier if you can centralize your Logstash processing on a few beefy Logstash nodes. That comes in handy if you are thinking of using Memcached for lookups of malicious IPs or domain names in all your syslog messages. Hey, that sounds like a topic for a complete blog post of its own ;)

Adding Windows DHCP logs to Elastic – part 2

If you followed the previous blog post, we have created a pipeline for sending Windows Server DHCP logs into Elastic. But those were just the basic logs. Suppose we want to enrich the logs with MAC address vendor information, so you will be able to monitor which devices appear on your network. Please note that MAC addresses can be manipulated by third parties.

A MAC address consists of two parts: a vendor prefix of 3 bytes followed by a unique identifier of another 3 bytes.

On a Windows computer you have multiple ways to display the MAC address. These include:

  • ipconfig /all
  • getmac

In this example, we have a MAC address of A0-B3-CC-85-30-F4, so our vendor prefix will be A0-B3-CC. But how do we translate this prefix into vendor information?

Fortunately, we can download a list of known vendors: http://standards-oui.ieee.org/oui.txt

When we download that list and look for our example prefix, we find it registered to HP.


To be able to use the downloaded OUI list in Logstash, we need to convert the file to a YML file. The file also contains some characters that Logstash doesn’t appreciate, so we will strip those characters as well. We will use a small PowerShell script to do this conversion. The script contains hardcoded locations, which you need to update.

# assume we have downloaded the OUI list to this location
$file = "c:\source\oui.txt"
# target location
$dest = "c:\source\oui.yml"

# regex to match lines that contain the "(base 16)" wording
$regex = "(^.{6})\s+\(base 16\)\s+(.*)$"

if (Test-Path $dest)
{
    Remove-Item $dest
}

$streamWriter = [System.IO.StreamWriter] "$dest"
foreach ($line in Get-Content $file)
{
    if ($line -match $regex)
    {
        $company = $matches[2]
        # strip out strange characters
        $nice = $company -replace "[^0-9a-zA-Z ]+" , ""
        # write to dest YML file
        $streamWriter.WriteLine("`"$($matches[1])`": $($nice)");
    }
}
# we are done
$streamWriter.Close();
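
The resulting oui.yml contains one line per registered prefix, along these lines (sample values: the HP entry corresponds to our example prefix above, and the VMware one is another well-known prefix, shown for illustration):

"A0B3CC": HP Inc
"000C29": VMware Inc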

We are now able to expand our Logstash filter to use our new MAC address vendor lookup information. We will do this with the help of the translate filter, which allows us to look up a value in the YML file.

if [Description] == "Assign" or [Description] == "Renew"
{
   grok 
   {
    # split the mac address into 2 elements
    match => [ "MAC_Address", "(?<ouiprefix>.{6})(?<ouisuffix>.*)" ]
   }
   # lookup the ouiprefix in the YML file and add new oui field
   translate 
   {
     dictionary_path => "c:\elastic\logstash\config\oui.yml"
     field      => "[ouiprefix]"
     destination => "[oui]"
     fallback => "N/A"
   }
}

That’s all you need to do to enrich the DHCP logs with the MAC address vendor. As usual, we leave it to you to make a visualization of this info and add it to your DHCP dashboard.

The next part of this series will address finding unusual hostnames in the DHCP logs. We will try to develop a pattern for the typical hostname and use Elastic Machine Learning to find uncommon patterns.

Adding Windows DHCP logs to Elastic – part 1

Prerequisites

In order to add Windows DHCP server logs to Elastic, we assume that you have the infrastructure needed.

  • Windows DHCP Server 2012 R2 or higher
  • Elasticsearch cluster
  • Logstash

We are going to work with Elastic 6.x in this setup.  

Filebeat

Install filebeat on your DHCP server in a directory of your liking.

The DHCP logs are located in %systemroot%\system32\dhcp\DhcpSrvLog-*.log. You will also find IPv6 logs, but we will focus on the IPv4 logs.
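
For reference, each entry in these logs is a single comma-separated line; a representative, made-up example (matching the field layout we dissect in the Logstash section below) looks like this:

10,03/21/19,09:23:15,Assign,192.168.1.100,testhost.example.local,A0B3CC8530F4,,1234567,0,0,,,0x4D53465420352E30,MSFT 5.0,,,,0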

Add the following code to your filebeat.yml. This is the prospector that is going to watch for your DHCP logs. Notice that I am adding a field called type under root, with the value dhcp. This is a field we will be using in the Logstash configuration.

    -
      # Paths that should be crawled and fetched. Glob based paths.
      # To fetch all ".log" files from a specific level of subdirectories
      # /var/log/*/*.log can be used.
      # For each file found under this path, a harvester is started.
      # Make sure no file is defined twice as this can lead to unexpected behaviour.
      paths:
        #- /var/log/*.log
        - c:\windows\system32\dhcp\DhcpSrvLog-*.log
      input_type: log
      include_lines: ["^[0-9]"]
      document_type: dhcp
      close_removed : false
      clean_removed : false                 
      ignore_older: 47h
      clean_inactive: 48h     
      fields:
        type: dhcp
      fields_under_root: true

And add an output section as well. There are multiple ways of shipping data from Filebeat; in this example we are shipping the logs to Logstash for parsing. The DNS names are logstash01 and logstash02.

output:

  ### Logstash as output
  logstash:
    # The Logstash hosts
    hosts: ["logstash01:5044" , "logstash:5044" ]

    # Number of workers per Logstash host.
    worker: 2

    # Set gzip compression level.
    #compression_level: 3

    # Optional load balance the events between the Logstash hosts
    loadbalance: true

    # Optional index name. The default index name depends on the each beat.
    # For Packetbeat, the default is set to packetbeat, for Topbeat
    # top topbeat and for Filebeat to filebeat.
    #index: filebeat

    # Optional TLS. By default is off.
    #tls:
      # List of root certificates for HTTPS server verifications
      #certificate_authorities: ["/etc/pki/root/ca.pem"]

      # Certificate for TLS client authentication
      #certificate: "/etc/pki/client/cert.pem"

      # Client Certificate Key
      #certificate_key: "/etc/pki/client/cert.key"

      # Controls whether the client verifies server certificates and host name.
      # If insecure is set to true, all server host names and certificates will be
      # accepted. In this mode TLS based connections are susceptible to
      # man-in-the-middle attacks. Use only for testing.
      #insecure: true

      # Configure cipher suites to be used for TLS connections
      #cipher_suites: []

      # Configure curve types for ECDHE based cipher suites
      #curve_types: []

After these steps, Filebeat should be able to watch the DHCP logs and ship them to Logstash.

Logstash

In order for Logstash to process the data coming from your DHCP server, we create an input section and specify it as a beats input.

input {
  beats {
   port => 5044
  }
}

Next we define the filter section, where we will parse the logs. Notice that we are using the dissect filter here. It is very convenient for this kind of log, and you don’t have to use grok for simple formats like this.

filter
{
    if [type] == "dhcp"
    {
        dissect {
          mapping => {
          "message" => "%{ID},%{Date},%{Time},%{Description},%{IP_Address},%{Host_Name},%{MAC_Address},%{User_Name},%{TransactionID},%{QResult},%{Probationtime},%{CorrelationID},%{Dhcid},%{VendorClass_hex},%{VendorClass_ascii},%{UserClass_hex},%{UserClass_ascii},%{RelayAgentInformation},%{DnsRegError}"
                   }
           } 
         mutate
         {
            add_field => { "log_timestamp" => "%{Date}-%{Time}" }
         }
         date {
              match => [ "log_timestamp", "MM/dd/YY-HH:mm:ss" ]                
              timezone => "Europe/Copenhagen"
         }
         if "_dateparsefailure" not in [tags]
         {
             mutate
             {
                remove_field=> ['Date', 'Time', 'log_timestamp', 'message']
             }
        }
     }
}

And finally we define the output section, where we ship data from Logstash to Elastic. We are using a daily index in this example, but you could use a weekly or even monthly approach instead, as there will not be a huge amount of data in this index.

output {
  if [type] == "dhcp"
  {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "dchp-%{+YYYY.MM.dd}" 
    }
  }
}

Elasticsearch

In order for Elasticsearch to correctly handle our DHCP data, we need to provide an index template. Notice that we are simply mapping all strings as keywords here.

{
  "dhcp": {
    "order": 10,
    "index_patterns": [
      "dhcp-*"
    ],
    "settings": {},
    "mappings": {
      "dhcp": {
        "dynamic_templates": [
          {
            "strings_as_keyword": {
              "mapping": {
                "ignore_above": 1024,
                "type": "keyword"
              },
              "match_mapping_type": "string"
            }
          }
        ],
        "properties": {}
      }
    },
    "aliases": {}
  }
}

That’s pretty much it to get data flowing. We will leave it to you to define a dashboard to display the data in a meaningful manner.

Stay tuned for the next part of this series, where we will be expanding the Logstash filter by enriching the data with a MAC vendor lookup.