Filebeat 7.8 on pfSense 2.4.5

Welcome

Hey, and welcome to this blog post. My name is Michael Pedersen, I am 34 years old, and I love open source and security. In August 2019 I started my journey into the Danish InfoSec community, and along the way I came across Security Distractions, because they too love open source and security! Now, one year later, I have been invited to join the blog, and I am very pleased to be able to publish this first post and hopefully many more to come. So again – Welcome πŸ™‚

Introduction

On many of my projects I need a firewall to segment networks, apply different rules, intercept traffic etc. For this I always use pfSense, because it is easy to set up, has a ton of functionality, is very well documented and is of course free and open source, which we like.

But unfortunately this is not a blog post about pfSense and how awesome and fun it is to play around with. Actually, this is more of a fusion of different sources that I have put together, to solve a problem I was facing – so let us get started.

For one of my projects I was going to intercept traffic from a client network using pfSense and the Squid proxy service and have all traffic written to the Squid access log. The second step was then to ship the logs into an Elasticsearch instance, and the best way to do this is using Elastic Beats, in this case Filebeat. This is where the challenge awaited.

The challenge…

As you probably know, pfSense runs on FreeBSD, and at the moment the package manager does not provide any easy install of Elastic components. I searched the internet for a while without much luck, until I stumbled over a blog post written by a Swedish pentester called Christoffer Claesson. He had found an answer inside an ongoing GitHub issue, but felt it was too good not to have on its own blog – I totally agree. The original idea was posted by another guy called jakommo, which you can read here.

Now, jakommo's idea was straightforward: simply clone the GitHub repository and build the beat you want yourself. Hell, why did I not think of that! Christoffer and jakommo already have some of the steps covered, but I would like to follow up on them and add a few things.

There are a few prerequisites that you need to get up and running before you start.

Prerequisites…

  1. Get a fresh FreeBSD machine up and running, preferably the same version as your pfSense box.
  2. Make sure you can reach your FreeBSD box via SSH. You might need to edit /etc/ssh/sshd_config and allow root login (not best practice – I know).

Let’s go…

The following steps update the pkg repository and install the tools needed to clone the Filebeat repository and build it from source. This will also install a bunch of dependencies, which is why it is done on a separate FreeBSD machine – to avoid polluting the pfSense box.

root@freebsd:~ # pkg update -f 
root@freebsd:~ # pkg install git gmake go bash
root@freebsd:~ # git clone https://github.com/elastic/beats.git
root@freebsd:~ # cd beats/filebeat/
root@freebsd:~/beats/filebeat/ # gmake

When the process is done, you should be able to run the newly built binary:
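If the build succeeded, the filebeat binary now sits in the folder and can report its version – something like this (exact output varies by build):

```
root@freebsd:~/beats/filebeat/ # ./filebeat version
filebeat version 7.8.0 (amd64), libbeat 7.8.0
```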

Now step back one dir, and create an archive of the filebeat folder.

root@freebsd:~/beats/filebeat/ # cd ..
root@freebsd:~ # tar -czvf filebeat.tar.gz ./filebeat/

This concludes the work on our temporary FreeBSD box. Now we have to copy the archive to our local machine so we can get it to our pfSense box. You can do this any way you like; I prefer to use secure copy from my local machine:
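For example (hostnames and paths below are placeholders from my lab setup – adjust to yours):

```
localuser@mymachine:~ $ scp root@freebsd:/root/filebeat.tar.gz .
```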

Next up, we need to upload our Filebeat archive to our pfSense box. This can be done the same way, or you can use the WebUI like this:

In the pfSense WebUI, go to Diagnostics -> Command Prompt, browse to the archive on your local machine and upload it.

All right, we are almost there. Log into your pfSense box using the console or SSH. If you used the WebUI to upload the archive, you will find the file in the /tmp/ folder. Now you can move it to wherever you want on pfSense and extract the archive. For this tutorial I simply stayed inside the /tmp/ dir.

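Extracting and testing could look like this (the paths assume you stayed in /tmp/, and the prompt is from my lab box):

```
[2.4.5-RELEASE][root@pfSense.localdomain]/tmp: tar -xzvf filebeat.tar.gz
[2.4.5-RELEASE][root@pfSense.localdomain]/tmp: cd filebeat
[2.4.5-RELEASE][root@pfSense.localdomain]/tmp/filebeat: ./filebeat version
```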

And there you have it. All you have to do now is configure Filebeat to fit your needs and run ./filebeat.
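As a starting point, a minimal filebeat.yml for shipping the Squid access log could look like this – a sketch, where the log path and the Elasticsearch host are assumptions from my lab that you will want to adapt:

```
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/squid/logs/access.log   # where Squid writes its access log on my pfSense box

output.elasticsearch:
  hosts: ["https://my-elastic-host:9200"]
```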

Caveats… Always with the caveats…

  • This has only been tried in a virtual environment, but I can't see why it should not work on hardware as well.
  • When you start Filebeat, it will run in the foreground of the shell you are using. I will try to do a post on how to run it as a service instead.
  • I have tried this with Filebeat 7.8 and the master branch (8.0 as of this writing).

Outro… (you still listening?)

Lastly, I want to give a shout-out to my friend David Thejl-Clayton and his custom Squid log format, which you can find in our GitHub repo here. David has spent a great deal of time defining a custom log format with all the relevant fields needed for detection and analysis of proxy alerts.

Cheers – Michael

Creating detection rules in Elastic SIEM App

It has been quite a long time since I wrote my last blog post – as with everything, life gets in the way! But I have been spending some quiet time rebuilding my lab, I have upgraded my ELK stack to 7.6, and I am totally blown away by how awesome the Elastic SIEM app is. So I thought I would put together a few blog posts about how to use it!

Prerequisites

  • You must be running 7.6 (duh)…
  • You must be running the basic license.
  • You must be running at a minimum basic authentication within your setup, between Kibana, Elastic, Logstash etc.
  • You must be running TLS on Elastic.


Enabling each of these prereqs takes time, and if you are using your stack just for testing purposes and haven't set up TLS or auth before, then good luck! You are in the lucky position I was in last week – welcome to 2 days of work…
However, once you are done, you are ready to move on to the real good stuff…

The good stuff

We will use an example to aid the instructions. The example is based on creating a detection every time a Windows Defender event ID 1116 – Malware Detected – entry appears in my logs.

First you will need to open the Elastic SIEM app, and then click on “Detections”.

Once you are in the detections window, on the right hand side you will find “Manage signal detection rules”.

In the “Signal detection rules” window you can see all the rules you have created or imported. You can manage whether rules are activated, and many other configuration changes can be done here.

To create a new rule, click on “Create new rule”.

Within the “Create new rule” section, the first thing you need to do is define the index you wish the rule to point at, and then the query you want the rule to run. In this example, as I am splitting Defender into a separate index, I have chosen my “sd-defender” index, and my query is written in KQL (Kibana Query Language). The query uses the ECS (Elastic Common Schema) field event.code and will match when it finds event.code 1116. Once you have built this first part, click on “Continue”.
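The query itself is a KQL one-liner along these lines:

```
event.code : "1116"
```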

The second stage of building a rule is to add some more description to the rule…

Here you can name the rule, and write a description of what it is/does. You also assign a “Severity” from low to critical, and a “Risk score” from 0-100. In this case I have chosen “Severity” = High and “Risk score” of 75. When you have finished in this section, click on “Continue”.

In this section you can also add some “Advanced settings”, where you can supply reference material for the alert – if you created it from a blog post, or if it came from a Sigma rule, you could supply a URL here. You can also add examples of false positives, and enrich the rule with MITRE ATT&CK TTPs! In this example we won't add them, but I will be blogging again soon about how to do this part using Sigma rules!

The last part of rule creation is the “Schedule rule” section. Here you can set up how often you would like the rule to run and, when it does run, how far back in time it should look. This is interesting because if you have just created a new rule and would like to see how it would have performed over the last few days of logs, you can adjust that here. When you are done setting up the schedule, you can choose to simply “Create rule without activating it” or “Create and activate rule” – both options are pretty self-explanatory!

Once the rule is created, we can try to provoke it and see how it turns out. Head back to the “Detections” page of the SIEM app. In my example, I am lucky because it is my lab and there is nothing else going on…

Now we will trigger a malware detected alarm by downloading the EICAR test file to one of my lab machines.

BINGO!

And here is the alert landing in the “Signals” pane, from where we can begin the investigation. Right now there is not much information about how these alerts make it to the attention of someone not using the SIEM app directly, but the SIEM app has some incredible offerings here, for free! I have also added a bonus item on how to extract the alerts out to case management tools, Slack, etc.

Bonus bonus bonus

If you want to extract the alerts out of the SIEM app, you can use a tried and tested tool: Elastalert. The SIEM app uses a system index called “.siem-signals-default-00001”. This index can be read via Elastalert, and the alerts can make it out to your SOC team!

We only need to append a rule for the signals index to our Elastalert configuration.
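A minimal Elastalert rule reading the signals index could look something like this – a sketch, where the rule name, webhook URL and timing are assumptions you will want to adapt:

```
# Elastalert rule: forward SIEM signals to Slack (sketch)
name: siem-signals-to-slack
type: any                   # fire on every new document in the index
index: .siem-signals-*      # the SIEM app signals index
realert:
  minutes: 5

alert:
  - "slack"
slack_webhook_url: "https://hooks.slack.com/services/XXX/YYY/ZZZ"
```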

Introduction

As Elasticsearch matures over time, Elastic is fixing some of the less obvious stuff. Seemingly little things can be tremendously important though.

One of the new things that I want to highlight here is the new security privilege: create_doc. Read about it in the Elasticsearch 7.5 release notes.

As Elastic describes it:

With the previous set of index privileges, users that were allowed to index new documents were also allowed to update existing ones.

With the new create_doc privilege, cluster administrators can create a user that is allowed to add new data only. This gives the minimum privileges needed by ingest agents, with no risk that the user can alter and corrupt existing logs. These administrators can now rest assured that components living directly on the machines they monitor cannot corrupt or hide tracks that are already in the index.

Have a look at the documentation, as there is one important change needed in the Logstash Elasticsearch output section.

Implementing it

It is very easy to take advantage of this new feature. Create a role called append_writer and assign a user to the new role:

Or, if you prefer Developer Tools:
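In Dev Tools, the same thing could look like this (the index pattern and username are just examples):

```
PUT /_security/role/append_writer
{
  "indices": [
    {
      "names": ["logs-*"],
      "privileges": ["create_doc"]
    }
  ]
}

PUT /_security/user/logstash_writer
{
  "password": "changeme",
  "roles": ["append_writer"]
}
```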

The final thing to modify is the output section in Logstash. You need to add an action attribute to it:
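With the create_doc privilege, the elasticsearch output has to be told to only create documents, never update them. A sketch – the host, index and username are examples:

```
output {
  elasticsearch {
    hosts    => ["https://elastic01:9200"]
    index    => "logs-%{+YYYY.MM.dd}"
    action   => "create"            # only create new documents, never update
    user     => "logstash_writer"
    password => "${ES_PWD}"         # resolved from the Logstash keystore
  }
}
```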

Of course, the credentials of the append_writer user should be kept in the Logstash keystore!

Conclusion

This simple change is trivial to make but gives great value. You can rest assured that the user used in Logstash can never be used to change existing documents in your Elastic clusters.

Using Logstash @metadata

Introduction

In a previous post, I showed how to do simple Kafka and Elasticsearch integration. It showed how to use a single Kafka topic to carry many different types of logs into Elasticsearch.

Have a read if you want to catch up or haven’t read it.

This approach had an undesired side effect of putting attributes into Elasticsearch that are not needed, wasting precious disk space.

Metadata

However, there is a very simple and elegant way to fix this. Have a read of the description of Logstash metadata fields here.

The previous article suggested the approach below. This meant storing kafkatopic, myapp and myrotation in every single document that went through the pipeline.

filter {
    mutate {
        copy => { "[@metadata][kafka][topic]" => "kafkatopic" }
    }

   if ![myapp]
   {
     mutate {
       add_field => { "myapp" => "default" }
     }
   }

   if ![myrotation]
   {
     mutate {
       add_field => { "myrotation" => "weekly" }
     }
   }
}

If we convert to using metadata fields, it could look like this instead. No more kafkatopic, myapp or myrotation being stored.

filter {

   if ![myapp]
   {
     mutate {
       add_field => { "myapp" => "default" }
     }
   }

   if ![myrotation]
   {
     mutate {
       add_field => { "myrotation" => "weekly" }
     }
   }
   # take advantage of metadata fields
   if [myapp]
   {
      mutate {
        rename => { "myapp" => "[@metadata][myapp]" }
      }
   }
   if [myrotation]
   {
      mutate {
        rename => { "myrotation" => "[@metadata][myrotation]" }
      }
   }
}

We can then use the metadata fields in the output section:

output
{
    if [@metadata][myrotation] == "rollover"
    {
        elasticsearch {
            hosts => ["https://elastic01:9200" , "https://elastic02:9200"]
            manage_template => false
            index => "%{[@metadata][kafka][topic]}-%{[@metadata][myapp]}-active"
        }
    }

    if [@metadata][myrotation] == "daily"
    {
        elasticsearch {
            hosts => ["https://elastic01:9200" , "https://elastic02:9200"]
            manage_template => false
            index => "%{[@metadata][kafka][topic]}-%{[@metadata][myapp]}-%{+YYYY.MM.dd}"
        }
    }

    if [@metadata][myrotation] == "weekly"
    {
        elasticsearch {
            hosts => ["https://elastic01:9200" , "https://elastic02:9200"]
            manage_template => false
            index => "%{[@metadata][kafka][topic]}-%{[@metadata][myapp]}-%{+xxxx.ww}"
        }
    }
}

Debugging

All outputs automatically remove the @metadata object, so when you are trying to debug your conf file, you need a simple trick to display the contents of the metadata:

output
{
  # also show contents of metadata object
  stdout { codec => rubydebug { metadata => true } }
}

Conclusion

So by using this approach, we are no longer storing kafkatopic, myapp and myrotation as attributes in every single document that passes through this pipeline.

We save disk space and processing time, and the documents are clean.

Simplifying Logstash by adding complexity

Background

A lot of the logs that go into Logstash will arrive using the Beats protocol, so you will typically have a pipeline in Logstash listening for Beats on port 5044. This could be stuff coming from Filebeat, Winlogbeat, Metricbeat or Heartbeat.

In your Logstash filter section, you will over time end up with a huge mess, trying to add the relevant parsing of logs inside a bunch of if statements. In the output section, you get the same mess again, where you output the different types of logs inside another bunch of if statements.

If you have done stuff like this, your code will be increasingly difficult to read and debug. Not to mention the problems you will face if multiple people need to contribute to the Logstash configuration, or if you need to move the parsing of a specific type to another Logstash node. Then you have to grab the relevant parts by copy/paste, which is error-prone.

input {
  beats {
    port => 5044
  }
}

filter {
  if [type] == "winlogbeat" {
    #enrich winlogbeat
    ....
  }
  if [type] == "heartbeat" {
    #enrich heartbeat
    ....
  }
  if [type] == "mylogfile" {
    #enrich mylogfile
    ....
  }
  if [type] == "dns" {
    #enrich dns
    ....
  }
  if [type] == "dhcp" {
    #enrich dhcp
    ....
  }
}

output {
  if [type] == "winlogbeat" {
    #output winlogbeat
    ....
  }
  if [type] == "heartbeat" {
    #output heartbeat
    ....
  }
  if [type] == "mylogfile" {
    #output mylogfile
    ....
  }
  if [type] == "dns" {
    #output dns
    ....
  }
  if [type] == "dhcp" {
    #output dhcp
    ....
  }
}

Simplifying

So what to do about this problem, you may ask. Earlier, people handled this by using named conf files that would be picked up by Logstash and merged into one large configuration. However, we want to be modern and use the new features made available by Elastic.

Pipeline to pipeline

I read about the pipeline-to-pipeline feature in Logstash a long time ago. There is an excellent article about the options here. The feature is now generally available as of 7.4.

It is actually very simple to implement. You create a pipeline to receive the Beats input and then distribute the events to small, tailor-made pipelines.

input {
  beats {
    port => 5044
  }
}

filter {
}

output {
        if [type] == "dns" {
          pipeline { send_to => dns }
        } else if [type] == "dhcp" {
          pipeline { send_to => dhcp }
        } else if [type] == "mylogfile" {
          pipeline { send_to => mylogfile }
        } else {
          pipeline { send_to => fallback }
        }
}

Then create a new pipeline to handle each specific log type. This one is restricted to parsing DNS logs.

input {
  pipeline { address => dns }
}

filter {
   # do only your parsing of DNS logs
}

output {
  # output dns
}

You must remember to add all your pipelines to your pipelines.yml file, and remember to think about whether you need an in-memory queue or a persisted queue per pipeline.

- pipeline.id: beats-input
  path.config: "/etc/path/to/beats-input.config"
  pipeline.workers: 3
- pipeline.id: dns
  path.config: "/etc/different/path/dns.cfg"
  queue.type: persisted
  queue.max_bytes: 4gb
- pipeline.id: dhcp
  path.config: "/etc/different/path/dhcp.cfg"
  queue.type: persisted
  queue.max_bytes: 1gb
- pipeline.id: mylogfile
  path.config: "/etc/different/path/mylogfile.cfg"
  queue.type: persisted
  queue.max_bytes: 2gb

Conclusion

We have started using this approach and will continue doing so going forward. We get a much simpler way of handling many different log types inside Logstash, and we are able to distribute the work to more people.

On top of this, we are seeing better latency in Logstash. I suggest reading this article while you are at it – with this approach you are effectively using parallel pipelines, as the article suggests.

As always, use this approach if you find it applicable to your use case.

Watching for no data

Introduction

So you are sending stuff to your Elasticsearch cluster with some beat, e.g. Filebeat. But as everyone knows, things go wrong, stuff breaks. You are trying to be proactive and watch for stuff breaking, so why not let Elasticsearch monitor for missing data with a watcher? If you go searching for examples, you will pretty surely end up at this repo: https://github.com/elastic/examples

The examples repo

This repo provides examples of how to do various stuff with your shiny Elasticsearch setup. If you look in the alerting category, you will find a recipe called “system fails to provide data”. Oh yeah…

It looks pretty useful. Basically, you set up a watcher that searches an index for hosts seen in the last 24 hours and for hosts seen in the last hour. However, there is a catch: the sample doesn't provide any example of how to compute the delta. You just end up with 2 lists that you have little use for πŸ˜‰
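For reference, the two lists come from a search input roughly shaped like this – one bucket per period, each with a terms aggregation of hosts. This is a sketch: the index pattern, field names and periods are assumptions; the repo uses its own values. Note that the bucket names (history, last_period) and the hosts aggregation must match what the transform later walks through:

```
"input": {
  "search": {
    "request": {
      "indices": ["filebeat-*"],
      "body": {
        "size": 0,
        "query": {
          "range": { "@timestamp": { "gte": "now-24h" } }
        },
        "aggs": {
          "periods": {
            "filters": {
              "filters": {
                "history":     { "range": { "@timestamp": { "lt": "now-1h" } } },
                "last_period": { "range": { "@timestamp": { "gte": "now-1h" } } }
              }
            },
            "aggs": {
              "hosts": { "terms": { "field": "host.name", "size": 1000 } }
            }
          }
        }
      }
    }
  }
}
```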

The revised sample

Every chance I get when talking to my friends at Elastic, I tell them the watcher is too hard to use. Make it simpler, please. And they smile and say, “we know” πŸ™‚

So back to the problem.

You have to do some very funky-looking Painless scripting to find the delta of the two lists we started out with. You do this by means of a transform.

This is how the transform section looks in the repo. It is basically empty, so there will be no transform going on.

  "actions": {
    "log": {
      "transform": {
      "script": {
        "id":"transform"
      }
    },
      "logging": {
        "text": "Systems not responding in the last {{ctx.metadata.last_period}} minutes:{{#ctx.payload._value}}{{.}}:{{/ctx.payload._value}}"
      }
    }
  }

So this is my attempt to fix the problem. Don't get scared, it is not as bad as it looks. Just add it to the watcher.

  "transform": {
    "script": {
      "source": "def last_period = ctx.payload.aggregations.periods.buckets.last_period.hosts.buckets.stream().map(p -> p.key ).collect(Collectors.toList());def history = ctx.payload.aggregations.periods.buckets.history.hosts.buckets.stream().map(e -> e.key ).filter(p -> !last_period.contains(p)).map(p -> [ 'hostname':   p]).collect(Collectors.toList());return  history;",
      "lang": "painless"
    }
}

Here is the source code laid out in a more readable format. Multiline Painless scripts in the watcher UI, please, Elastic πŸ˜€

def last_period = ctx.payload.aggregations.periods.buckets.last_period.hosts.buckets.
  stream().
    map(p -> p.key ).
      collect(Collectors.toList());

def history = ctx.payload.aggregations.periods.buckets.history.hosts.buckets.
  stream().
    map(e -> e.key ).
      filter(p -> !last_period.contains(p)).
        map(p -> [ 'hostname':   p]).
          collect(Collectors.toList());

return  history;

That code will produce a nice list of the hosts that haven't delivered data in the last period.

To use the list in the action section, you do something like this. Notice the condition in there as well, which prevents the watcher from going off and sending emails when everything is working:

  "actions": {
    "log": {
      "condition": {
        "compare": {
          "ctx.payload._value.0": {
            "not_eq": null
          }
        }
      },
      "email": {
        "profile": "standard",
        "to": [
          "whoever@whatever.com"
        ],
        "subject": "oh no, data missing",
        "body": {
          "html": "<h1>Systems not delivering data in the last {{ctx.metadata.last_period}} period</h1> <ul> {{#ctx.payload._value}}<li>{{hostname}}</li>{{/ctx.payload._value}}</ul>"
        }
      }
      }
    }
  },

Conclusion

As usual, there are more ways to achieve the same thing. You could probably also write an extremely complex search. But if you add these 2 sections to your watcher, you are good to go.

TheHive enrichment

Intro

An increasing number of SOCs, IRT teams, etc. are beginning to use TheHive and Elasticsearch.

While researching these tools I saw a lot of talk about enrichment, and tying various tools together, so I wanted to provide my take on it as well.

I am by no means an expert in any of these tools, or in the IRT process, but I have had the privilege of getting to know a few people that I would consider experts (even though they might not feel that way themselves), and while watching them work, I started thinking that some of the tasks they routinely perform could be eligible for automation.

Specifically, I saw that a lot of the time when they were doing triage or incident response, they would receive an alert (from their EDR tool, tier-1 SOC, IDS/IPS, etc.) where they would only be provided with an IP address and a timestamp.

Because most corporate infrastructures are configured with DHCP, they would often have to go look at their Elasticsearch logs to determine which endpoint (hostname) was assigned the given IP address at the given time.

While this is somewhat trivial to do, it is also a well-defined, recurring task, which meant that (if possible) I wanted to see if I could automate it.

Integrating TheHive and ElasticSearch

As you may or may not know, TheHive uses an underlying enrichment engine called Cortex.

In short, Cortex works by leveraging analyzers (used for collecting information related to an observable, for instance collecting information from VirusTotal about a checksum) and responders (used to act on information, for instance pushing an IP to a blacklist or sending an email).

With this in mind, I figured the way to go would be to create an analyzer that could query Elasticsearch and return the hostname that was using the given IP address at the specified time.

I figured the way to do this would be to create the event in TheHive and attach the given IP address as an observable, from which the analyzer could be run.

This, however, turned out to be somewhat of a dead end for me, as analyzers have the caveat of only working on observables. This meant that the only way I could provide a timestamp to the analyzer was to manually type it into the message field of the observable (which I briefly considered, but decided would be way too error-prone in a production environment, as the timestamp would have to adhere to specific formatting rules).

Because of this caveat, I started looking at the possibilities of implementing this as a responder instead (even though this is not how responders are supposed to be used).

I quickly realized that because responders can be invoked on events, alerts and observables, a responder has access to a wide range of information related to the event, even if it is implemented to only work with observables.

With this in mind, I was able to implement functional timestamps using customFields with datatype datetime:

This meant that I was able to implement a functional responder which could query Elasticsearch (through the standard REST API) and return a report containing all relevant entries corresponding to the query.

I was, however, not entirely satisfied with this, as I felt it could only be considered partial automation, since I would still have to read through the returned report and manually input the results as new observables.

Completing the automation

Using Cortex, I felt quite limited in what I could do with my results, so I started contemplating how to take my attempted automation a step further, and therefore I started looking into the REST API for TheHive.

This gave me all the possibilities I wanted, and with this in mind I was able to leverage another customField called autoEnrichment (with datatype boolean) to define whether I wanted the responder to automatically create new observables from the Elasticsearch results.

The actual code

Analyzers and responders usually consist of the following:

  • A requirements file (which defines which non-standard libraries are needed for the analyzer/responder to work)
  • A JSON file (defining the prerequisites for the responder/analyzer, such as which datatypes it can work with)
  • The analyzer/responder itself (the actual code that performs the required operations)
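For illustration, the JSON definition for a responder like this could look roughly as follows – the field values here are my guesses for this responder, so check the repo for the real file:

```
{
  "name": "DHCPResponder",
  "version": "1.0",
  "author": "Security Distractions",
  "url": "https://github.com/securitydistractions/ElasticSearch-CortexResponder",
  "description": "Query Elasticsearch for the hostname using a given IP address at a given time",
  "dataTypeList": ["thehive:case_artifact"],
  "command": "DHCPResponder/DHCPResponder.py",
  "baseConfig": "DHCPResponder"
}
```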

I, however, chose to split the actual analyzer/responder file into 3 separate files (DHCPResponder.py, DHCPConf.py, and DHCPCallScript.py).

The idea behind this is to separate the initialization, the configurable items and the functionality, in an attempt to make the responder easier to maintain and easier to build upon, in case a need should arise for a similar responder that can handle other types of logs.

In keeping with the spirit of maintainability (and best practice), I have also tried to document the code with comments explaining the functionality and the thoughts behind each code section, and as such most of the code should be somewhat self-explanatory…

So without further ado, here is a link to the GitHub repo with the code:

https://github.com/securitydistractions/ElasticSearch-CortexResponder

Fault tolerant Elasticsearch

Introduction

By default your Elasticsearch cluster is pretty robust. Typically you would go for a design with one primary shard and one replica shard. You could have multiple datacenters with low network latency and have the cluster operating in both centers at once. You could also have 2 racks with nodes.

But what happens if you lose one datacenter or one rack? Your cluster will likely go RED if you don't plan for it upfront.

Shard allocation awareness

There are multiple ways to design around a disaster. But one thing you surely need to be aware of is a feature called Shard Allocation Awareness.

You can read the documentation from Elastic here.

Basically, this feature lets your Elastic cluster know about your physical topology, which enables Elastic to be smart enough to put your primary and replica shards into 2 different zones. A zone can be a datacenter or a rack, as mentioned before.

You tell Elastic this by adding node attributes to your config file. In this example we will add a node attribute called datacenter to our elasticsearch.yml file. It will have 2 possible values: dc1 and dc2.

node.attr.datacenter: dc1

Once you have added this attribute to all your nodes, you need to perform a rolling cluster restart for the attribute value to be read.

Afterwards you need to enable the feature.

PUT _cluster/settings
{
  "persistent" : {
    "cluster.routing.allocation.awareness.attributes": "datacenter"
  }
}

Shortly thereafter you will notice some shard activity going on in the cluster, as the master rearranges your shards according to your topology. When the dust settles, you can rest assured that your indices are present in both datacenters.

Forced shard allocation awareness

This all sounds good, but there is a problem. Suppose you lose a datacenter (dc1) now. The cluster will do its best to recover, so it will begin promoting all replica shards in DC2 to primaries and then start to create new replicas, also in DC2. This means that you need double the disk space in each center.

If you don't have the luxury of double the disk space everywhere, you should be aware of forced shard allocation awareness.

Here is how you enable it. Notice that you now specify the possible values of the datacenter attribute.

PUT _cluster/settings
{
  "persistent" : {
    "cluster.routing.allocation.awareness.attributes": "datacenter",
    "cluster.routing.allocation.awareness.force.datacenter.values": "dc1,dc2"
  }
}

When you do this, Elastic knows that you intend to have your indices available on nodes tagged with these values. So when you lose all nodes in DC1, Elastic is not going to try to recover everything into DC2. When this happens, you will see the cluster go yellow with 50% of your shards missing, but the cluster will be available and operate as before. When DC1 becomes available again, Elastic will start to recover as normal.

Additional benefits

This feature will do more for you than just help out in case of disaster. It can also help when you need to do a rolling cluster restart, a rolling cluster upgrade or simple base OS patching.

Normally when you do a rolling upgrade, you need to do it node by node, which is cumbersome and takes time. With forced shard allocation awareness, you can take e.g. 50% of your warm nodes out of service, patch them or change their config, and bring them back online. This gives you much faster maintenance on your cluster.

Summary

This setup is not for everyone. If you are really paranoid and have enough resources, you could also make your clusters available in multiple places and use CCR as your recovery plan. Examine your options and choose what fits you best.

Index Lifecycle Management – ILM

Introduction

When you are operating Elasticsearch clusters, you will probably need some way of managing the lifetime of your indices. You may also need a way of handling migration from hot to warm nodes. Previously this was typically handled by a separate tool from Elastic called Curator.

We have been running with Curator handling our indices for a long time, but we feel handling indices should be built-in functionality. We have had some scripts running in 3 steps as cron jobs:

  • rollover – runs every 15 min
  • migration – runs every hour
  • deletion – runs once per day

The configuration files for Curator could prove to be a little bit of a pain.

But when Elastic released 6.7.0, their new Index Lifecycle Management became a built-in feature, making our daily administration simpler. No longer do you need an external cron job running Curator – all ILM tasks can be done from within Kibana.

Migration from Curator

So this is all good. But what do you actually need to do in order to migrate away from Curator and start using ILM?

You don't have to migrate everything in a big-bang approach; just move one thing at a time. Most of our big indices use the rollover approach, so that will be the focus here.

We came up with these steps in order to do the migration for a specific group of indices.

  • Create ILM Policies
    • Create one for rollover and one for already rolled over indices
  • Apply policy for already rolled over indices
  • Attach rollover policy to template
  • Perform rollover
  • Remove curator support for current group of indices

Create ILM Policies

It’s straightforward to create a policy: just fire up Kibana, navigate to Management, then Index Lifecycle Policies, and click Create Policy.

So in this example we create 2 policies

  • dns-standard
  • dns-standard-rollover

We create a set of two policies per group of indices. If all your indices use the same periods for hot/warm/deletion, you only need two policies for everything. But in order to allow for customization per group of indices, we create policies per group.

The dns-standard policy is used for existing indices, so it will not contain a rollover phase.

Edit the dns-standard policy again, click the save-as-new option at the top, enable the rollover phase and save it as dns-standard-rollover.

Now please verify that the settings in the ILM policies match the configuration from your old Curator jobs πŸ˜‰
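As an alternative to clicking through Kibana, the rollover policy can also be created from Kibana Developer Tools. This is only a sketch: the phase timings, rollover thresholds and the warm-node attribute below are examples, adjust them to match your old Curator settings:

```json
PUT _ilm/policy/dns-standard-rollover
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": { "max_size": "50gb", "max_age": "1d" }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "allocate": { "require": { "box_type": "warm" } }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": { "delete": {} }
      }
    }
  }
}
```

The dns-standard policy for existing indices would look the same, minus the rollover action in the hot phase.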

Apply policy

In order to apply a policy to a set of existing indices, you need to go to Kibana Developer Tools.
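The request looks along these lines; the index pattern dns-* is an example, use whatever matches your existing indices:

```json
PUT dns-*/_settings
{
  "index.lifecycle.name": "dns-standard"
}
```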

Once you have applied the policy there, your current DNS indices will be handled by ILM.

Attach rollover policy

In order to attach the rollover policy to a template, click Actions for the rollover policy. You will be asked to select a template name, and you need to enter the write alias for the rollover index.
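If you prefer the API over the Kibana dialog, the same can be achieved by putting the ILM settings into your index template. A sketch, where the template name, index pattern and write alias are all assumptions:

```json
PUT _template/dns
{
  "index_patterns": ["dns-*"],
  "settings": {
    "index.lifecycle.name": "dns-standard-rollover",
    "index.lifecycle.rollover_alias": "dns"
  }
}
```

Note that PUT replaces the whole template, so merge these settings into your existing template body rather than submitting this as-is.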

Perform rollover

The next step is to do a manual rollover of your DNS alias. You can do that from Kibana Developer Tools.
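Assuming the write alias is called dns (an example name), the manual rollover is a single request:

```json
POST /dns/_rollover
```

From this point on, ILM's rollover action takes over and you never need to roll over by hand again.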

Now all DNS indices will be handled by ILM.

Remove curator support

Go to your Curator server and remove all references to the current set of indices.

Verify your work

Once you have completed the steps, you can see in ILM that everything is working. You can see how many indices are handled by each policy. If you monitor this on a daily basis, you will eventually see when the dns-standard policy is no longer needed (linked-policies=0). This will happen according to your retention settings.
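You can also ask ILM directly, per index, which policy manages it and which phase it is in. Again dns-* is an example pattern:

```json
GET dns-*/_ilm/explain
```

The response contains a `managed` flag, the `policy` name and the current `phase` for each matching index, which makes it easy to spot stragglers still on the old policy.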

Conclusion

Please try out ILM in your test environment first to get familiar with the steps. Or learn the hard way πŸ˜‰

The latest version of Curator is ILM-aware, so it will not touch indices that are managed by ILM.

One thing to be aware of is that ILM doesn't offer all the advanced features of Curator. However, it delivers the functionality needed for most basic use cases.

We encourage you to try it out.

Enriching ElasticSearch With Threat Data – Part 3 – Logstash

In our previous post in this series, we prepared MISP and its API, set up memcached, and created the Python script we need to pull data from MISP and push it into our memcached application. In this blog post, we will cover how to use Logstash to look up the data stored within memcached, and how to enrich ElasticSearch when we get a hit!

A quick mention before we go much deeper: this enrichment setup is capable of ultra-fast lookups and of working with huge amounts of IoCs. Without giving away too much, I know of a very large production setup which is running this with close to 120,000 events per second and multiple feeds enabled within MISP…. It will do enrichment in realtime as the logs are being written to ElasticSearch!

Part 1:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-1-misp/

Part 2:- https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-2-memcached-and-python/


Logstash – Brief Intro

Logstash is the powerhouse behind our enrichment setup… Since you should hopefully already be familiar with the ELK stack, we won’t touch too much on Logstash and how it works. But we will focus on parts of it…

Logstash is essentially split up into 3 sections… Input, filter and output.

The input section is where we define the source of the logging data we want to work with.

The filter section is where we then work with the logging data. This could be via parsing, normalizing, transforming or multiple other methods to prepare the data for sending out to ElasticSearch…

The output section is where we define how to send the data out of Logstash. This could be sending directly to ElasticSearch, Kafka or many other output options.

Our blog will focus much more on the filter section in future posts, covering how we map all logs up against the Elastic Common Schema via grok parsing. But right now, in this example, we will keep it simple and assume you already have some sort of parsing in place for the logging source you want to enrich.

Logstash – Memcached filter

The Logstash memcached filter has recently been made into a fully supported release, which we are very happy about over at Security Distractions. It comes installed by default with Logstash 7.0…

https://www.elastic.co/guide/en/logstash/current/plugins-filters-memcached.html

This means all we need to do within our Logstash configuration to enable the memcached plugin is to write the filter in as shown below.

The placement of the memcached section is quite important… It should come after your grok parsing and transforming sections, preferably as the last function within the filter section.

memcached {
	hosts => ["127.0.0.1:11211"]
	get => { "domain-%{destination.domain}" => "[misp_src]" }
}

A quick breakdown of this function: “hosts” is where we specify the location and port of our memcached application.

The “get” is used to tell Logstash which field within the logs it needs to look up against memcached; the result of a match is then written to a new field, “misp_src”.

Using the example from our previous blog post, we will use securitydistractions.com as the value within the destination.domain field.

Logstash will prepend “domain-” to “securitydistractions.com”, resulting in “domain-securitydistractions.com”. It will then make a get request against the memcached application….

“domain-securitydistractions.com” is populated within the memcached data store with the value “Feed-RansomwareTracker”. So we get a hit, and this value is written to the new field “misp_src”.

When Logstash looks up a value which is not in the memcached data store, no value is returned into misp_src. So, for the sake of good practice, we will add a conditional within Logstash that populates the misp_src field with the value “none” if there is no match.

if ![misp_src] {
	mutate {
		add_field => { "[misp_src]" => "none" }
	}
}
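Taken together, the lookup-plus-fallback behaviour can be sketched in Python. This is not the actual Logstash internals, just an illustration using a plain dict in place of the memcached store:

```python
# Sketch only: mimics what the memcached filter plus the fallback
# mutate do to an event, with a dict standing in for memcached.
def enrich(event, store):
    # Logstash builds the key by prepending "domain-" to destination.domain
    key = "domain-" + event.get("destination.domain", "")
    hit = store.get(key)                        # the memcached "get"
    event["misp_src"] = hit if hit else "none"  # fallback when no IoC matches
    return event
```

Feed it an event whose domain is in the store and misp_src comes back with the feed name; feed it anything else and misp_src is "none".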

Since this setup leverages your already existing ELK stack, you will then only need to handle the new field “misp_src” via visualisations or whatever other fancy way you want to display this field.

In my lab, I use a locally running instance of pihole to generate logs for testing the enrichment setup….

When I get round to it, I will make a part 4… featuring extensions to the integration. You can run with as many feeds as your heart desires… Your only limit is your imagination for tagging/feed names!

It is possible to further integrate MISP and ELK by using the http filter plugin. Once the misp_src field is populated, you could take this result and make an HTTP call to MISP again for further enrichment.