Enriching ElasticSearch With Threat Data – Part 1 – MISP


There are a lot of great blog posts and reads available on the MISP platform, so I don't want to do it an injustice by writing a huge intro here… I plan to write a more in-depth blog post about MISP in the future, but until then please head over to the MISP project site:
https://www.misp-project.org

What we are interested in for our enrichment is how to leverage MISP to produce our own threat data feeds.

MISP allows you to create your own events made up of IoCs and then leverage these as a threat data feed.

Out of the box, MISP also has support for many open source threat feeds; it can aggregate these and present them in a chosen standard. This can really help with centralizing your organisation's threat data, allowing you to combine OSINT and your own intelligence for enrichment into ElasticSearch.

We will begin our example by working with the Ransomware Tracker CSV feed which can be enabled in MISP. This feed is well known by the community and will give a good understanding of how the integration works.

MISP Prep

To get started you can download a training MISP instance here (or use your own MISP instance):- https://www.circl.lu/misp-images/latest/

Once you have your instance running and can access the WebUI, you should navigate to “Sync Actions” and then down to “List Feeds”

This will present you with a screen showing all of the default available threat data feeds and their sources.

If you scroll through this list, eventually you will find Ransomware Tracker.

You will need to check the tick box next to the feed, and then scroll to the top and select “Enable selected”.

Once the feed is enabled, you will need to return to the Ransomware Tracker row; all the way over on the right-hand side there is a button with the tooltip "fetch all events".

This will then begin the job to fetch the entire Ransomware Tracker feed into a MISP event. To find the event, hover over the "Event Actions" button and then click on the "List Events" option.

This will take you to your MISP instance's event section. Yours will look slightly different to mine; if you are already using MISP, it will be populated with events you have been working with or have synced. If you are new to this, it should contain only one event… with the Info set to "Ransomware Tracker CSV Feed".

When you drill down into the event, you will find some information relating to the threat feed, including an item in red: Published: "No". This means that the event is currently stored inside MISP but is not available for distribution via the API or a sharing method. This allows us to work on the event without fear of publishing something by accident.

You can scroll through the event and see all of the IoCs contained within the Ransomware Tracker feed, but what we are interested in now is tagging the event so we can export it via the API as one feed.

To do this, we will need to create a new custom tag within MISP….

Hover over the “Event Actions” button and then click on “Add Tag”.

You will then be presented with the Add Tag section, where you can give your new tag a name. For this example we will name it "Feed-RansomwareTracker", choose the colour the tag will have in your event view, and then ensure "Exportable" is checked. Then click "Add".

You can then go back to your Ransomware Tracker CSV event….


As part of the event info, you can see a section called “Tags” with a + button next to it. Click on the + button, and then add your newly created Feed-RansomwareTracker tag to the event.

The last step is to then publish the event, so it can be retrieved via the API for pushing into ElasticSearch!

On the left hand side next to the event info, you can find the option for “Publish Event”. Click on this and then click “Yes” when prompted to publish the event.

This has now published the event and the tags and it is ready to be retrieved by the API.

MISP API

Alongside the amazing WebUI for MISP, there is an incredibly strong API engine running underneath. Again, I won't focus too much here on singing its praises; that I will save for a later post!

But in this example, we will use the MISP API to pull out the tagged Ransomware Tracker feed for use within ElasticSearch.

To prepare the API for our scripts, all we need to do is find the automation key…

Hover over the “Event Actions” button within the MISP WebUI… And click on the “Automation” button.

Within the Automation section you can find your automation key:-

Save this key, you will need it later for your Python script!
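If you want to test the key straight away, here is a minimal sketch (not the full script from part 2) that pulls every attribute carrying our Feed-RansomwareTracker tag via MISP's restSearch endpoint; the URL and key are placeholders for your own instance:

import requests

MISP_URL = "https://your-misp-instance"   # placeholder for your MISP instance
MISP_KEY = "YOUR-AUTOMATION-KEY"          # the automation key you just saved

headers = {
    "Authorization": MISP_KEY,
    "Accept": "application/json",
    "Content-Type": "application/json",
}

# Ask MISP for every attribute tagged with our custom tag
payload = {"returnFormat": "json", "tags": "Feed-RansomwareTracker"}

r = requests.post(MISP_URL + "/attributes/restSearch",
                  headers=headers, json=payload, verify=False)  # verify=False only for the self-signed training VM
r.raise_for_status()

for attr in r.json()["response"]["Attribute"]:
    print(attr["type"], attr["value"])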

This concludes our preparation work within MISP, next up…. Python and Memcached….

Part 2:-https://www.securitydistractions.com/2019/05/17/enriching-elasticsearch-with-threat-data-part-2-memcached-and-python/

Simple Kafka and Elasticsearch integration

Kafka is an awesome platform for moving data around. It is often used in front of an Elasticsearch cluster to buffer data before it gets ingested into Elasticsearch.

Kafka uses topics to carry specific kinds of data around. Imagine having topics for DNS, DHCP, firewall and so on. You can quickly end up with a high number of topics, right?

So in this blog post, I will present a way for you to utilize a single Kafka topic to carry many kinds of data, while still being able to ingest into different Elasticsearch indices. It also enables you to specify the rotation of the indices: rollover, weekly, daily or whatever your needs may be.

The approach works by creating a few simple properties alongside your data:

  • myapp
  • myrotation

Let's use this scenario: you have some kind of logfile that contains log data for your app "blog-hits". Your app is low volume in terms of logs and you just need a weekly index.

You install Filebeat and add these entries to your Filebeat configuration.

  fields:
    myapp: blog-hits
    myrotation: weekly
  fields_under_root: true

You would then configure Filebeat to send this to Logstash for further parsing. After parsing, Logstash sends the events to Kafka on a topic called "application-logs", which you have configured on your Kafka servers.

If you prefer, you can also add the myapp and myrotation fields in the Logstash pipeline that parses your data; it is just a matter of preference.
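For example, a minimal mutate filter in that parsing pipeline could set the same two fields (the values below are just the ones from the blog-hits scenario above):

filter {
  mutate {
    # Example only: tag everything parsed by this pipeline as "blog-hits" with weekly rotation
    add_field => {
      "myapp"      => "blog-hits"
      "myrotation" => "weekly"
    }
  }
}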

You will have a Logstash consumer of the topic "application-logs" in a pipeline like this:

input {
  kafka {
    bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
    topics => [ "application-logs" ]
    codec => "json"
    group_id => "tier1"
    decorate_events => true
  }
}

Please notice that I used decorate_events. This is important for the rest of the pipeline, as it is what makes the Kafka topic name available to Logstash under [@metadata][kafka][topic].

Next we will define the filter section:

filter {
    mutate {
        copy => { "[@metadata][kafka][topic]" => "kafkatopic" }
    }

   if ![myapp]
   {
     mutate {
       add_field => { "myapp" => "default" }
     }
   }

   if ![myrotation]
   {
     mutate {
       add_field => { "myrotation" => "weekly" }
     }
   }
}

In the filter, we make sure that we have default values for myapp and myrotation. Now we get to the interesting output section:


output {
  if [myrotation] == "rollover" {
    elasticsearch {
      hosts => ["https://elastic01:9200", "https://elastic02:9200"]
      manage_template => false
      index => "%{[kafkatopic]}-%{[myapp]}-active"
    }
  }

  if [myrotation] == "daily" {
    elasticsearch {
      hosts => ["https://elastic01:9200", "https://elastic02:9200"]
      manage_template => false
      index => "%{[kafkatopic]}-%{[myapp]}-%{+YYYY.MM.dd}"
    }
  }

  if [myrotation] == "weekly" {
    elasticsearch {
      hosts => ["https://elastic01:9200", "https://elastic02:9200"]
      manage_template => false
      index => "%{[kafkatopic]}-%{[myapp]}-%{+xxxx.ww}"
    }
  }
}

In the output section, we use the information gathered from myapp and myrotation in order to ingest our logs into an application specific index. So this pipeline is just being used to route the data to the correct index.

In this case, data will get stored in an index like "application-logs-blog-hits-2019.14".

You can use this simple approach to carry many different kinds of data in a single Kafka topic, while still ingesting into separate indices in Elasticsearch.

Enriching ElasticSearch With Threat Data – Intro

Since my last blog post back in January, I have been seriously distracted! I promised blog posts relating to my lab but have not had the time…. But to keep you guys going until then… I am going to open source my enrichment-at-scale setup, combining ElasticSearch, MISP, Logstash and Memcached into one seriously powerful platform.

Have you ever wanted to check your entire logging estate against a threat feed? Multiple threat feeds? If so, you have probably seen that many of the big SIEM providers charge a premium for this service.

What I will demonstrate over the next few posts is how to accomplish this for free! Well, not quite for free, since you need to invest the time, but you know…..

Let's talk about the diagram above… For my threat data source, I have chosen MISP. My logging sources are Squid Proxy and PiHole. These are the parts you can swap out for your own choices; the rest of the setup is required for the platform to run…

Instead of choosing MISP, you could simply use a single threat data feed; Ransomware Tracker could be a good place to start, as they offer an open source feed via CSV which you could quickly parse. The important thing is that you have the right data structure to put the feed into Memcached. But we will go over this in further blog posts….

Across the next blog posts, I will talk about the various pieces in the puzzle and how to put them all together… The result is a very scalable, powerful enrichment engine that can ingest and enrich logs in real time without delaying the log pipeline.


Scalable syslog pipeline

If you are receiving syslog data from a variety of network devices, you need a design that will allow you to receive and process syslog messages before you ingest them into your Elasticsearch cluster.

Processing syslog messages can be quite heavy in terms of CPU usage, if you are doing a lot of grok statements.

As always, this can be done in many different ways, but in this blog post I will show the basics of a Kafka based architecture.

The initial approach, which will work in many use cases, is to just put some kind of loadbalancer in front and use that to receive your syslog messages and ship them to some Logstash instances for processing.

This approach will be fine for a small to medium sized setup. But how will you scale it? Well, deploy one more Logstash server and edit your loadbalancer configuration to use the new Logstash server. But there is a smarter way.

I suggest that you have your loadbalancer forward to 2-3 Logstash servers running an extremely simple syslog pipeline. In this syslog input pipeline, do absolutely nothing but forward the data to your Kafka cluster.

input {
  tcp {
    port => 514
    type => syslog
  }
  udp {
    port => 514
    type => syslog
  }
}

filter {
}

output{
  kafka { 
    bootstrap_servers => "kafka-01"
    topic_id =>  "raw-syslog"
  }
}

Of course, since this is syslog, be sure that this pipeline is backed by a persistent queue in Logstash, as syslog is fire and forget.
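Enabling the persistent queue can be done per pipeline in pipelines.yml; a minimal sketch (the pipeline id, config path and size below are examples, not values from this setup) could look like this:

- pipeline.id: syslog-input
  path.config: "/etc/logstash/conf.d/syslog-input.conf"
  queue.type: persisted     # buffer received syslog on disk instead of in memory
  queue.max_bytes: 2gb      # cap the size of the on-disk queue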

The boxes to run this pipeline can be quite small as there will be no processing going on.

If you are running with rsyslog on the sources, you could even configure rsyslog to send directly to Kafka, and then you won't need this Logstash input pipeline at all.
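For reference, rsyslog has an omkafka output module that can do this; a rough, untested sketch (the JSON template is made up, and parameter names may differ between rsyslog versions, so check the omkafka documentation) might look like:

# /etc/rsyslog.d/60-kafka.conf - rough sketch only
module(load="omkafka")

# minimal JSON wrapper so the Logstash consumer can keep using codec => json
template(name="json_msg" type="list") {
  constant(value="{\"message\":\"")
  property(name="msg" format="json")
  constant(value="\"}")
}

action(type="omkafka"
       broker=["kafka-01:9092"]
       topic="raw-syslog"
       template="json_msg")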

But right now, you just have raw syslog messages living in your Kafka cluster. You need to process them. They could for example be Cisco ASA firewall messages that need parsing.

So you create an additional Logstash pipeline that pulls the raw syslog data from Kafka and parses it. This pipeline should run on other boxes than the ones that received the data, preferably some quite beefy Logstash nodes. Once parsed, you can send the data back to Kafka if you need to, or ingest it into your Elasticsearch cluster at this point.

input {
  kafka {
    group_id => "raw-syslog-group"
    topics => ["raw-syslog"]
    bootstrap_servers => "kafka-01:<port>"
    codec => json
  }
}

filter {
    if "%ASA-" in [message] {
      grok {
        match => [
          "message", "<%{POSINT:syslog_pri}>%{CISCOTIMESTAMP:timestamp} %{SYSLOGHOST:sysloghost} %%{CISCOTAG:cisco_tag}: %{GREEDYDATA:cisco_message}"
        ]
      }
      syslog_pri { }
  ....
  ....
  }
}

output {
  elasticsearch {
    hosts => ["elastic-01:9200"]
    index => "syslog-%{+YYYY.MM.dd}"
  }
}

The trick is the consumer group feature of Kafka. In the example I specified group_id => "raw-syslog-group", so no matter how many Logstash instances have this pipeline running, they will work as a single consumer group in regards to Kafka, and each message will only be processed by one of them.

If you find you need more processing power, deploy an additional Logstash node with this pipeline. You don't have to change your loadbalancer configuration at all.

This setup also makes your life easier if you can centralize your Logstash processing on a few beefy Logstash nodes. That comes in handy if you are thinking of using Memcached for lookups of malicious IPs or domain names in all your syslog messages. Hey, that sounds like a topic for a complete blog post of its own ;)

Processors in Winlogbeat

There is a probably little-known feature hidden in the Beats: something called processors.

By using processors you can do some lightweight filtering of data before the data leaves your endpoint.

We have used this feature to help decode event 2889 from the Directory Service log on Domain Controllers. An event 2889 will appear in your Directory Service log if someone is binding to your Domain Controllers with clear text LDAP passwords. You really don't want that going on these days, right?

The event contains a field called event_data.param1, which looks like <ip>:<port>. In most cases you really don't care about the port; you only need the IP the traffic is coming from, so you can visualize it in Kibana.

So you can define a section in your winlogbeat.yml to fix this:

processors:
  - drop_fields:
      fields: ["host"]
  - dissect:
      when:
        equals:
          event_id: 2889
      tokenizer: "%{host}:%{port}"
      field: "event_data.param1"
      target_prefix: "event_data_param1_split"
  - dns:
      when:
        equals:
          event_id: 2889
      type: reverse
      fields:
        event_data_param1_split.host: event_data_param1_split.hostname
      success_cache:
        capacity.initial: 1000
        capacity.max: 10000
      failure_cache:
        capacity.initial: 1000
        capacity.max: 10000
        ttl: 1m
      nameservers: ['10.1.2.3', '10.2.3.4']
      timeout: 500ms
      tag_on_failure: [_dns_reverse_lookup_failed]

This example is using 3 processors.

The first section uses the drop_fields processor. This drops the field named "host" from the events. We do this because of a mapping conflict between data from old and new Winlogbeat versions.

  - drop_fields:
      fields: ["host"]

The next processor is the dissect processor. This is the one that parses our <ip>:<port> and splits the information into two distinct fields. Notice we have a when condition, so we only do this when event_id equals 2889.

  - dissect:
      when:
        equals:
          event_id: 2889
      tokenizer: "%{host}:%{port}"
      field: "event_data.param1"
      target_prefix: "event_data_param1_split"

Finally, we use the DNS processor to try to resolve the IP address to a proper DNS name.

  - dns:
      when:
        equals:
          event_id: 2889
      type: reverse
      fields:
        event_data_param1_split.host: event_data_param1_split.hostname
      success_cache:
        capacity.initial: 1000
        capacity.max: 10000
      failure_cache:
        capacity.initial: 1000
        capacity.max: 10000
        ttl: 1m
      nameservers: ['10.1.2.3', '10.2.3.4']
      timeout: 500ms
      tag_on_failure: [_dns_reverse_lookup_failed]

As usual, all this could be done in other ways. If you have your Winlogbeat data flowing through Logstash, you can do the parsing there instead. This is just a short example of what can be done with the processors feature.
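For comparison, here is a rough, untested sketch of doing the same split in a Logstash filter instead, using a grok filter rather than the dissect processor (the field names mirror the Winlogbeat example above; the target field names are made up):

filter {
  if [event_id] == 2889 {
    # Split the "<ip>:<port>" value from event 2889 into separate fields
    grok {
      match => { "[event_data][param1]" => "%{IP:param1_host}:%{POSINT:param1_port}" }
    }
  }
}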

Elastic 7.0.0 Beta 1 Released

Elastic is moving fast as usual in preparation for the upcoming version 7.0.0. They have just released 7.0.0 beta 1, so it's out of alpha mode. It is important to keep an eye on this, so you will not be surprised by some of the breaking changes.

As usual it is packed with new features:

  • TLS 1.3 Support
  • ZEN2 – Cluster coordination layer.
  • Nanosecond precision
  • New Maps application
  • New Heartbeat Application
  • New Visual Layout in Kibana
  • Query Speed Improvements
  • Elastic Common Schema
  • The list goes on

Read the official news here:

https://www.elastic.co/blog/elasticsearch-7-0-0-beta1-released

Kibana – Chromium

When you have installed your Elasticsearch cluster with Kibana on your favourite Linux distribution, you will probably at some point want to generate a report as a PDF.

Elastic decided to switch from PhantomJS to Chromium as the headless browser in recent releases. And Chromium has some dependencies that the Kibana install does not handle for you.

So if you go to Kibana > Management > Reporting, it is possible you will see this error:

Chromium fails to start

This can be caused by a variety of reasons, but a typical cause is missing dependencies for Chromium. This is documented at this link.

The page has sections documenting the dependencies for the most popular distributions.

So we add these packages to our Kibana Ansible playbook:

Kibana Ansible playbook
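The screenshot above is of our own playbook, but as a rough sketch the relevant task could look something like the following (package names differ per distribution; the two below are the ones Elastic documents for RHEL/CentOS style systems, so check the dependency list for yours):

# Hypothetical task from the Kibana playbook - install Chromium dependencies for reporting
- name: Install Chromium dependencies for Kibana reporting
  package:
    name:
      - fontconfig
      - freetype
    state: present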

After running the updated playbook, we see that we are now able to generate PDF reports as expected.

Sample PDF

Modular Elasticsearch Architecture

This topic is very interesting to us and we have spent a lot of time trying to get it right. We have gone through a number of architecture designs, while we were maturing with the Elasticsearch product. Needless to say, this topic is very complex and just because something works for us doesn’t mean that it will work for you. So with that in mind, let’s delve into this topic.

A lot of people hear stories about Elasticsearch and how easy and straightforward it is to get going. And it really is that easy. Most will have an Elasticsearch installation running after an hour or so. It’s just a matter of following the excellent documentation at Elastic’s website.

So once you have installed the first node, you want to add a few more nodes; still a simple operation.

Single node cluster

You end up with, for example, a simple 3 node cluster where all nodes hold the same roles: all are master, data, Kibana and Logstash. And that is perfectly fine if you are just trying out stuff.

3 node cluster

However, as you deploy more of these small clusters for various use cases, you tend to get frustrated by this approach, as the small clusters can't talk to each other. You can't search all your data from one single location, and you need to maintain and monitor each cluster separately.

So the next logical approach is to go bigger and follow the recommendations from Elastic: you add dedicated master nodes, dedicated Kibana nodes, and dedicated data nodes for hot and warm. This is cool, we can extend this cluster indefinitely and put all our data in this huge cluster. And you probably can, but you probably don't want to.

Getting bigger

Somewhere down the line, the masters will have trouble keeping up. One use case will end up influencing the other use cases running in the same cluster. Upgrading the nodes becomes a pain. If bad stuff happens to the cluster, everything is gone. And maybe you will feel a need to rethink the architecture.

So we did that..

What we came up with is a simple modular approach, where you will get these benefits:

  • Search all your data from one place
  • Centralized Logstash pipeline management
  • Centralized Cluster Monitoring
  • Centralized Machine Learning
  • Usecase/workload isolation
  • Simple maintenance
  • Unlimited scalability

Through the power of Cross Cluster Search (CCS), you can easily build this architecture. If you have not heard of CCS, I encourage you to read the docs. It is a fairly new feature that allows you to connect to one cluster and search remotely connected clusters, completely transparently. Your users don't need to know or care which Elasticsearch cluster their data is located in.

The only caveat is that it is a licensed feature, so you will need a subscription from Elastic in order to deploy this topology.
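To give an idea of how transparent it is, a cross cluster search simply prefixes the index pattern with the remote cluster alias; the alias and index pattern below are made-up examples:

GET backend-syslog:syslog-*/_search
{
  "query": {
    "match": { "sysloghost": "fw01" }
  }
}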

Modular Architecture

So the topology is basically a frontend cluster with these resources:

  • Master nodes
  • Kibana nodes
  • ML nodes
  • Small data nodes, no end user data

Then you build a number of use case clusters, or backend clusters:

  • Master nodes
  • Coordinator nodes – if required, depends on size
  • Hot Data
  • Warm Data

You attach these backend clusters to the frontend cluster by adding them as remote clusters, and you configure the backend clusters to use the frontend for monitoring.
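Registering a backend cluster as a remote can be done dynamically through the cluster settings API on the frontend cluster; a minimal sketch (the alias and node name are examples) looks like this:

PUT _cluster/settings
{
  "persistent": {
    "cluster": {
      "remote": {
        "backend-syslog": {
          "seeds": ["backend-syslog-node1:9300"]
        }
      }
    }
  }
}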

This architecture isolates the clusters from each other and thus the workload. You can easily do rolling upgrades without affecting the whole setup. You could even run with different versions of Elasticsearch in the clusters. However that is not something we would recommend.

You will also need to install Kibana in the backend clusters, as the developer tools don't work across clusters yet. This is purely for administrative purposes, with no end user access. We are hoping that remote connections in the developer tools will be available in a future release of Kibana.

Whenever you have a new use case that doesn't play well with the existing backend clusters, you simply deploy a new backend cluster and plug it into the frontend. All your existing use cases will continue running, completely unaware of this added workload. The data will be instantly searchable from the same Kibana location that your users are accustomed to.

As for the Logstash nodes, you install a group of them and configure them to connect to the frontend cluster for centralized pipeline management. In the output section of the actual pipelines, you target the specific backend cluster, typically its coordinator nodes.
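As a rough sketch, the logstash.yml side of centralized pipeline management could look like this (hostnames and pipeline IDs are made up, and the exact setting names vary a little between Logstash versions, so check the docs for yours):

# logstash.yml - hypothetical values
xpack.management.enabled: true
xpack.management.pipeline.id: ["application-logs", "raw-syslog"]
xpack.management.elasticsearch.hosts: ["https://frontend-node1:9200"]
# Inside each managed pipeline, the elasticsearch output then points at the
# coordinator nodes of the relevant backend cluster.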

Your end users connect to Kibana in the frontend cluster and transparently search all the backend clusters that they are allowed to. The roles you assign them in the frontend will be carried with their search requests to the backend clusters.

Hopefully you will have set up Kibana with Spaces, giving your users a more structured view of all the data available to them. And you, as the admin, can monitor the state of your entire Elasticsearch setup in one place.

As we said at the start, Elasticsearch architecture is complex and there is no one size fits all. We hope this article gave you some ideas on how to build a highly scalable and modular Elasticsearch setup, allowing you to take advantage of the power of Elasticsearch.

Building Your Own Blue Team Lab

Introduction

Every blue team member should invest some time in building their own lab setup. It can be a great and fun learning experience where you pick up some "low hanging fruit" skills, hopefully coming out with something you will continue to use and develop over the rest of your InfoSec career.

Having your own lab allows you to quickly test theories and detection methods. It can be adapted to support any use case you need; your only limit is your own imagination….

How does it look?

The lab is designed to work as follows…. You add an IoC to an event within MISP; this is then distributed to the blacklists on your prevention tools. At the same time, the IoC is used to alert on logs coming into ElasticSearch. The IoC will be added to an ElastAlert rule, which then takes care of searching back through the ElasticSearch logs for previous activity. ElastAlert needs somewhere to send its alerts, and this is where The Hive comes into play…. Sounds simple, right?

Open source is the only way…

The Security Distractions lab is based only on open source tools, so your only investment, if you decide to build it, will be your own time! It could even be used in production with a few modifications…

Over the next few blog posts, we will go into each tool and their integration points. We promise to try to keep it exciting!

But how will I run the lab?

This lab can be built using whatever method you want… We will supply the configuration files for each tool where needed, but it is up to you how it is run. I like to run it using VMs, but others are obsessed with Docker. So use whatever you feel most comfortable with. For those planning on using VMs, the first post will be about VirtualBox, so you can get started…. If you're using Docker, then ummm…… you're on your own!

All configuration files will be found over on our GitHub page:- https://github.com/securitydistractions