Watching for no data


So you are sending stuff to your Elasticsearch cluster with some beat, e.g. Filebeat. But as everyone knows, things go wrong, stuff breaks. But you are trying to be proactive and watch for stuff breaking. So why not let Elasticsearch monitor for missing stuff with a watcher? You go in search of some examples and, sure enough, you will end up at this repo:

The examples repo

This repo provides examples of how to do various stuff with your shining Elasticsearch setup. If you look in the alerting category, you will find a recipe called "system fails to provide data". Oh yeah…

Looks pretty useful. Basically you are setting up a watcher to search an index for hosts seen in the last 24 hours and for hosts seen in the last 1 hour. However, there is a catch: the sample doesn't provide any example of how to do the delta. You just end up with 2 lists that you have little use for 😉
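For context, here is a minimal sketch of what such a watcher input could look like. The bucket names (periods, last_period, history, hosts) match the payload paths used in the recipe, while the index, timestamp field and host field are my own assumptions:

```
"input": {
  "search": {
    "request": {
      "indices": ["filebeat-*"],
      "body": {
        "size": 0,
        "aggs": {
          "periods": {
            "filters": {
              "filters": {
                "history": { "range": { "@timestamp": { "gte": "now-24h" } } },
                "last_period": { "range": { "@timestamp": { "gte": "now-1h" } } }
              }
            },
            "aggs": {
              "hosts": { "terms": { "field": "host.name", "size": 1000 } }
            }
          }
        }
      }
    }
  }
}
```

The filters aggregation gives you one bucket per period, and the terms sub-aggregation lists the hosts seen in each period.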

The revised sample

Every chance I get, when I talk to my friends at Elastic, I tell them the watcher is too hard to use. Make it simpler, please. And they smile and say, “we know” 🙂

So back to the problem.

You have to do some very funky looking Painless scripting to find the delta of the two lists we started out with. You do this by means of a transform.

This is how the transform section looks in the repo. It is basically empty, so there will be no transform going on.

  "actions": {
    "log": {
      "transform": {
        "script": ""
      },
      "logging": {
        "text": "Systems not responding in the last {{ctx.metadata.last_period}} minutes:{{#ctx.payload._value}}{{.}}:{{/ctx.payload._value}}"
      }
    }
  }

So this is my attempt to fix this problem. Don't get scared, it is not as bad as it looks. Just add it to the watcher.

  "transform": {
    "script": {
      "source": "def last_period = ctx.payload.aggregations.periods.buckets.last_period.hosts.buckets.stream().map(p -> p.key).collect(Collectors.toList()); def history = ctx.payload.aggregations.periods.buckets.history.hosts.buckets.stream().map(e -> e.key).filter(p -> !last_period.contains(p)).map(p -> ['hostname': p]).collect(Collectors.toList()); return history;",
      "lang": "painless"
    }
  }

The source code laid out in a more readable format. Multiline Painless scripts in the watcher UI, please, Elastic 😀

def last_period = ctx.payload.aggregations.periods.buckets.last_period.hosts.buckets.stream().
    map(p -> p.key).
    collect(Collectors.toList());

def history = ctx.payload.aggregations.periods.buckets.history.hosts.buckets.stream().
    map(e -> e.key).
    filter(p -> !last_period.contains(p)).
    map(p -> ['hostname': p]).
    collect(Collectors.toList());

return history;

That code will make a nice list of hosts that haven't delivered data in the last period.

To use the list in the action section, you do something like this. Notice the condition in there as well, to prevent the watcher going off and sending emails when everything is working:

  "actions": {
    "log": {
      "condition": {
        "compare": {
          "ctx.payload._value.0": {
            "not_eq": null
          }
        }
      },
      "email": {
        "profile": "standard",
        "to": [],
        "subject": "oh no, data missing",
        "body": {
          "html": "<h1>Systems not delivering data in the last {{ctx.metadata.last_period}} period</h1><ul>{{#ctx.payload._value}}<li>{{hostname}}</li>{{/ctx.payload._value}}</ul>"
        }
      }
    }
  }


As usual, there are more ways to achieve the same thing. You could probably do an extremely complex search instead. But if you add these 2 sections to your watcher, you are good to go.

TheHive enrichment


An increasing number of SOCs, IRT teams, etc. are beginning to use TheHive and ElasticSearch.

While researching these tools I saw a lot of talk about enrichment, and tying various tools together, so I wanted to provide my take on it as well.

I am by no means an expert in any of these tools, or in the IRT process, but I have had the privilege of getting to know a few people that I would consider experts (even though they might not feel that way themselves), and while watching them work, I started thinking that some of the tasks they routinely perform could be eligible for automation.

Specifically I saw that a lot of the time when they were doing triage or incident response, they would receive an alert (this could be from their EDR tool, tier 1 SOC, IDS/IPS, etc.) where they would only be provided with an IP address and a timestamp.

Because most corporate infrastructures are configured with DHCP, they would often have to go look at their ElasticSearch logs to determine which endpoint (hostname) was assigned the given IP address at the given time.

While this is somewhat trivial to do, it is also a well defined, recurring task, which meant that (if possible) I wanted to see if I could automate it.

Integrating TheHive and ElasticSearch

As you may or may not know, TheHive uses an underlying enrichment engine called Cortex.

In short, Cortex works by leveraging analyzers (used for collecting information related to an observable, for instance collecting information from VirusTotal in relation to a checksum) and responders (used to act on information, for instance pushing an IP to a blacklist, or sending an email out).

With this in mind I figured that the way to go would be to create an analyzer that would be able to query ElasticSearch and return the hostname that was using the given IP address at the specified time.

I figured that the way to do this would be to create the event in TheHive and attach the given IP address as an observable, from which the analyzer could be run.

This however turned out to be somewhat of a dead end for me, as analyzers have the caveat of only working on observables, which meant that the only way I was able to provide a timestamp to the analyzer was to manually type it into the messageField of the observable (which I briefly considered but ended up deciding would be way too error-prone in a production environment, as the timestamp would have to adhere to specific formatting rules).

Because of this caveat I started looking at the possibilities if I were to implement this as a responder instead (even though this is not how responders are supposed to be used).

I quickly realized that because responders can be invoked on events, alerts and observables, a responder has access to a wide range of information related to the event, even if it is implemented to only work with observables.

With this in mind I was able to implement functional timestamps, using customFields with datatype datetime:
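As a sketch of the idea (not the actual responder code): TheHive stores datetime custom fields as epoch milliseconds, so the responder can read the field from the job input JSON and convert it to a usable timestamp. The field names and input shape below are hypothetical, made up for illustration:

```python
import json
from datetime import datetime, timezone

# Hypothetical job input, shaped like the JSON Cortex hands to a responder
# invoked on an observable; field names here are illustrative only.
job_input = json.loads("""
{
  "data": {
    "dataType": "ip",
    "data": "10.0.0.15",
    "case": {
      "customFields": {
        "logTimestamp": {"date": 1562000000000},
        "autoEnrichment": {"boolean": true}
      }
    }
  }
}
""")

custom = job_input["data"]["case"]["customFields"]
# TheHive stores datetime custom fields as epoch milliseconds
ts = datetime.fromtimestamp(custom["logTimestamp"]["date"] / 1000, tz=timezone.utc)
ip = job_input["data"]["data"]
print(ip, ts.isoformat())
```

With the timestamp recovered this way, the responder can build a time-bounded ElasticSearch query around it without the analyst ever typing a date by hand.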

So this meant that I was able to implement a functional responder, which was able to query ElasticSearch (through the standard REST API) and return a report containing all relevant entries corresponding to the query.

I, however, was not entirely satisfied by this, as I felt like this could only be considered partial automation, since I would still have to read through the returned report and manually input the results as new observables.

Completing the automation

Using Cortex, I felt quite limited in what I could do with my results, so I started contemplating how to take my attempted automation a step further, and therefore I started looking into the REST API for TheHive.

This gave me all the possibilities I wanted, and with this in mind, I was able to leverage another customField called autoEnrichment (with datatype boolean) to define whether I wanted the responder to automatically create new observable(s) from the ElasticSearch results.

The actual code

Analyzers and responders usually consist of the following:

  • A requirements file (which defines which non-standard libraries are needed for the analyzer/responder to work)
  • A json file (defining the prerequisites for the responder/analyzer, such as which datatypes it can work with)
  • The analyzer/responder itself (the actual code that performs the required operations)
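As a hedged example, the json definition file for a responder like this could look roughly as follows. The name, command path and configuration items are illustrative, not the actual repo contents:

```json
{
  "name": "ElasticSearch_DHCP_Lookup",
  "version": "1.0",
  "author": "Security Distractions",
  "license": "MIT",
  "description": "Look up which hostname held an IP address at a given time, using DHCP logs in ElasticSearch",
  "dataTypeList": ["thehive:case_artifact"],
  "command": "ElasticSearchLookup/elasticsearch_lookup.py",
  "baseConfig": "ElasticSearchLookup",
  "configurationItems": [
    {
      "name": "es_url",
      "description": "URL of the ElasticSearch cluster",
      "type": "string",
      "multi": false,
      "required": true
    }
  ]
}
```

The dataTypeList is what restricts where the responder can be invoked; here it is limited to observables (case artifacts).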

I, however, chose to split the actual analyzer/responder file into 3 separate files.

The idea behind this is to separate the initialization, configurable items, and functionality, in an attempt to make the responder easier to maintain and easier to build upon, in case a need should arise for a similar responder which can handle other types of logs.

In keeping with the spirit of maintainability (and best practice) I have also tried to document the code with comments, explaining the functionality and thoughts behind each code section, and as such most of the code should be somewhat self-explanatory…

So without further ado, here is a link to the github repo with the code:

Fault tolerant Elasticsearch


By default your Elasticsearch cluster is pretty robust. Typically you would go for a design with one primary shard and one replica shard. You could have multiple datacenters with low network latency and have the cluster operating in both centers at once. You could also have 2 racks with nodes.

But what happens if you lose one datacenter or one rack? Your cluster will likely go RED if you don't plan for it upfront.

Shard allocation awareness

There are multiple ways to design around a disaster. But one thing you surely need to be aware of is a feature called Shard Allocation Awareness.

You can read the documentation from Elastic here.

Basically this feature enables your Elastic cluster to know about your physical topology. This enables Elastic to be smart enough to put your primary shard and replica shards into 2 different zones. Zones can be a datacenter or a rack as mentioned before.

You tell Elastic this by adding node attributes to your config file. In this example we will add a node attribute called datacenter to our elasticsearch.yml file. It will have 2 possible values: dc1 and dc2.

node.attr.datacenter: dc1

Once you have added this attribute to all your nodes, you need to perform a rolling cluster restart for the attribute value to be read.

Afterwards you need to enable the feature.

PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.awareness.attributes": "datacenter"
  }
}

Shortly thereafter you will notice some shard activity going on in the cluster while the master arranges your shards according to your topology. When the dust settles, you can rest assured that your indices are present in both datacenters.

Forced shard allocation awareness

This all sounds good, but there is a problem. Suppose you lose a datacenter (dc1) now. The cluster will do its best to recover. So it will begin making all replica shards in DC2 primary and will then start to create new replicas, also in DC2. This means that you need to have double the disk space in each datacenter.

If you don't have the luxury of having double the disk space everywhere, then you should be aware of forced shard allocation awareness.

This is how you enable it. Notice that you now specify the possible values of the datacenter attribute.

PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.awareness.attributes": "datacenter",
    "cluster.routing.allocation.awareness.force.datacenter.values": "dc1,dc2"
  }
}

When you do this, Elastic knows that you intend to have your indices available on nodes tagged with these values. So when you lose all nodes in DC1, Elastic is not going to try to recover everything into DC2. When this happens, you will see the cluster go yellow with 50% of your shards missing. But the cluster will be available and operate as before. When DC1 becomes available again, Elastic will start to recover as normal.

Additional benefits

This feature will do more for you than just help out in case of disaster. It can also help you when you need to do a rolling cluster restart, a rolling cluster upgrade or simple base OS patching.

Normally when you do a rolling upgrade, you need to do it node by node. This is cumbersome and takes time. With forced shard allocation awareness, you can take e.g. 50% of your warm nodes out of service, patch them or change config, and bring them back online. This gives you much faster maintenance on your cluster.
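As a sketch, the usual step before taking a batch of nodes down is to pause shard reallocation, and to re-enable it once they are back (standard cluster settings; adapt the flow to your own maintenance procedure):

```
PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.enable": "primaries"
  }
}

# ... patch the nodes and bring them back online ...

PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.enable": null
  }
}
```

Setting the value back to null resets it to the default ("all"), so normal allocation resumes once maintenance is done.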


This setup is not for everyone. If you are really paranoid and have enough resources, you could also make your clusters available in multiple places and use CCR as your recovery plan. Examine your options and choose what fits you best.

Index Lifecycle Management – ILM


When you are operating Elasticsearch clusters, you will probably need some form of managing the lifetime of your indices. You could also need a way of handling migration from hot to warm nodes. Previously this was typically handled by a separate tool from Elastic called Curator.

We have been running with Curator handling our indices for a long time, but we feel handling indices should be builtin functionality. We have had some scripts running in 3 steps as a cron job:

  • rollover – runs every 15 min
  • migration – runs every hour
  • deletion – runs once per day

The configuration files for Curator could prove to be a little bit of a pain.

But when Elastic released 6.7.0, their new Index Lifecycle Management became a builtin feature, making our daily administration simpler. No longer do you need an external cron job running Curator. All ILM tasks can be done from within Kibana.

Migration from curator

So this is all good. But what do you actually need to do in order to migrate away from Curator and instead start using ILM?

You don't have to migrate everything in a big bang approach, just move one thing at a time. Most of our big indices are using the rollover approach, so that will be the focus here.

We came up with these steps in order to do the migration for a specific group of indices.

  • Create ILM Policies
    • Create one for rollover and one for already rolled over indices
  • Apply policy for already rolled over indices
  • Attach rollover policy to template
  • Perform rollover
  • Remove curator support for current group of indices

Create ILM Policies

It's straightforward to create a policy: just fire up Kibana, navigate to Management, Index Lifecycle Policies and Create Policy.

So in this example we create 2 policies

  • dns-standard
  • dns-standard-rollover

We create a set of 2 policies per group of indices. If everything you have uses the same periods for hot/warm/deletion, you only need 2 policies for everything. But in order to allow for customization per group of indices, we create policies per group.

The dns-standard policy is used for existing indices, so it will not contain a rollover phase.
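If you prefer the API over the UI, the same policy can be created from Dev Tools. A sketch of what dns-standard could look like; the ages and the warm node attribute are assumptions, so match them to your own retention:

```
PUT _ilm/policy/dns-standard
{
  "policy": {
    "phases": {
      "warm": {
        "min_age": "7d",
        "actions": {
          "allocate": {
            "require": { "data": "warm" }
          }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
```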

Edit the dns-standard policy again, click Save as new at the top, enable the rollover phase and save it as dns-standard-rollover.

Now please verify that the settings in the ILM policies match your configuration from your old Curator jobs 😉

Apply policy

In order to apply a policy to a set of indices, you need to go to Kibana Developer Tools.
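A sketch of what that request could look like for a group of DNS indices (the index pattern is an assumption; the policy name comes from the example above):

```
PUT dns-*/_settings
{
  "index": {
    "lifecycle": {
      "name": "dns-standard"
    }
  }
}
```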

Once you apply the policy, your current DNS indices will be handled by ILM.

Attach rollover policy

In order to attach the rollover policy to a template, click Actions for the rollover policy. You will be asked to select a template name, and you need to enter the write alias for the rollover index.

Perform rollover

The next step is to do a manual rollover of your DNS alias. You will do that from Kibana Developer Tools.
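Assuming your write alias is called dns-write (substitute your own alias), the manual rollover from Dev Tools is simply:

```
POST dns-write/_rollover
```

This creates a fresh index behind the alias, and from that point on the rollover policy attached to the template takes over.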

Now all DNS indices will be handled by ILM

Remove curator support

Go to your Curator server and remove all references to the current set of indices.

Verify your work

Once you have completed the steps, you will be able to see in ILM that everything is working. You can see how many indices are handled by each policy. If you monitor this on a daily basis, you will eventually see when the dns-standard policy is no longer needed (linked indices = 0). This will happen according to your retention settings.


Please try out ILM in your test environment first to get familiar with the steps. Or learn the hard way 😉

The latest version of Curator is ILM aware, so it will not touch indices that are managed by ILM.

One thing to be aware of is that ILM doesn't offer all the advanced features of Curator. However, it delivers the needed functionality for most basic usecases.

We encourage you to try it out.

Enriching ElasticSearch With Threat Data – Part 3 – Logstash

In our previous post in this series, we prepared MISP and its API, set up memcached and created the Python script we need to pull data from MISP and push it into our memcached application. In this blog post, we will cover how to use Logstash to look up the data stored within memcached, and how to enrich ElasticSearch when we get a hit!

A quick mention before we go much deeper: this enrichment setup is capable of ultra fast lookups and working with huge amounts of IoCs. Without giving away too much, I know of a very large production setup which is running this with close to 120,000 events per second and multiple feeds enabled within MISP…. It will do enrichment in realtime as the logs are being written to ElasticSearch!

Part 1:-

Part 2:-


Logstash – Brief Intro

Logstash is the powerhouse behind our enrichment setup… Since you should hopefully already be familiar with the ELK stack, we won't touch too much on Logstash and how it works, but we will focus on parts of it…

Logstash is essentially split up into 3 sections… Input, filter and output.

The input section is where we define the source of the logging data we want to work with.

The filter section is where we then work with the logging data. This could be via parsing, normalizing, transforming or multiple other methods to prepare the data for sending out to ElasticSearch…

The output section is where we define how to then send the data out of Logstash. This could be sending directly to ElasticSearch, Kafka or many other output options.

Our blog will focus much more in future on the filter section, and how we can map all logs to the Elastic Common Schema via grok parsing. But right now, in this example, we will keep it simple and assume you already have some sort of parsing in place for the logging source you want to enrich.

Logstash – Memcached filter

The Logstash memcached filter has recently been made into a fully supported release, which we are very happy about over at Security Distractions. It comes installed by default with Logstash 7.0…

This means all we need to do within our Logstash configuration to enable the memcached plugin is to write the function in as shown below.

The placement of the memcached section is quite important… It should be after your grok parsing and transforming sections. Preferably as the last function within the filter section.

filter {
  memcached {
    hosts => [""]
    get => { "domain-%{destination.domain}" => "[misp_src]" }
  }
}

A quick breakdown of this function: “hosts” is where we specify the location and port of our memcached application.

The “get” is used to tell Logstash which field within the logs it needs to look up against memcached; the result of a match is then written to a new field, “misp_src”.

Using the example from our previous blog post, we will use our example domain as the value within the destination.domain field.

Logstash will prepend “domain-” to the domain and then make a get request against the memcached application….

“domain-securitydistractions” is populated within the memcached data store with the value “Feed-RansomwareTracker”. So we get a hit, and this value is written to the new field “misp_src”.

When Logstash does a lookup for a value which is not in the memcached data store, it will not return a value into misp_src. So, for the sake of good practice, we will add a function within Logstash that populates the misp_src field with the value “none” if there is no match.

if ![misp_src] {
  mutate {
    add_field => { "[misp_src]" => "none" }
  }
}

Since this setup leverages your already existing ELK stack, you will only need to handle the new field “misp_src” via visualisations or whatever other fancy way you want to display this field.

In my lab, I use a locally running instance of pihole to generate logs for testing the enrichment setup….

When I get round to it, I will make a part 4… featuring extensions to the integration. You can run with as many feeds as your heart desires… Your only limit is your imagination for tagging/feed names!

It is possible to further integrate MISP and ELK by using the http plugin. Once the misp_src field is populated, you could take this result and then make a http call to MISP again for further enrichment.

Enriching ElasticSearch With Threat Data – Part 2 – Memcached and Python

In our previous post we covered MISP and some of the preparation work needed to integrate MISP and ElasticSearch. With MISP now set up and prepped, we can focus on Python and memcached.

Part 1:-



First a little background into why we chose to use Memcached for our ElasticSearch integration…..

Threat data feeds are dynamic by nature; they are constantly updated, multiple times a day. The updates contain both additions to the feeds and deletions. This means our enrichment engine needs to be dynamic too…. To explain this better, we will use Ransomware Tracker as an example.

Let's say a new IP is published to the Ransomware Tracker feed. This would be easy to manage in an enrichment engine, as we could simply add the new IP to our list. But what if an IP is removed from Ransomware Tracker? Now we have to monitor the Ransomware Tracker feed to find the deletion, then check our own list to see if we have this IP, and then delete it from our list. This can very quickly get complex to handle…

Another way to handle it could be to monitor the Ransomware Tracker feed for changes, and when a change is made, clear our list completely and pull the latest feed instead….. This would solve part of the problem, but it can result in a small period where the enrichment engine is empty. It also increases complexity, as we would have to delete the list each time, which is definitely not what we wanted!

We decided to look into a way of simply assigning a TTL to each IoC on the feed, and then aging out the IoCs which are no longer present on the feed. We would set up our script to pull the feed at a given time interval, then push it into our enrichment engine store. Simple yet incredibly effective… This method also had to be supported by ElasticSearch, and how lucky we were that Logstash has a filter plugin for memcached. So it was this we settled on using to store the feed data for enrichment.

Memcached – Preparation

Memcached meets our requirements for being simple and handling aging of IoCs, and it is supported by ElasticSearch/Logstash, which makes it perfect for this task. It also comes with the huge additional benefit of storing the data in memory, so lookups from Logstash to the data will be ultra fast.

The memcached application is a very simple key-value store running in memory; you can telnet into the application, running by default on port 11211.

The application is made up of only a few commands. The ones we need here are the “get” and “set” commands, both of which are quite self explanatory….
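A quick sketch of what that looks like over telnet (the domain is just an example; the set arguments are key, flags, TTL in seconds and value length in bytes):

```
set domain-securitydistractions.com 0 70 22
Feed-RansomwareTracker
STORED
get domain-securitydistractions.com
VALUE domain-securitydistractions.com 0 22
Feed-RansomwareTracker
END
```

After 70 seconds without a refresh, the key simply disappears, which is exactly the aging behaviour we were after.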

The set command will be used by our Python script, to set the data into the store.

The get command will be used by the Logstash filter plugin, to query the store for a specific IoC and return the result back to Logstash.

The only thing we need to do is set the structure of the data within the key-value store. Since we are going to be working with multiple data types (domain names, IP addresses etc.), we will make our key a combination of the datatype and the IoC. So a domain on the RansomwareTracker feed will get a key of the form “domain-” plus the domain.

Using the key as the combination of the data type and the IoC will make it easier to understand later when we look at the Logstash configuration.

The value will be the feed name, so in this example “Feed-RansomwareTracker”.

The TTL can be set to whatever suits your organisation; in our example we will use 70 seconds. This is because we are going to run our Python script for pulling the feed from MISP every 30 seconds, which allows us to miss 1 pull without aging out all IoCs within the memcached store.

So the set command for memcached with our example data will use the key, the value “Feed-RansomwareTracker” and a TTL of “70”.

It is highly recommended that you run memcached on the same machine as Logstash, for latency purposes. In our lab we are running everything on a Debian VM. There are Debian packages available for memcached…..

Python – Memcached/MISP integration

Caveat: I am not a developer, and my programming skills are limited… The script here only had to be very simple, so it suited my skill level. There will be multiple ways to improve it in the future… But this is what we are running with here, and it works!

As ever, any form of integration between tools is probably going to require some form of scripting. In our case we knew we needed a script that would handle pulling the data from our MISP platform API, and then pushing this data into memcached. The full script can be found at the bottom of the page….

The first part is our interaction with the MISP API….

def misppull():
    headers = {
            'Authorization': 'INSERT YOUR OWN MISP API KEY',
            'Accept': 'application/json',
            'Content-type': 'application/json',
    }

    data = '{"returnFormat":"text","type":"domain","tags":"Feed-RansomwareTracker","to_ids":"yes"}'

    response = requests.post('https://*INSERTYOUROWNMISPHERE*/attributes/restSearch', headers=headers, data=data, verify=False) #Call to MISP API

    return response

Remember to change the “Authorization” section within the header to your own API key.

The data variable is used to tell the MISP API which IoCs we want to retrieve. In this example we are asking for all domain names that are tagged with “Feed-RansomwareTracker” and where the “to_ids” setting is set to yes. This will be returned as plaintext…

Remember also to change the URL within the response variable to reflect the domain name or IP address of your own MISP instance. I have also disabled verification of SSL, as this is done within my lab. It is not recommended to keep this setting if you are running in production.

As always, there are multiple Python libraries for interacting with the memcached application. We settled on the first one we found, “pymemcache”.

if __name__ == '__main__':
    response = misppull()
    domains = (response.text).splitlines()
    for domain in domains:
        client.set("domain-" + domain, "Feed-RansomwareTracker", 70)

Using the structure we settled on earlier in this blog post, this is how it is reflected when using pymemcache: the client.set command pushes the IoCs we retrieved via the “misppull” function into the memcached application.

Full script:-

When I get round to it, this will be uploaded onto our github, it is released under the MIT license.

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from pymemcache.client.base import Client

requests.packages.urllib3.disable_warnings(InsecureRequestWarning) #Suppress warnings from the disabled SSL verification

client = Client(('', 11211)) #Location of memcached application

def misppull():
    headers = {
            'Authorization': 'INSERT YOUR OWN API KEY HERE',
            'Accept': 'application/json',
            'Content-type': 'application/json',
    }

    data = '{"returnFormat":"text","type":"domain","tags":"Feed-RansomwareTracker","to_ids":"yes"}' #Setting up the data format we require from MISP

    response = requests.post('https://*INSERTYOUROWNMISPHERE*/attributes/restSearch', headers=headers, data=data, verify=False) #Call to MISP API
    return response

if __name__ == '__main__':
    response = misppull()
    domains = (response.text).splitlines()
    for domain in domains:
        client.set("domain-" + domain, "Feed-RansomwareTracker", 70)

Next in the post series, we cover the last step… integrating it all together using Logstash!

Part 3:-

Enriching ElasticSearch With Threat Data – Part 1 – MISP


There are a lot of great blog posts and reads available on the MISP platform, so I don't want to do it an injustice by writing a huge intro here… I have a plan to write a more in depth blog post about MISP in the distant future, but until then please head on over to the MISP project site:

What we are interested in for our enrichment, is how to leverage MISP to produce our own threat data feeds.

MISP allows you to create your own events made up of IoC’s and then leverage these as a threat data feed.

MISP out of the box also has support for many open source threat feeds; it can aggregate these and display them in a chosen standard. This can really help with centralizing your organisation's threat data. So you can combine OSINT and your own intelligence for enrichment into ElasticSearch.

We will begin our example by working with the Ransomware Tracker CSV feed which can be enabled in MISP. This feed is well known by the community and will give a good understanding of how the integration works.


To get started, you can download a training MISP instance here (or use your own MISP instance):-

Once you have your instance running and can access the WebUI, you should navigate to “Sync Actions” and then down to “List Feeds”.

This will present you with a screen showing all of the default available threat data feeds and their sources.

If you scroll through this list, eventually you will find Ransomware Tracker.

You will need to check the tick box next to the feed, and then scroll to the top and select “Enable selected”.

Once the feed is enabled, you will need to return to the Ransomware Tracker section, and all the way at the right hand side there is a button with the tooltip “fetch all events”.

This will then begin the job to fetch the entire Ransomware Tracker feed into a MISP event. To find the event highlight the “Event Actions” button and then click on the “List Events” option.

This will take you to your MISP instance's events section. Yours will look slightly different to mine; if you are using MISP already, it will be populated with events you have been working with or synced. If you are new to this, it should be populated with only 1 event… with the Info set to “Ransomware Tracker CSV Feed”.

When you drill down into the event, you will find some information relating to the threat feed, including an item in red: Published: “No”. This means that the event is currently stored inside MISP, but is not available for distribution via the API or a sharing method. This allows us to work on the event without fear of publishing something by accident.

You can scroll through the event and see all of the IoCs contained within the Ransomware Tracker feed, but what we are interested in now is tagging the Ransomware Tracker feed so we can export it via the API as one feed.

To do this, we will need to create a new custom tag within MISP….

Hover over the “Event Actions” button and then click on “Add Tag”.

You will then be presented with the Add Tag section, here you can give your new tag a name. For this example we will name it “Feed-RansomwareTracker”, choose the colour the tag will have in your event view, and then ensure “Exportable” is checked. Then click “Add”.

You can then go back to your Ransomware Tracker CSV event….


As part of the event info, you can see a section called “Tags” with a + button next to it. Click on the + button, and then add your newly created Feed-RansomwareTracker tag to the event.

The last step is to then publish the event, so it can be retrieved via the API for pushing into ElasticSearch!

On the left hand side next to the event info, you can find the option for “Publish Event”. Click on this and then click “Yes” when prompted to publish the event.

This has now published the event and the tags and it is ready to be retrieved by the API.


Alongside the amazing WebUI for MISP, there is an incredibly strong API engine running underneath. Again, I won't focus too much here on singing its praises; this I will save for a later post!

But in this example, we will use the MISP API to pull out the tagged Ransomware Tracker feed for use within ElasticSearch.

To prepare the API for our scripts, all we need to do is find the automation key…

Hover over the “Event Actions” button within the MISP WebUI… And click on the “Automation” button.

Within the Automation section you can find your automation key:-

Save this key, you will need it later for your Python script!
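To give a feel for how the automation key is used, here is a minimal Python sketch of the request the script will make. `/attributes/restSearch` is MISP’s standard attribute search endpoint; the URL, key and tag values are placeholders for your own instance.

```python
# Sketch of a tag-filtered attribute export via the MISP automation API.
# base_url, auth_key and tag are placeholders - use your own values.
import json
import urllib.request

def build_misp_search(base_url: str, auth_key: str, tag: str):
    """Build endpoint, headers and body for a tag-filtered attribute search."""
    endpoint = f"{base_url.rstrip('/')}/attributes/restSearch"
    headers = {
        "Authorization": auth_key,           # the automation key from MISP
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    body = {"returnFormat": "json", "tags": tag, "published": True}
    return endpoint, headers, body

def fetch_tagged_attributes(base_url: str, auth_key: str, tag: str):
    """POST the search and return the decoded JSON response."""
    endpoint, headers, body = build_misp_search(base_url, auth_key, tag)
    req = urllib.request.Request(
        endpoint, data=json.dumps(body).encode(), headers=headers, method="POST"
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

We will flesh this out in the Python part of the series; the point here is just that the automation key goes in the `Authorization` header and the tag drives the filter.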

This concludes our preparation work within MISP, next up…. Python and Memcached….

Part 2:-

Simple Kafka and Elasticsearch integration

Kafka is an awesome platform for moving data around. It is often used in front of an Elasticsearch cluster to buffer data before it gets ingested into Elasticsearch.

Kafka uses topics to carry specific kinds of data around. Imagine having topics for dns, dhcp, firewall and so on. You can quickly end up with a high number of topics, right?

So in this blog post, I will present a way for you to utilize a single Kafka topic to carry many kinds of data, while still being able to ingest into different Elasticsearch indices. It also enables you to specify the rotation of the indices: rollover, weekly, daily or whatever your needs may be.

The approach works by creating a few simple properties alongside your data:

  • myapp
  • myrotation

Let’s use this scenario. You have some kind of logfile that contains log data for your app “blog-hits”. Your app is low volume in terms of logging, so a weekly index is enough.

You install filebeat and add these entries to your filebeat configuration.

  fields:
    myapp: blog-hits
    myrotation: weekly
  fields_under_root: true

You would then configure Filebeat to send this to Logstash for further parsing. After parsing, Logstash sends the events to Kafka on a topic called “application-logs”, which you have configured on your Kafka servers.

If you prefer, you can also add the myapp and myrotation fields in the Logstash parsing your data. It is just a matter of preference.
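If you go the Logstash route, a sketch of the equivalent filter could look like this (the field values are of course per application):

```
filter {
  mutate {
    add_field => {
      "myapp"      => "blog-hits"
      "myrotation" => "weekly"
    }
  }
}
```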

You will have a Logstash consumer of the “application-logs” topic in a pipeline like this:

input {
  kafka {
    bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
    topics => [ "application-logs" ]
    codec => "json"
    group_id => "tier1"
    decorate_events => true
  }
}

Please notice that I used decorate_events. This is important for the rest of the pipeline, as it makes the Kafka topic name available under [@metadata][kafka][topic].

Next we will define the filter section:

filter {
  mutate {
    copy => { "[@metadata][kafka][topic]" => "kafkatopic" }
  }

  if ![myapp] {
    mutate {
      add_field => { "myapp" => "default" }
    }
  }

  if ![myrotation] {
    mutate {
      add_field => { "myrotation" => "weekly" }
    }
  }
}

In the filter, we make sure that we have default values for myapp and myrotation. Now we get to the interesting output section:

output {
  if [myrotation] == "rollover" {
    elasticsearch {
      hosts => ["https://elastic01:9200", "https://elastic02:9200"]
      manage_template => false
      index => "%{[kafkatopic]}-%{[myapp]}-active"
    }
  }

  if [myrotation] == "daily" {
    elasticsearch {
      hosts => ["https://elastic01:9200", "https://elastic02:9200"]
      manage_template => false
      index => "%{[kafkatopic]}-%{[myapp]}-%{+YYYY.MM.dd}"
    }
  }

  if [myrotation] == "weekly" {
    elasticsearch {
      hosts => ["https://elastic01:9200", "https://elastic02:9200"]
      manage_template => false
      index => "%{[kafkatopic]}-%{[myapp]}-%{+xxxx.ww}"
    }
  }
}

In the output section, we use the information gathered from myapp and myrotation in order to ingest our logs into an application specific index. So this pipeline is just being used to route the data to the correct index.

In this case, data will get stored in “application-logs-blog-hits-2019.14”.
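The routing logic can be sketched in plain Python to make it easier to follow. This assumes the topic is “application-logs”; note that Logstash’s %{+xxxx.ww} is the ISO week-year, which Python’s isocalendar() also gives us.

```python
from datetime import date

# Sketch of the index routing the output section performs.
def target_index(topic: str, myapp: str, myrotation: str, day: date) -> str:
    if myrotation == "rollover":
        return f"{topic}-{myapp}-active"
    if myrotation == "daily":
        return f"{topic}-{myapp}-{day:%Y.%m.%d}"
    # default / "weekly": ISO week-year, like Logstash's %{+xxxx.ww}
    iso_year, iso_week, _ = day.isocalendar()
    return f"{topic}-{myapp}-{iso_year}.{iso_week:02d}"

print(target_index("application-logs", "blog-hits", "weekly", date(2019, 4, 1)))
# application-logs-blog-hits-2019.14
```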

You can use this simple approach to carry many different kinds of data in a single Kafka topic, while still ingesting each kind into a separate index in Elasticsearch.

Enriching ElasticSearch With Threat Data – Intro

Since my last blog post back in January, I have been seriously distracted! I promised blog posts relating to my lab but have not had the time…. But to keep you guys going until then… I am going to open source my enrichment at scale setup, combining ElasticSearch, MISP, logstash and memcache into one seriously powerful platform.

Have you ever wanted to check your entire logging estate against a threat feed? Multiple threat feeds? If so, you have probably seen that many of the big SIEM providers charge a premium for this service.

What I will demonstrate over the next few posts, is how to accomplish this for free! Well not quite for free, since you need time but you know…..

Let’s talk about the diagram above… For my threat data source, I have chosen MISP. My logging sources are Squid Proxy and PiHole. These are choices you can make yourself. The rest of the setup is required to run…

Instead of choosing MISP, you could simply use a single threat data feed. Ransomware Tracker could be a good place to start, as they offer an open source feed via CSV which you can quickly parse. The important thing is that you have the right data structure to put the feed into memcache. But we will go over this in further blog posts….

Across the next blog posts, I will talk about the various pieces in the puzzle and how to put them all together… The result is a very scalable, powerful enrichment engine that can ingest and enrich logs in real time without delaying the log process.

Scaleable syslog pipeline

If you are receiving syslog data from a variety of network devices, you need a design that will allow you to receive and process syslog messages before you ingest them into your Elasticsearch cluster.

Processing syslog messages can be quite heavy in terms of CPU usage if you are doing a lot of grok statements.

As always, this can be done in many different ways, but in this blog post I will show the basics of a Kafka based architecture.

An initial approach that will work in many use cases: just put some kind of loadbalancer in front and use it to receive your syslog messages and ship them to some Logstash instances for processing.

This approach will be fine for a small to medium sized setup. But how will you scale it? Well, deploy one more Logstash server and edit your loadbalancer configuration to include the new Logstash server. But there is a smarter way.

I suggest that you have your loadbalancer forwarding to 2-3 Logstash servers. You create an extremely simple syslog input pipeline that does absolutely nothing but forward the data to your Kafka cluster.

input {
  tcp {
    port => 514
    type => syslog
  }
  udp {
    port => 514
    type => syslog
  }
}

output {
  kafka {
    bootstrap_servers => "kafka-01"
    topic_id => "raw-syslog"
  }
}

Of course, since this is syslog, be sure that this pipeline is backed by a persistent queue in Logstash, as syslog is send-and-forget.
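A minimal sketch of what that could look like in pipelines.yml (the pipeline id, path and queue size are examples; tune them for your volume):

```yaml
# pipelines.yml on the syslog receiver nodes
- pipeline.id: syslog-input
  path.config: "/etc/logstash/conf.d/syslog-input.conf"
  queue.type: persisted     # spool to disk instead of the in-memory queue
  queue.max_bytes: 4gb      # cap disk usage for the queue
```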

The boxes to run this pipeline can be quite small as there will be no processing going on.

If you are running with RSyslog, you could even configure the RSyslog to send directly to Kafka and you won’t need this Logstash input pipeline.

But right now, you have just raw syslog messages living in your Kafka cluster. You need to process them. They could be ASA firewall messages, where you need to parse them.

So you create an additional Logstash pipeline that pulls the raw syslog data from Kafka and parses it. This pipeline should run on other boxes than the ones that received the data, preferably some quite beefy Logstash nodes. Once parsed, you can send the data back to Kafka if you need to, or ingest it into your Elasticsearch cluster at this point.

input {
  kafka {
    group_id => "raw-syslog-group"
    topics => ["raw-syslog"]
    bootstrap_servers => "kafka-01:<port>"
    codec => json
  }
}

filter {
  if "%ASA-" in [message] {
    grok {
      match => [
        "message", "<%{POSINT:syslog_pri}>%{CISCOTIMESTAMP:timestamp} %{SYSLOGHOST:sysloghost} %%{CISCOTAG:cisco_tag}: %{GREEDYDATA:cisco_message}"
      ]
    }
    syslog_pri { }
  }
}

output {
  elasticsearch {
    hosts => ["elastic-01:9200"]
    index => "syslog-%{+YYYY.MM.dd}"
  }
}
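To see what the grok pattern actually extracts, here is a rough Python equivalent. The regex is a simplification of the grok patterns, and the sample ASA line is made up for illustration:

```python
import re

# Approximation of: <%{POSINT:syslog_pri}>%{CISCOTIMESTAMP:timestamp}
#   %{SYSLOGHOST:sysloghost} %%{CISCOTAG:cisco_tag}: %{GREEDYDATA:cisco_message}
ASA_RE = re.compile(
    r"^<(?P<syslog_pri>\d+)>"            # syslog priority
    r"(?P<timestamp>\w{3}\s+\d+\s+[\d:]+)\s"  # e.g. "Apr  1 12:00:00"
    r"(?P<sysloghost>\S+)\s%"            # the device hostname
    r"(?P<cisco_tag>[A-Z0-9_-]+):\s"     # e.g. "ASA-4-106023"
    r"(?P<cisco_message>.*)$"            # the rest of the message
)

sample = ("<164>Apr  1 12:00:00 fw01 %ASA-4-106023: "
          "Deny tcp src outside:1.2.3.4/1234 dst inside:10.0.0.5/443")
match = ASA_RE.match(sample)
if match:
    print(match.group("cisco_tag"))  # ASA-4-106023
```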

The trick is the consumer group feature of Kafka. In the example I specified group_id => "raw-syslog-group". No matter how many Logstash instances have this pipeline running, they will work as a unit with regard to Kafka: each message is consumed by only one member of the group.

If you find you need more processing power, deploy an additional Logstash node with this pipeline. You don’t have to change your loadbalancer configuration at all.

This setup also makes your life easier if you can centralize your Logstash processing to a few beefy Logstash nodes. It comes in handy if you are thinking of using Memcached for lookups of malicious IPs or domain names in all your syslog messages. Hey, that sounds like a topic for a complete blog post of its own ;)