Enriching ElasticSearch With Threat Data – Part 3 – Logstash

In the previous post in this series, we prepared MISP and its API, set up memcached and created the Python script we need to pull data from MISP and push it into memcached. In this post, we will cover how to use Logstash to look up the data stored within memcached, and then how to enrich ElasticSearch when we get a hit!

A quick mention before we go much deeper: this enrichment setup is capable of ultra fast lookups and of working with huge numbers of IoCs. Without giving away too much, I know of a very large production setup which is running this with close to 120,000 events per second and multiple feeds enabled within MISP… It does the enrichment in realtime as the logs are being written to ElasticSearch!

Part 1:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-1-misp/

Part 2:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-2-memcached-and-python/


Logstash – Brief Intro

Logstash is the powerhouse behind our enrichment setup… Since you should hopefully already be familiar with the ELK stack, we won’t go too deep into how Logstash works. But we will focus on parts of it…

Logstash is essentially split up into 3 sections… Input, filter and output.

The input section is where we define the source of the logging data we want to work with.

The filter section is where we then work with the logging data. This could be via parsing, normalizing, transforming or multiple other methods to prepare the data for sending out to ElasticSearch…

The output section is where we define how to send the data out of Logstash. This could be sending directly to ElasticSearch, Kafka or many other output options.

In future our blog will focus much more on the filter section, and on how we can map all logs to the Elastic Common Schema via grok parsing. But right now in this example, we will just keep it simple and assume you already have some sort of parsing in place for the logging source you want to enrich.

Logstash – Memcached filter

The Logstash Memcached filter has recently been made into a fully supported release, which we are very happy about over at Security Distractions. It comes installed by default with Logstash 7.0…


This means all we need to do within our logstash configuration to enable the memcached plugin, is to write the function in as shown below.

The placement of the memcached section is quite important… It should be after your grok parsing and transforming sections. Preferably as the last function within the filter section.

	memcached {
		hosts => [""]    # "host:port" of your memcached application
		get => {"domain-%{destination.domain}" => "[misp_src]"}
	}

A quick breakdown of this function: “hosts” is where we specify the location and port of our memcached application.

The “get” is used to tell Logstash which field within the logs it needs to look up against memcached. The result of this match is then written to a new field, “misp_src”.

Using the example from our previous blog post, we will use securitydistractions.com as the value within the destination.domain field.

Logstash will prepend “domain-” to “securitydistractions.com”, resulting in “domain-securitydistractions.com”. It will then make a get request against the memcached application…

“domain-securitydistractions.com” is populated within the memcached data store, with the value “Feed-RansomwareTracker”. So we get a hit and this value is written to the new field “misp_src”.

When Logstash does a lookup for a value which is not within the memcached data store, then it will not return a value into misp_src. So just for the sake of good practice we will add a function within Logstash that will populate the misp_src field with the value “none” if there is no match.

if ![misp_src] {
	mutate { add_field => {"[misp_src]" => "none"} }
}
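
To make the lookup-and-fallback flow a bit more concrete, here is a rough Python sketch of what happens to a single event. A plain dict stands in for memcached, and the names `enrich` and `store` are made up for illustration; this only mirrors the logic described above, it is not how Logstash works internally.

```python
# Illustrative only: a dict stands in for the memcached store.
store = {"domain-securitydistractions.com": "Feed-RansomwareTracker"}

def enrich(event, store):
    """Mimic the memcached 'get' plus the 'none' fallback for one event."""
    domain = event.get("destination.domain")
    # Same key layout as the filter: "domain-" + the field value
    hit = store.get("domain-" + domain) if domain else None
    # No hit means no value comes back, so fall back to "none"
    event["misp_src"] = hit if hit is not None else "none"
    return event

print(enrich({"destination.domain": "securitydistractions.com"}, store)["misp_src"])
print(enrich({"destination.domain": "example.org"}, store)["misp_src"])
```

The first event gets “Feed-RansomwareTracker” in misp_src, the second gets “none”.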

Since this setup leverages your already existing ELK stack, you will then only need to handle the new field “misp_src” via visualisations or whatever other fancy way you want to display this field.

In my lab, I use a locally running instance of pihole to generate logs for testing the enrichment setup….

When I get round to it, I will make a part 4… Featuring extensions to the integration. You can run with as many feeds as your heart desires… Your only limit is your imagination for tagging/feed names!

It is possible to further integrate MISP and ELK by using the http plugin. Once the misp_src field is populated, you could take this result and then make an HTTP call to MISP again for further enrichment.

Enriching ElasticSearch With Threat Data – Part 2 – Memcached and Python

In our previous post we covered MISP and some of the preparation work needed to integrate MISP and ElasticSearch. With MISP set up and prepped, we can now focus on Python and Memcached.

Part 1:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-1-misp/



First a little background into why we chose to use Memcached for our ElasticSearch integration…..

Threat data feeds are dynamic by nature: they are constantly being updated, multiple times a day. The updates contain both additions to the feeds and deletions. This means our enrichment engine would need to be dynamic too… To explain this better, we will use Ransomware Tracker as an example.

Let’s say a new IP is published to the Ransomware Tracker feed. This would be easy to manage in an enrichment engine, as we could simply add the new IP to our list. But what if an IP is removed from Ransomware Tracker? Now we have to monitor the Ransomware Tracker feed to find the deletion, then check our own list to see if we have this IP and then delete it from our list. This can very quickly get complex to handle…

Another way to handle it could be to monitor the Ransomware Tracker feed for changes and, when a change is made, clear our list completely and pull the latest feed instead… This would solve part of the problem, but it can leave a small period where the enrichment engine is empty. It also increases complexity, as we would have to delete the list each time, which is definitely not what we wanted!

We decided to look into a way of simply assigning a TTL to each IoC on the feed, and then aging out the IoCs which are no longer present on the feed. We would set up our script to pull the feed at a given time interval, then push this into our enrichment engine store. Simple yet incredibly effective… This method also had to be supported by ElasticSearch, and luckily Logstash has a filter plugin for memcached. So this is what we settled on for storing the feed data for enrichment.

Memcached – Preparation

Memcached meets our requirements for being simple and for handling the aging of IoCs. It is also supported by ElasticSearch/Logstash, which makes it perfect for this task. It also comes with the huge additional benefit of storing the data in memory, so lookups from Logstash to the data will be ultra fast.


The Memcached application is a very simple key-value store running in memory, you can telnet into the application running by default on port 11211.

The application is made up of only a few commands. The ones we are in need of here, are the “get” and “set” commands. Both of which are quite self explanatory….

The set command will be used by our Python script, to set the data into the store.

The get command will be used by the Logstash filter plugin, to query the store for a specific IoC and return the result back to Logstash.

The only thing we need to do is decide on the structure of the data within the key-value store. Since we are going to be working with multiple data types (domain names, IP addresses etc.), we will make our key a combination of the data type and the IoC. So if securitydistractions.com is on the RansomwareTracker feed, it will be represented as: “domain-securitydistractions.com”.

Using the key as the combination of the data type and the IoC will be easier to understand later when we look at the Logstash configuration.

The value will be the feed name, so in this example “Feed-RansomwareTracker”.

The TTL can be set to whatever suits your organisation; in our example we will use 70 seconds. This is because we are going to run our Python script for pulling the feed from MISP every 30 seconds, which allows us to miss 1 pull without aging out all the IoCs within the memcached store.
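
The 70 second / 30 second arithmetic is easier to see with a toy example. The sketch below is not memcached, just a tiny in-memory stand-in (the `TTLStore` class and the explicit `now` argument are invented for illustration) that expires keys the same way: an IoC that stops being refreshed simply ages out.

```python
class TTLStore:
    """Tiny in-memory stand-in for memcached expiry (illustration only)."""

    def __init__(self):
        self._items = {}  # key -> (value, expires_at)

    def set(self, key, value, ttl, now):
        self._items[key] = (value, now + ttl)

    def get(self, key, now):
        item = self._items.get(key)
        if item is None or now >= item[1]:
            return None  # never set, or aged out
        return item[0]

TTL = 70  # seconds each IoC lives after a push
KEY = "domain-securitydistractions.com"

store = TTLStore()
store.set(KEY, "Feed-RansomwareTracker", TTL, now=0)  # pull at t=0

# The pull at t=30 is missed; the key is still alive when the t=60 pull arrives.
alive_after_one_miss = store.get(KEY, now=60) is not None
# If the pulls at t=30 and t=60 are both missed, the key is gone by t=90.
alive_after_two_misses = store.get(KEY, now=90) is not None

print(alive_after_one_miss, alive_after_two_misses)
```

This prints `True False`: one missed pull is survivable, two are not. The same mechanism handles deletions, since an IoC removed from the feed is never set again and disappears once its TTL runs out.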

So the set command for memcached with our example data will be as follows:- “domain-securitydistractions.com”, “Feed-RansomwareTracker”, “70”.
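
If you want to try this by hand over telnet, it helps to know what a set looks like on the wire. The sketch below builds the command in memcached’s text protocol, which has the form `set <key> <flags> <exptime> <bytes>` followed by the data on the next line. The helper name `build_set_command` is made up for this example, and flags are assumed to be 0.

```python
def build_set_command(key, value, ttl, flags=0):
    """Build a memcached text-protocol 'set' command as bytes."""
    data = value.encode("utf-8")
    # Header line: set <key> <flags> <exptime> <bytes>
    header = f"set {key} {flags} {ttl} {len(data)}\r\n".encode("utf-8")
    # The data block follows on its own line
    return header + data + b"\r\n"

cmd = build_set_command("domain-securitydistractions.com", "Feed-RansomwareTracker", 70)
print(cmd.decode("utf-8"))
```

In practice the Python library does this for us, so this is only useful for poking at the store manually.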

It is highly recommended that you run Memcached on the same machine as logstash, for latency purposes. In our lab we are running everything on a Debian VM. There are Debian packages available for Memcached…..

Python – Memcache/MISP integration

Caveat: I am not a developer, and my programming skills are limited… The script here only had to be very simple, so it suited my skill level. There will be multiple ways to improve it in the future… But this is what we are running with here, and it works!

As ever, any form of integration between tools is probably going to require some form of scripting. In our case we knew we needed a script that would handle the pulling of the data from our MISP platform API, and then pushing this data into Memcached. The full script can be found at the bottom of the page…

The first part is our interaction with the MISP API….

def misppull():
    # Headers for the MISP API: the automation key plus JSON in and out
    headers = {
            'Authorization': 'INSERT YOUR OWN MISP API KEY',
            'Accept': 'application/json',
            'Content-type': 'application/json',
    }

    # Which IoCs we want back from MISP, and in which format
    data = '{"returnFormat":"text","type":"domain","tags":"Feed-RansomwareTracker","to_ids":"yes"}'

    response = requests.post('https://*INSERTYOUROWNMISPHERE*/attributes/restSearch', headers=headers, data=data, verify=False) #Call to MISP API

    return response

Remember to change the “Authorization” section within the header to your own API key.

The data variable is used to tell the MISP API which IoCs we want to retrieve. In this example we are asking for all domain names that are tagged with “Feed-RansomwareTracker” and where the “to_ids” setting is set to yes. This will be returned as plaintext…

Remember also to change the URL within the response variable to reflect the domain name or IP address of your own MISP instance. I have also disabled verification of SSL as it is done within my lab. It is not recommended to keep this setting if you are running in production.
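
If you plan to pull more than one feed or data type, building the request body from a dict can be less error prone than editing the JSON string by hand. This is only a sketch: the helper name `build_search_body` is invented here, and it uses the same four fields as the data variable above.

```python
import json

def build_search_body(attr_type, tag):
    """Build the JSON body for /attributes/restSearch (same fields as above)."""
    return json.dumps({
        "returnFormat": "text",   # plaintext, one IoC per line
        "type": attr_type,        # MISP attribute type, e.g. "domain"
        "tags": tag,              # the custom feed tag created in MISP
        "to_ids": "yes",
    })

body = build_search_body("domain", "Feed-RansomwareTracker")
print(body)
```

The result can be passed as `data=body` to the same `requests.post` call. For IP addresses you would pass an IP attribute type such as “ip-dst” instead of “domain”, assuming you have tagged events containing them.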

Reliable as always, there are multiple Python libraries for interacting with the Memcached application. We settled on the first one we found, “pymemcache”.


if __name__ == '__main__':
    response = misppull()
    domains = (response.text).splitlines()
    for domain in domains:
        client.set("domain-" + domain, "Feed-RansomwareTracker", 70)

This is how the structure we settled on earlier in this blog post looks when using pymemcache: the client.set command pushes the IoCs we retrieved via the “misppull” function into the memcached application.
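
One possible improvement, sketched here rather than taken from the running setup, is to move the loop into its own function so it can skip blank lines and be tried out without a live memcached. `push_domains` and `FakeClient` are hypothetical names; the fake client only records what would have been set, using the same `set(key, value, expire)` argument order as the call above.

```python
def push_domains(client, text, feed, ttl=70):
    """Set one 'domain-<ioc>' key per non-empty line; return how many were set."""
    count = 0
    for line in text.splitlines():
        domain = line.strip()
        if not domain:
            continue  # skip blank lines in the MISP response
        client.set("domain-" + domain, feed, ttl)
        count += 1
    return count

class FakeClient:
    """Hypothetical stand-in that records set() calls instead of talking to memcached."""

    def __init__(self):
        self.calls = []

    def set(self, key, value, expire):
        self.calls.append((key, value, expire))

fake = FakeClient()
pushed = push_domains(fake, "securitydistractions.com\n\nexample.org\n", "Feed-RansomwareTracker")
print(pushed, fake.calls[0])
```

With the real pymemcache client you would pass `client` instead of `fake`, and `response.text` as the text.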

Full script:-

When I get round to it, this will be uploaded onto our github, it is released under the MIT license.

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from pymemcache.client.base import Client

# Silence the warning caused by verify=False below (lab use only)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

client = Client(('', 11211)) #Location of memcached application, fill in your own host

def misppull():
    # Headers for the MISP API: the automation key plus JSON in and out
    headers = {
            'Authorization': 'INSERT YOUR OWN API KEY HERE',
            'Accept': 'application/json',
            'Content-type': 'application/json',
    }

    data = '{"returnFormat":"text","type":"domain","tags":"Feed-RansomwareTracker","to_ids":"yes"}' #Setting up the data format we require from MISP

    response = requests.post('https://*INSERTYOUROWNMISPHERE*/attributes/restSearch', headers=headers, data=data, verify=False) #Call to MISP API
    return response

if __name__ == '__main__':
    response = misppull()
    domains = (response.text).splitlines()
    for domain in domains:
        # key = data type + IoC, value = feed name, TTL = 70 seconds
        client.set("domain-" + domain, "Feed-RansomwareTracker", 70)

Next in the post series is covering the last step… Integrating it all together using Logstash!

Part 3:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-3-logstash/

Enriching ElasticSearch With Threat Data – Part 1 – MISP


There are a lot of great blog posts and reads available on the MISP platform, so I don’t want to do it an injustice by writing a huge intro here… I have a plan to write a more in depth blog post about MISP in the distant future, but before then please go on over to the MISP project site.

What we are interested in for our enrichment is how to leverage MISP to produce our own threat data feeds.

MISP allows you to create your own events made up of IoCs and then leverage these as a threat data feed.

MISP out of the box also has support for many open source threat feeds, and it can aggregate these and display them in a chosen standard. This can really help with centralizing your organisation’s threat data.
So you can combine OSINT and your own intelligence for enrichment into ElasticSearch.

We will begin our example by working with the Ransomware Tracker CSV feed which can be enabled in MISP. This feed is well known by the community and will give a good understanding of how the integration works.


To get started you can download a training MISP instance here (or use your own MISP instance):- https://www.circl.lu/misp-images/latest/

Once you have your instance running and can access the WebUI, you should navigate to “Sync Actions” and then down to “List Feeds”.

This will present you with a screen showing all of the default available threat data feeds and their sources.

If you scroll through this list, eventually you will find Ransomware Tracker.

You will need to check the tick box next to the feed, and then scroll to the top and select “Enable selected”.

Once the feed is enabled, you will need to return to the Ransomware Tracker section, where all the way over on the right-hand side there is a button with the tooltip “fetch all events”.

This will then begin the job to fetch the entire Ransomware Tracker feed into a MISP event. To find the event, hover over the “Event Actions” button and then click on the “List Events” option.

This will take you to your MISP instance’s event section. Yours will look slightly different to mine, if you are using MISP already then it will be populated with events you have been working with or synced with. If not and you are new to this, then it should be populated with only 1 event… With the Info set to “Ransomware Tracker CSV Feed”.

When you drill down into the event, you will find some information relating to the threat feed, including an item in red: “Published: No”. This means that the event is currently stored inside MISP, but is not available for distribution via the API or a sharing method. This allows us to work on the event without fear of publishing something by accident.

You can scroll through the event and see all of the IoCs contained within the Ransomware Tracker feed, but what we are interested in now is tagging the Ransomware Tracker feed so we can export it via the API as one feed.

To do this, we will need to create a new custom tag within MISP….

Hover over the “Event Actions” button and then click on “Add Tag”.

You will then be presented with the Add Tag section, here you can give your new tag a name. For this example we will name it “Feed-RansomwareTracker”, choose the colour the tag will have in your event view, and then ensure “Exportable” is checked. Then click “Add”.

You can then go back to your Ransomware Tracker CSV event….


As part of the event info, you can see a section called “Tags” with a + button next to it. Click on the + button, and then add your newly created Feed-RansomwareTracker tag to the event.

The last step is to then publish the event, so it can be retrieved via the API for pushing into ElasticSearch!

On the left hand side next to the event info, you can find the option for “Publish Event”. Click on this and then click “Yes” when prompted to publish the event.

This has now published the event and the tags and it is ready to be retrieved by the API.


Alongside the amazing WebUI for MISP, there is an incredibly strong API engine running underneath. Again I won’t focus too much here on singing its praises, this I will save for a later post!

But in this example, we will use the MISP API to pull out the tagged Ransomware Tracker feed for use within ElasticSearch.

To prepare the API for our scripts, all we need to do is find the automation key…

Hover over the “Event Actions” button within the MISP WebUI… And click on the “Automation” button.

Within the Automation section you can find your automation key:-

Save this key, you will need it later for your Python script!

This concludes our preparation work within MISP, next up…. Python and Memcached….

Part 2:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-2-memcached-and-python/

Enriching ElasticSearch With Threat Data – Intro

Since my last blog post back in January, I have been seriously distracted! I promised blog posts relating to my lab but have not had the time…. But to keep you guys going until then… I am going to open source my enrichment at scale setup, combining ElasticSearch, MISP, logstash and memcache into one seriously powerful platform.

Have you ever wanted to check your entire logging estate against a threat feed? Multiple threat feeds? If so, you have probably seen that many of the big SIEM providers charge a premium for this service.

What I will demonstrate over the next few posts, is how to accomplish this for free! Well not quite for free, since you need time but you know…..

Let’s talk about the diagram above… For my threat data source, I have chosen MISP. My logging sources are Squid Proxy and PiHole. These are the choices you can make yourself. The rest of the setup is required to run…

Instead of choosing MISP, you could simply use a single threat data feed. Ransomware Tracker could be a good place to start, as they offer an open source feed via CSV which you could quickly parse. The important thing is that you have the right data structure to put the feed into memcache. But we will go over this in further blog posts…

Across the next blog posts, I will talk about the various pieces in the puzzle and how to put them all together… The result is a very scalable, powerful enrichment engine that can ingest and enrich logs in realtime without delaying the log process.

Building Your Own Blue Team Lab


Every blue team member should spend some time investing in building their own lab setup. It can be a great and fun learning experience, where you pick up some “low hanging fruit” skills. Hopefully you come out with something you will continue to use and develop over the rest of your InfoSec career.

Having your own lab allows you to quickly test theories and detection methods. It can be adapted to support any use case you need, your only limit is your own imagination…

How does it look?

The lab is designed to work as follows… You add an IoC to an event within MISP, and this is then distributed to the blacklists on your prevention tools. At the same time, this IoC is used to alert based on logs coming into ElasticSearch. The IoC will be added to an ElastAlert rule, which then takes care of searching back through the ElasticSearch logs for previous activity. ElastAlert needs somewhere to send its alerts to and this is where The Hive comes into play… Sounds simple right?

Open source is the only way…

The Security Distractions lab is only based on Open Source tools, so your only investment if you decide to build this, will be your own time! It can be used for production with a few modifications…

Over the next few blog posts, we will go into each tool and their integration points. We promise to try to keep it exciting!

But how will I run the lab?

This lab can be built using whatever method you want… We will supply the configuration files for each tool where needed, but it is up to you how it is run. I like to run using VMs but others are obsessed with Docker. So it is about using whatever you feel most comfortable with. For those planning on using VMs, the first post will be about VirtualBox, so you can get started… If you’re using Docker, then ummm… You’re on your own!

All configuration files will be found over on our GitHub page:- https://github.com/securitydistractions