Enriching ElasticSearch With Threat Data – Part 3 – Logstash

In our previous post in this series, we prepared MISP and its API, set up memcached, and created the Python script we need to pull data from MISP and push it into our memcached application. In this blog post, we will cover how to use Logstash to look up the data stored within memcached, and then how to enrich ElasticSearch when we get a hit!

A quick mention before we go much deeper: this enrichment setup is capable of ultra-fast lookups and of working with huge numbers of IoCs. Without giving away too much, I know of a very large production setup which is running this with close to 120,000 events per second and multiple feeds enabled within MISP… It does the enrichment in real time as the logs are being written to ElasticSearch!

Part 1:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-1-misp/

Part 2:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-2-memcached-and-python/


Logstash – Brief Intro

Logstash is the powerhouse behind our enrichment setup… Since you should hopefully already be familiar with the ELK stack, we won’t touch too much on Logstash and how it works, but we will focus on the parts that matter here…

Logstash is essentially split into 3 sections… Input, filter and output.

The input section is where we define the source of the logging data we want to work with.

The filter section is where we then work with the logging data. This could be via parsing, normalizing, transforming or multiple other methods to prepare the data for sending out to ElasticSearch…

The output section is where we define how the data leaves Logstash. This could be sending it directly to ElasticSearch, Kafka or many other output options.
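
To make that structure concrete, here is a minimal pipeline sketch. The beats input on port 5044 and the local ElasticSearch output are placeholder assumptions purely for illustration; your own inputs and outputs will differ.

input {
    beats {
        port => 5044
    }
}

filter {
    # Parsing/normalising filters (grok, mutate, etc.) go here.
    # The memcached lookup described below should sit at the end of this section.
}

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "logs-%{+YYYY.MM.dd}"
    }
}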

In future posts, this blog will focus much more on the filter section and how we can map all logs to the Elastic Common Schema via grok parsing. But right now, in this example, we will keep it simple and assume you already have some sort of parsing in place for the logging source you want to enrich.

Logstash – Memcached filter

The Logstash memcached filter has recently been promoted to a fully supported release, which we are very happy about over at Security Distractions. It comes installed by default with Logstash 7.0…

https://www.elastic.co/guide/en/logstash/current/plugins-filters-memcached.html

This means that all we need to do within our Logstash configuration to enable the memcached plugin is to add the filter as shown below.

The placement of the memcached section is quite important… It should come after your grok parsing and transforming sections, preferably as the last filter within the filter section.

memcached {
    hosts => ["127.0.0.1:11211"]
    get => { "domain-%{destination.domain}" => "[misp_src]" }
}

A quick breakdown of this filter: “hosts” is where we specify the location and port of our memcached application.

The “get” option tells Logstash which field within the logs it needs to look up against memcached; the result of a match is then written to a new field, “misp_src”.

Using the example from our previous blog post, we will use securitydistractions.com as the value within the destination.domain field.

Logstash will prepend “domain-” to “securitydistractions.com”, resulting in “domain-securitydistractions.com”. It will then make a get request against the memcached application….

“domain-securitydistractions.com” is populated within the memcached data store with the value “Feed-RansomwareTracker”. So we get a hit, and this value is written to the new field “misp_src”.

When Logstash looks up a value which is not within the memcached data store, it will not return a value into misp_src. So, just for the sake of good practice, we will add a conditional within Logstash that populates the misp_src field with the value “none” if there is no match.

if ![misp_src] {
    mutate {
        add_field => { "[misp_src]" => "none" }
    }
}

Since this setup leverages your already existing ELK stack, you will then only need to handle the new field “misp_src” via visualisations or whatever other fancy way you want to display this field.

In my lab, I use a locally running instance of pihole to generate logs for testing the enrichment setup….
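
For reference, a hypothetical grok filter for parsing dnsmasq/pihole-style query lines into a destination domain field might look like the sketch below. The log format and field names here are assumptions rather than part of the setup from the earlier posts; the memcached lookup key simply needs to reference whatever field your own parsing produces.

filter {
    grok {
        # Hypothetical example: parse a dnsmasq/pihole query line such as
        # "Jun  1 12:00:00 dnsmasq[1234]: query[A] securitydistractions.com from 192.168.1.10"
        match => {
            "message" => "%{SYSLOGTIMESTAMP:timestamp} dnsmasq\[%{POSINT:[process][pid]}\]: query\[%{WORD:[dns][question][type]}\] %{HOSTNAME:[destination][domain]} from %{IP:[source][ip]}"
        }
    }
}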

When I get round to it, I will make a part 4… Featuring extensions to the integration. You can run with as many feeds as your heart desires… Your only limit is your imagination for tagging/feed names!
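
As a small taste of that, the memcached filter can perform several lookups in one get block. The "ip-" key prefix and the extra target fields below are purely hypothetical; they just need to match whatever keys your MISP-to-memcached script writes.

memcached {
    hosts => ["127.0.0.1:11211"]
    # Hypothetical example: look up domains and IPs against different feeds in one pass.
    get => {
        "domain-%{destination.domain}" => "[misp_src]"
        "ip-%{destination.ip}"         => "[misp_src_dst_ip]"
        "ip-%{source.ip}"              => "[misp_src_src_ip]"
    }
}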

It is possible to integrate MISP and ELK even further by using the http filter plugin. Once the misp_src field is populated, you could take this result and then make an HTTP call back to MISP for further enrichment.
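
A rough sketch of that idea, using the http filter against MISP’s /attributes/restSearch endpoint, is shown below. The MISP URL, the API key header, the conditional on misp_src and the exact request body are all assumptions to adapt to your own instance; the response is written into a new misp_enrichment field for you to pick apart afterwards.

# Hypothetical follow-up enrichment: only call MISP again when memcached gave a hit.
if [misp_src] != "none" {
    http {
        # Placeholder MISP URL and API key - replace with your own instance and key.
        url  => "https://misp.local/attributes/restSearch"
        verb => "POST"
        headers => {
            "Authorization" => "YOUR_MISP_API_KEY"
            "Accept"        => "application/json"
            "Content-Type"  => "application/json"
        }
        body => {
            "returnFormat" => "json"
            "value"        => "%{destination.domain}"
        }
        body_format => "json"
        # Store the full MISP response in its own field
        target_body => "[misp_enrichment]"
    }
}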

2 thoughts on “Enriching ElasticSearch With Threat Data – Part 3 – Logstash”

  1. Hi David,

    I hope this finds you well and that this is your email.

    Introductions, I’m Dharshan Ryan, a Security consultant and the Threat intelligence Head here at Vigilant Asia.

    First of all I would like to say awesome set of posts regarding the topic above. It has given me an answer to a solution I have been looking for these past couple of days.

    I have a few questions regarding the python script which queries from misp API

    This line

    data = '{"returnFormat":"text","type":"domain","tags":"Feed-eCrimes","to_ids":"yes"}' #Setting up the data format we require from MISP

    is it possible for "type" to be an array, for example: {"type": ["taga", "tagb", "…"]}, considering this is calling the MISP API, I think its possible.

    But the problem I see is this:
    domains = (response.text).splitlines()
    for domain in domains:
        client.set("domain-" + domain, "Feed-RansomwareTracker", 70)
    Is it possible for this to handle multiple types?

    Based on this domains = (response.text).splitlines()
    Im really not sure.

    Hoping for your reply.

    P.S. I have 0 experience in coding out python, but some understanding.

  2. Hi David,
    I came across your posts about Misp and Elk and i’m following with anticipation. i’ve been a long time user of ELK and recently come back to the game and been looking into importing misp type data into an elk setup, and im experimenting with docker containers and some novel images i found online that do similar to yours so im wondering how they would compare in operation and substance.

    MarkG
