Parsing Linux DHCP logs

Introduction

So we are back at parsing DHCP logs. This time we are taking a look at DHCP logs from Linux systems. These are quite simple to parse, so let's add a complication to the puzzle: ECS.

ECS

Elastic Common Schema. This is a fairly new convention introduced by Elastic to help with naming the fields in your logs. The main goal is to achieve consistent naming across your various log sources. This lets your users know that an IP address is always parsed as [source][ip] and not src_ip, src_ipv4 and so on.

If you follow ECS, you will be able to index various kinds of firewall logs in the same index and visualize the data in Kibana, regardless of whether the data comes from Check Point, ASA or anything else.

The main problem with ECS is that it doesn't cover every kind of naming yet. So you will find yourself in situations where you simply can't find anything in ECS that covers your particular use case. But don't worry about it: just use what you can and be consistent.
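As a quick illustration, suppose a source ships a field called src_ip. A minimal Logstash sketch for normalizing it to ECS could look like this (the field name src_ip is just a hypothetical example):

filter {
  mutate {
    # move the vendor-specific field to its ECS equivalent
    rename => { "src_ip" => "[source][ip]" }
  }
}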

DHCP messages

In your Linux DHCP logs, you should find lines that look similar to these:

Apr 29 03:48:58 hostname dhcpd: DHCPACK to ip (mac) via router
Apr 29 03:48:58 hostname dhcpd: DHCPINFORM from ip via router
Apr 29 03:48:59 hostname dhcpd: DHCPDISCOVER from mac via router: unknown network segment
Apr 29 03:49:05 hostname dhcpd: DHCPREQUEST for ip from mac (hostname) via router

So we will build a Logstash pipeline to parse this and convert it to ECS naming, whenever possible.

Logstash pipeline

We will jump right in with some Logstash code. This pipeline reads input from stdin, parses the logs and outputs the parsed events to the console in a nice JSON format.

In production you will need Filebeat to ship your logs to Logstash, and you will need to modify the pipeline to read from a Beats input instead. You will also need to modify the output section so it ships to your desired location, whether that is Kafka or Elasticsearch directly.

input {

  stdin {
    id => "my_plugin_id"
  }
 
}

 

filter {

  if [message] =~ /dhcpd:/ {

    # split the syslog header from the DHCP-specific part
    grok {
      match => {
        "message" => "%{SYSLOGBASE} %{GREEDYDATA:params}"
      }
    }

    if "_grokparsefailure" not in [tags] {
      # the first word of the remainder is the DHCP message type, e.g. DHCPACK
      grok {
        match => {
          "params" => "%{WORD:[event][action]} %{GREEDYDATA:params2}"
        }
      }

      if "_grokparsefailure" not in [tags] {
        # one pattern per DHCP message layout
        grok {
          match => {
            "params2" => [
              "from %{IP:[source][ip]} via %{IPORHOST:interface}",
              "to %{IP:[source][ip]} \(%{MAC:[source][mac]}\) via %{IPORHOST:interface}",
              "from %{MAC:[source][mac]} (\(%{DATA:[host][hostname]}\) )?via %{IPORHOST:interface}(: %{GREEDYDATA:[error][message]})?",
              "on %{IP:[source][ip]} to %{MAC:[source][mac]} \(%{DATA:[host][hostname]}\) via %{IPORHOST:interface}",
              "pool %{WORD:[pool][id]} %{DATA:[pool][subnet]}  total %{INT:[pool][total]}  free %{INT:[pool][free]}  backup %{INT:[pool][backup]}  lts %{INT:[pool][lts]}  max-own \(\+\/\-\)%{INT:[pool][maxown]}",
              "pool %{WORD:[pool][id]} %{DATA:[pool][subnet]}  total %{INT:[pool][total]}  free %{INT:[pool][free]}  backup %{INT:[pool][backup]}  lts %{INT:[pool][lts]}  max-misbal %{INT:[pool][maxmisbal]}"
            ]
          }
        }
      }
    }

    if "_grokparsefailure" not in [tags] {
      if [source][mac] {
        mutate {
          # strip the colons from the MAC address
          gsub => [
            "[source][mac]", "[\:]", ""
          ]
        }
      }
      if [source][ip] {
        mutate {
          copy => { "[source][ip]" => "[related][ip]" }
        }
      }

      date {
        match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
      }
      mutate {
        remove_field => [ "message", "params2", "params", "timestamp" ]
        add_field => { "[ecs][version]" => "ecs-1.0.0" }
        add_field => { "[organization][id]" => "xx" }
        add_field => { "[observer][vendor]" => "linux" }
        rename => { "logsource" => "[observer][host]" }
      }
    }
  }

}
 

output {

  stdout { codec => rubydebug }
  
  
}
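For reference, if you feed the DHCPACK sample line from above through this pipeline, the rubydebug output should contain roughly the following fields. This is a hand-written sketch with made-up values, not captured output:

{
           "event" => { "action" => "DHCPACK" },
          "source" => { "ip" => "10.0.1.23", "mac" => "00163e12ab34" },
       "interface" => "10.0.1.1",
         "related" => { "ip" => "10.0.1.23" },
        "observer" => { "host" => "hostname", "vendor" => "linux" },
             "ecs" => { "version" => "ecs-1.0.0" },
    "organization" => { "id" => "xx" },
      "@timestamp" => 2019-04-29T03:48:58.000Z,
         "program" => "dhcpd"
}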

Github

In order to make our lives easier, we have posted the code in our Github repo.

Conclusion

This was a short example of parsing another log source into Elastic. I highly encourage you to get familiar with ECS. It will make a lot of things easier and more shareable for everyone.

Adding Windows DHCP logs to Elastic – part 2

If you followed the previous blog, we created a pipeline for sending Windows Server DHCP logs into Elastic. But those were just the basic logs. Suppose we want to enrich the logs with MAC address vendor information, so you will be able to monitor what devices appear on your network. Please note that MAC addresses can be spoofed by third parties.

A MAC address consists of two parts: a vendor prefix (the OUI, Organizationally Unique Identifier) of 3 bytes, followed by a device-specific identifier of another 3 bytes.

On a Windows computer you have multiple ways to display the MAC address. These include:

  • ipconfig /all
  • getmac

In this example, we have a MAC address of A0-B3-CC-85-30-F4. Our vendor prefix is A0-B3-CC. But how do we translate this prefix into vendor information?

Fortunately, we can download a list of known vendors from http://standards-oui.ieee.org/oui.txt

If we download that list and search for our example prefix, we find that it is registered to HP.
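Each assignment is listed in oui.txt in both hex and base-16 notation. The entry for our prefix looks roughly like this (the exact vendor string in your copy of the file may differ):

A0-B3-CC   (hex)        Hewlett Packard
A0B3CC     (base 16)    Hewlett Packard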


To be able to use the downloaded OUI list in Logstash, we need to convert the file to a YML file. The file also contains some characters that Logstash doesn't appreciate, so we strip those characters as well. We will use a small PowerShell script to do this conversion. The script contains hardcoded locations, which you need to update.

# assume we have downloaded script to this location
$file = "c:\source\oui.txt"
# target location
$dest = "c:\source\oui.yml"

# regex to match lines that contain the "(base 16)" wording
$regex = "(^.{6})\s+\(base 16\)\s+(.*)$"

if (Test-Path $dest)
{
    Remove-Item $dest
}

$streamWriter = [System.IO.StreamWriter] "$dest"
foreach($line in Get-Content $file) 
{
    if ($line -match $regex)
    {
        $company = $matches[2]
        # strip out strange characters
        $nice = $company -replace "[^0-9a-zA-Z ]+", ""
        # write to dest YML file
        $streamWriter.WriteLine("`"$($matches[1])`": $($nice)");
    }
}
# we are done
$streamWriter.close();
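With the script above, the resulting oui.yml should contain one line per prefix, along these lines (continuing the HP example; the vendor string comes from whatever is in your copy of oui.txt):

"A0B3CC": Hewlett Packard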

We are now able to expand our Logstash filter to use our new MAC address vendor lookup information. We will do this with the help of the translate filter, which allows us to look up a value in the YML file.

if [Description] == "Assign" or [Description] == "Renew"
{
   grok
   {
     # split the mac address into 2 elements
     match => [ "MAC_Address", "(?<ouiprefix>.{6})(?<ouisuffix>.*)" ]
   }
   # lookup the ouiprefix in the YML file and add new oui field
   translate
   {
     dictionary_path => "c:\elastic\logstash\config\oui.yml"
     field           => "[ouiprefix]"
     destination     => "[oui]"
     fallback        => "N/A"
   }
}
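After this filter runs, an event whose MAC_Address starts with our example prefix should carry fields along these lines (a sketch based on the example address from earlier):

"ouiprefix" => "A0B3CC"
"ouisuffix" => "8530F4"
"oui"       => "Hewlett Packard"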

That’s all you need to do to enrich the DHCP logs with the MAC address vendor. As usual, we leave it to you to make a visualization of this info and add it to your DHCP dashboard.

The next part of this series will address finding unusual hostnames in the DHCP logs. We will try to develop a pattern for the typical hostname and use Elastic Machine Learning to find uncommon patterns.

Adding Windows DHCP logs to Elastic – part 1

Prerequisites

In order to add Windows DHCP server logs to Elastic, we assume that you have the infrastructure needed.

  • Windows DHCP Server 2012 R2 or higher
  • Elasticsearch cluster
  • Logstash

We are going to work with Elastic 6.x in this setup.  

Filebeat

Install Filebeat on your DHCP server in a directory of your liking.

The DHCP logs are located in %systemroot%\system32\dhcp\DhcpSrvLog-*.log. You will also find IPv6 logs, but we will focus on the IPv4 logs.
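For reference, the data lines in these logs are comma-separated with a fixed set of columns, roughly like this hand-written sample (values will vary with your server):

10,04/29/19,03:48:58,Assign,192.168.1.50,somehost.example.com,A0B3CC8530F4,,1234567890,0,,,,,,,,,0

We will map these columns to named fields in the Logstash section below.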

Add the following code to your filebeat.yml. This is the prospector that is going to watch for your DHCP logs. Notice that I am adding a field called type under root with the value dhcp. This is a field we will be using in the Logstash configuration.

    -
      # Paths that should be crawled and fetched. Glob based paths.
      # To fetch all ".log" files from a specific level of subdirectories
      # /var/log/*/*.log can be used.
      # For each file found under this path, a harvester is started.
      # Make sure no file is defined twice as this can lead to unexpected behaviour.
      paths:
        #- /var/log/*.log
        - c:\windows\system32\dhcp\DhcpSrvLog-*.log
      input_type: log
      include_lines: ["^[0-9]"]
      document_type: dhcp
      close_removed : false
      clean_removed : false                 
      ignore_older: 47h
      clean_inactive: 48h     
      fields:
        type: dhcp
      fields_under_root: true

And add an output section as well. There are multiple ways of shipping data from Filebeat; in this example we are shipping the logs to Logstash for parsing. The DNS names are logstash01 and logstash02.

output:

  ### Logstash as output
  logstash:
    # The Logstash hosts
    hosts: ["logstash01:5044" , "logstash:5044" ]

    # Number of workers per Logstash host.
    worker: 2

    # Set gzip compression level.
    #compression_level: 3

    # Optional load balance the events between the Logstash hosts
    loadbalance: true

    # Optional index name. The default index name depends on the each beat.
    # For Packetbeat, the default is set to packetbeat, for Topbeat
    # top topbeat and for Filebeat to filebeat.
    #index: filebeat

    # Optional TLS. By default is off.
    #tls:
      # List of root certificates for HTTPS server verifications
      #certificate_authorities: ["/etc/pki/root/ca.pem"]

      # Certificate for TLS client authentication
      #certificate: "/etc/pki/client/cert.pem"

      # Client Certificate Key
      #certificate_key: "/etc/pki/client/cert.key"

      # Controls whether the client verifies server certificates and host name.
      # If insecure is set to true, all server host names and certificates will be
      # accepted. In this mode TLS based connections are susceptible to
      # man-in-the-middle attacks. Use only for testing.
      #insecure: true

      # Configure cipher suites to be used for TLS connections
      #cipher_suites: []

      # Configure curve types for ECDHE based cipher suites
      #curve_types: []

After these steps, Filebeat should be able to watch the DHCP logs and ship them to Logstash.

Logstash

In order for Logstash to process the data coming from your DHCP server, we create an input section and specify it as a Beats input.

input {
  beats {
   port => 5044
  }
}

Next we define the filter section, where we will parse the logs. Notice that we are using the dissect filter here. It is very convenient for delimiter-separated logs like these, and you don't have to pay the cost of grok for simple formats.

filter
{
    if [type] == "dhcp"
    {
        dissect {
            mapping => {
                "message" => "%{ID},%{Date},%{Time},%{Description},%{IP_Address},%{Host_Name},%{MAC_Address},%{User_Name},%{TransactionID},%{QResult},%{Probationtime},%{CorrelationID},%{Dhcid},%{VendorClass_hex},%{VendorClass_ascii},%{UserClass_hex},%{UserClass_ascii},%{RelayAgentInformation},%{DnsRegError}"
            }
        }
        mutate
        {
            # combine the separate date and time columns into one field
            add_field => { "log_timestamp" => "%{Date}-%{Time}" }
        }
        date {
            match => [ "log_timestamp", "MM/dd/YY-HH:mm:ss" ]
            timezone => "Europe/Copenhagen"
        }
        if "_dateparsefailure" not in [tags]
        {
            mutate
            {
                remove_field => ['Date', 'Time', 'log_timestamp', 'message']
            }
        }
    }
}

And finally we define the output section, where we ship data from Logstash to Elastic. We are using a daily index in this example, but you could use a weekly or even monthly approach instead, as there will not be a huge amount of data in this index.

output {
  if [type] == "dhcp"
  {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "dchp-%{+YYYY.MM.dd}" 
    }
  }
}

Elasticsearch

In order for Elasticsearch to correctly handle our DHCP data, we need to provide an index template. Notice that we are just using keywords here.

{
  "dhcp": {
    "order": 10,
    "index_patterns": [
      "dhcp-*"
    ],
    "settings": {},
    "mappings": {
      "dhcp": {
        "dynamic_templates": [
          {
            "strings_as_keyword": {
              "mapping": {
                "ignore_above": 1024,
                "type": "keyword"
              },
              "match_mapping_type": "string"
            }
          }
        ],
        "properties": {}
      }
    },
    "aliases": {}
  }
}
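One way to install the template is via the template API, for example with curl. Note that the JSON above is shown in the format returned by GET _template; when uploading, the body should be the inner object (everything inside "dhcp": { ... }):

# save the inner template object to dhcp-template.json, then:
curl -XPUT "http://localhost:9200/_template/dhcp" -H "Content-Type: application/json" -d @dhcp-template.json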

That's pretty much it for getting the data going. We will leave it to you to define a dashboard to display the data in a meaningful manner.

Stay tuned for the next part of this series, where we will expand the Logstash filter by enriching the data with a MAC vendor lookup.