Parsing Linux DHCP logs

Introduction

So we are back at parsing DHCP logs. This time, we are taking a look at DHCP logs from Linux systems. They are quite simple and easy to parse, so let's add some complication to the puzzle: namely, ECS.

ECS

Elastic Common Schema. This is a fairly new convention introduced by Elastic to help with naming the fields in your logs. The main goal is consistent naming across your various log sources. This lets your users know that an IP address is always parsed as [source][ip] and not src_ip, src_ipv4 and so on.

If you follow ECS, you will be able to index various kinds of firewalls in the same index and visualize the data in Kibana, regardless of whether the data comes from Check Point, ASA or whatever.

The main problem with ECS is that it doesn't cover all sorts of naming yet, so you will find yourself in situations where you simply can't find anything in ECS that covers your particular use case. But don't worry about it; just use what you can and be consistent.
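To sketch what following ECS looks like in practice, a mutate filter can map a vendor-specific field onto its ECS counterpart. The src_ip name here is just a hypothetical vendor field, not something from the DHCP logs below:

```
filter {
  mutate {
    # hypothetical vendor-specific field renamed to its ECS equivalent
    rename => { "src_ip" => "[source][ip]" }
  }
}
```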

DHCP messages

In your Linux DHCP logs, you should find lines that look similar to this:

Apr 29 03:48:58 hostname dhcpd: DHCPACK to ip (mac) via router
Apr 29 03:48:58 hostname dhcpd: DHCPINFORM from ip via router
Apr 29 03:48:59 hostname dhcpd: DHCPDISCOVER from mac via router: unknown network segment
Apr 29 03:49:05 hostname dhcpd: DHCPREQUEST for ip from mac (hostname) via router

So we will build a Logstash pipeline to parse these messages and convert them to ECS naming wherever possible.

Logstash pipeline

We will jump right in with some Logstash code. This pipeline will read input from stdin, parse the logs and output the parsed events to the console in a nice JSON format.

You will need Filebeat to ship your logs to Logstash, and you will need to modify the pipeline to read from a Beats input instead. You will also need to modify the output section so it writes to your desired destination, whether that is Kafka or Elasticsearch directly.
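For reference, a Beats input and an Elasticsearch output could look roughly like this. The port, host and index name are placeholders you will need to adjust for your own environment:

```
input {
  beats {
    # default port Filebeat ships to
    port => 5044
  }
}

output {
  elasticsearch {
    # placeholder host and daily index name; adjust as needed
    hosts => ["http://localhost:9200"]
    index => "dhcp-%{+YYYY.MM.dd}"
  }
}
```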

input {

  stdin {
    id => "my_plugin_id"
  }
 
}

 

filter {

	if [message] =~ /dhcpd:/
	{
		grok{
			match => { 
				"message" => "%{SYSLOGBASE} %{GREEDYDATA:params}" 
					}
		}
		if "_grokparsefailure" not in [tags]
		{
			grok{
				match => { 
					"params" => "%{WORD:[event][action]} %{GREEDYDATA:params2}" 
						}
			}
			if "_grokparsefailure" not in [tags]
			{
				grok{
					match => { 
						"params2" => 
						[
							"from %{IP:[source][ip]} via %{IPORHOST:interface}" ,
							"to %{IP:[source][ip]} \(%{MAC:[source][mac]}\) via %{IPORHOST:interface}" ,
							"from %{MAC:[source][mac]} (\(%{DATA:[host][hostname]}\) )?via %{IPORHOST:interface}(: %{GREEDYDATA:[error][message]})?" ,
							"on %{IP:[source][ip]} to %{MAC:[source][mac]} \(%{DATA:[host][hostname]}\) via %{IPORHOST:interface}" ,
							"pool %{WORD:[pool][id]} %{DATA:[pool][subnet]}  total %{INT:[pool][total]}  free %{INT:[pool][free]}  backup %{INT:[pool][backup]}  lts %{INT:[pool][lts]}  max-own \(\+\/\-\)%{INT:[pool][maxown]}",
							"pool %{WORD:[pool][id]} %{DATA:[pool][subnet]}  total %{INT:[pool][total]}  free %{INT:[pool][free]}  backup %{INT:[pool][backup]}  lts %{INT:[pool][lts]}  max-misbal %{INT:[pool][maxmisbal]}"
						]
							}
				}
			}
		}
		if "_grokparsefailure" not in [tags]
		{
			if [source][mac]
			{
				mutate
				{
					gsub => [
					  # strip the colons from the MAC address,
					  # e.g. 00:11:22:33:44:55 becomes 001122334455
					  "[source][mac]", "[\:]", ""
					]
				}
			}
			if [source][ip]
			{
				mutate {
					   copy => { "[source][ip]" => "[related][ip]" }
				}
			}
		
		
			date {
				match => [ "timestamp",  "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
			}
			mutate {
				remove_field => [ "message", "params2", "params", "timestamp" ]
				add_field => { "[ecs][version]" => "1.0.0" }
				add_field => { "[organization][id]" => "xx" }
				add_field => { "[observer][vendor]" => "linux" }
				rename => { "logsource" => "[observer][host]" }
			}
			
		}
	
		
	}
  
}  
 

output {

  stdout { codec => rubydebug }
  
  
}

GitHub

In order to make our lives easier, we have posted the code in our GitHub repo.

Conclusion

This was a short example of parsing another log source into Elastic. I highly encourage you to get familiar with ECS; it will make a lot of things easier and more shareable for everyone.
