Enriching ElasticSearch With Threat Data – Part 3 – Logstash

In the previous post in this series, we prepared MISP and its API, set up memcached, and created the Python script we need to pull data from MISP and push it into memcached. In this post, we will cover how to use Logstash to look up the data stored within memcached, and then how to enrich ElasticSearch when we get a hit!

A quick mention before we go much deeper: this enrichment setup is capable of ultra-fast lookups and of working with huge numbers of IoCs. Without giving away too much, I know of a very large production setup which is running this with close to 120,000 events per second and multiple feeds enabled within MISP. It does the enrichment in real time, as the logs are being written to ElasticSearch!

Part 1:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-1-misp/

Part 2:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-2-memcached-and-python/


Logstash – Brief Intro

Logstash is the powerhouse behind our enrichment setup. Since you should hopefully already be familiar with the ELK stack, we won’t go into much detail on how Logstash works, but we will focus on the parts that matter here.

Logstash is essentially split up into 3 sections: input, filter and output.

The input section is where we define the source of the logging data we want to work with.

The filter section is where we then work with the logging data. This could be via parsing, normalizing, transforming or multiple other methods to prepare the data for sending out to ElasticSearch.

The output section is where we define how to send the data out of Logstash. This could be sending directly to ElasticSearch, Kafka or many other output options.

In future, our blog will focus much more on the filter section and on how we can map all logs against the Elastic Common Schema via grok parsing. But right now, in this example, we will keep it simple and assume you already have some sort of parsing in place for the logging source you want to enrich.

Logstash – Memcached filter

The Logstash Memcached filter has recently been made into a fully supported release, which we are very happy about over at Security Distractions. It comes installed by default with Logstash 7.0.

https://www.elastic.co/guide/en/logstash/current/plugins-filters-memcached.html

This means all we need to do within our logstash configuration to enable the memcached plugin, is to write the function in as shown below.

The placement of the memcached section is quite important. It should come after your grok parsing and transforming sections, preferably as the last function within the filter section.

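memcached {
	# Address and port of the memcached instance
	hosts => ["127.0.0.1:11211"]
	# Look up "domain-<destination.domain>" and write any hit to misp_src
	# If destination.domain is a nested field, reference it as %{[destination][domain]}
	get => { "domain-%{destination.domain}" => "[misp_src]" }
}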

A quick breakdown of this function: “hosts” is where we specify the address and port of our memcached application.

The “get” is used to tell Logstash which key it needs to look up in memcached, built here from the destination.domain field within the logs. The result of a match is then written to a new field, “misp_src”.

Using the example from our previous blog post, we will use securitydistractions.com as the value within the destination.domain field.

Logstash will prepend “domain-” to “securitydistractions.com”, resulting in “domain-securitydistractions.com”. It will then make a get request against the memcached application.

“domain-securitydistractions.com” is populated within the memcached data store, with the value “Feed-RansomwareTracker”. So we get a hit, and this value is written to the new field “misp_src”.
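If you want to check what Logstash will see before wiring up the filter, the same lookup can be reproduced by hand. Below is a minimal Python sketch that speaks the memcached text protocol directly, so it needs nothing outside the standard library. The function names (make_key, parse_get_response, lookup) are made up for this example, and it assumes the script from part 2 stored the feed name as a plain UTF-8 string.

```python
import socket
from typing import Optional


def make_key(domain: str) -> str:
    """Build the key the same way the Logstash "get" setting does."""
    return f"domain-{domain}"


def parse_get_response(raw: bytes) -> Optional[str]:
    """Parse a memcached text-protocol reply to "get".

    Returns the stored value on a hit, or None on a miss.
    """
    if raw.startswith(b"END"):
        return None  # a miss is answered with just END
    header, _, rest = raw.partition(b"\r\n")
    parts = header.split()
    # A hit starts with: VALUE <key> <flags> <bytes>
    if len(parts) < 4 or parts[0] != b"VALUE":
        raise ValueError(f"unexpected reply: {header!r}")
    length = int(parts[3])
    return rest[:length].decode("utf-8")


def lookup(domain: str, host: str = "127.0.0.1", port: int = 11211) -> Optional[str]:
    """Ask memcached which feed (if any) is stored against a domain."""
    with socket.create_connection((host, port), timeout=2) as conn:
        conn.sendall(f"get {make_key(domain)}\r\n".encode("ascii"))
        raw = b""
        # Both a hit and a miss end with END\r\n
        while not raw.endswith(b"END\r\n"):
            chunk = conn.recv(4096)
            if not chunk:
                break
            raw += chunk
    return parse_get_response(raw)
```

Calling lookup("securitydistractions.com") against the memcached instance from part 2 should hand back the feed name if the key is there, and None if it is not.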

When Logstash does a lookup for a value which is not within the memcached data store, then it will not return a value into misp_src. So just for the sake of good practice we will add a function within Logstash that will populate the misp_src field with the value “none” if there is no match.

if ![misp_src] {
	# No hit in memcached, so mark the event explicitly
	mutate {
		add_field => { "[misp_src]" => "none" }
	}
}
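To make the combined behaviour of the lookup and the fallback easier to reason about, here is a small Python model of what happens to a single event. It is only a sketch: a plain dict stands in for the memcached data store, the event is a flat dict with a “destination.domain” key, and the enrich function is a name invented for this example rather than anything Logstash provides.

```python
def enrich(event: dict, store: dict) -> dict:
    """Mimic the memcached lookup plus the "none" fallback for one event."""
    enriched = dict(event)  # leave the incoming event untouched
    domain = event.get("destination.domain")
    hit = store.get(f"domain-{domain}") if domain else None
    # On a hit we keep the feed name, otherwise we fall back to "none"
    enriched["misp_src"] = hit if hit is not None else "none"
    return enriched


# Stand-in for the memcached data store populated in part 2
store = {"domain-securitydistractions.com": "Feed-RansomwareTracker"}

print(enrich({"destination.domain": "securitydistractions.com"}, store))
# {'destination.domain': 'securitydistractions.com', 'misp_src': 'Feed-RansomwareTracker'}
print(enrich({"destination.domain": "example.org"}, store))
# {'destination.domain': 'example.org', 'misp_src': 'none'}
```

Every event ends up with a misp_src field either way, which keeps filtering and aggregating on that field simple.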

Since this setup leverages your already existing ELK stack, you will then only need to handle the new field “misp_src” via visualisations or whatever other fancy way you want to display this field.

In my lab, I use a locally running instance of Pi-hole to generate logs for testing the enrichment setup.

When I get round to it, I will make a part 4, featuring extensions to the integration. You can run with as many feeds as your heart desires. Your only limit is your imagination for tagging/feed names!

It is possible to further integrate MISP and ELK by using the http plugin. Once the misp_src field is populated, you could take this result and then make an HTTP call to MISP again for further enrichment.
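As a rough idea of what such a follow-up call could look like, here is a Python sketch that builds a search request against MISP's attributes/restSearch endpoint for a value that produced a hit. The base URL, the API key and the build_misp_search name are placeholders for this example, and the body carries only the bare minimum (value and return format). Treat it as a starting point rather than a finished integration.

```python
import json
import urllib.request


def build_misp_search(base_url: str, api_key: str, value: str) -> urllib.request.Request:
    """Build (but do not send) a MISP attribute search for one value."""
    body = json.dumps({"returnFormat": "json", "value": value}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/attributes/restSearch",
        data=body,
        headers={
            "Authorization": api_key,  # MISP expects the API key here
            "Accept": "application/json",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder values, swap in your own MISP URL and key
request = build_misp_search(
    "https://misp.example.local", "YOUR_API_KEY", "securitydistractions.com"
)
```

Passing the request to urllib.request.urlopen and decoding the JSON reply would give you the matching attributes. Inside Logstash, the same URL, headers and body would be what the http call has to send.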
