Processors in Winlogbeat

There is a probably little-known feature hidden in the Beats. It's something called Processors.

By using processors you can do some lightweight filtering of data before the data leaves your endpoint.

We have used this feature to help decode event 2889 from the Directory Service Log on Domain Controllers. An event 2889 will appear in your Directory Service Log if someone is binding to your Domain Controllers with clear-text LDAP passwords. You really don't want that going on these days, right?

The event contains a field called event_data.param1. This looks like <ip>:<port>. In most cases you really don't care about the port; you only need the IP the traffic is coming from, so you can visualize it in Kibana.

So you can define a section in your winlogbeat.yml to fix this:

processors:
  - drop_fields:
      fields: ["host"]
  - dissect:
      when:
        equals:
          event_id: 2889
      tokenizer: "%{host}:%{port}"
      field: "event_data.param1"
      target_prefix: "event_data_param1_split"
  - dns:
      when:
        equals:
          event_id: 2889
      type: reverse
      fields:
        event_data_param1_split.host: event_data_param1_split.hostname
      success_cache:
        capacity.initial: 1000
        capacity.max: 10000
      failure_cache:
        capacity.initial: 1000
        capacity.max: 10000
        ttl: 1m
      nameservers: ['10.1.2.3', '10.2.3.4']
      timeout: 500ms
      tag_on_failure: [_dns_reverse_lookup_failed]

This example uses three processors.

The first section uses the drop_fields processor, which drops the field named "host" from every event. We do this because of a mapping conflict between data from old and new Winlogbeat versions.

  - drop_fields:
      fields: ["host"]

The next processor is the dissect processor. This is the one that parses our <ip>:<port> value and splits it into two distinct fields. Notice the when condition: this is only done when event_id equals 2889.

  - dissect:
      when:
        equals:
          event_id: 2889
      tokenizer: "%{host}:%{port}"
      field: "event_data.param1"
      target_prefix: "event_data_param1_split"

Finally, we use the DNS processor to try to resolve the IP address to a proper DNS name.

  - dns:
      when:
        equals:
          event_id: 2889
      type: reverse
      fields:
        event_data_param1_split.host: event_data_param1_split.hostname
      success_cache:
        capacity.initial: 1000
        capacity.max: 10000
      failure_cache:
        capacity.initial: 1000
        capacity.max: 10000
        ttl: 1m
      nameservers: ['10.1.2.3', '10.2.3.4']
      timeout: 500ms
      tag_on_failure: [_dns_reverse_lookup_failed]
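
To make the three steps concrete, here is an added Python model of the pipeline above. It is written for this post and is not Winlogbeat's code: it mimics drop_fields, the dissect tokenizer and the reverse dns processor with its success/failure caches and tag_on_failure, and runs over a few constructed events with a fake PTR table so it works offline (the IP addresses, hostnames and sample events are made up). Two caveats it makes visible: the "%{host}:%{port}" tokenizer stops at the first colon, so an IPv6 address in param1 would be split incorrectly, and the field names follow the pre-7.0 layout used in this post; from Winlogbeat 7.0 the same data lives under winlog.event_id and winlog.event_data.param1.

```python
"""Python model of the post's three Winlogbeat processors (drop_fields -> dissect -> dns); added example, not Beats code."""
import time

EVENT_ID_CLEARTEXT_BIND = 2889  # Directory Service log: an LDAP bind carried a clear-text password


class TtlCache:
    """Bounded cache with optional TTL, standing in for success_cache / failure_cache."""
    def __init__(self, capacity_max, ttl=None):
        self.capacity_max = capacity_max
        self.ttl = ttl
        self._items = {}  # key -> (value, inserted_at)

    def get(self, key):
        hit = self._items.get(key)
        if hit is None:
            return None, False
        value, inserted = hit
        if self.ttl is not None and time.monotonic() - inserted > self.ttl:
            del self._items[key]  # expired: the address will be looked up again
            return None, False
        return value, True

    def put(self, key, value):
        if len(self._items) >= self.capacity_max:
            self._items.pop(next(iter(self._items)))  # evict the oldest insertion
        self._items[key] = (value, time.monotonic())


def drop_fields(event, fields):
    """drop_fields: remove the named top-level fields if present."""
    for name in fields:
        event.pop(name, None)
    return event


def dissect_host_port(event, field, target_prefix):
    """dissect with tokenizer "%{host}:%{port}": host ends at the FIRST colon, so an
    IPv6 address would be cut short. Flat dotted keys stand in for Beats' nested object."""
    value = str(event.get(field, ""))
    host, sep, port = value.partition(":")
    if not sep:
        return event  # pattern does not match: Beats leaves the event unchanged
    event[f"{target_prefix}.host"] = host
    event[f"{target_prefix}.port"] = port
    return event


def dns_reverse(event, src, dst, resolver, success_cache, failure_cache, tag_on_failure):
    """dns type: reverse -> PTR lookup of event[src] into event[dst], cached both ways."""
    ip = event.get(src)
    if ip is None:
        return event
    name, hit = success_cache.get(ip)
    if hit:
        event[dst] = name
        return event
    _, failed_recently = failure_cache.get(ip)
    if not failed_recently:
        try:
            name = resolver(ip)
        except OSError:  # socket.herror / socket.gaierror are OSError subclasses
            name = None
        if name:
            success_cache.put(ip, name)
            event[dst] = name
            return event
        failure_cache.put(ip, True)  # remember the miss for `ttl` seconds
    event.setdefault("tags", []).append(tag_on_failure)
    return event


def run_pipeline(event, resolver, success_cache, failure_cache):
    event = drop_fields(event, ["host"])
    if event.get("event_id") == EVENT_ID_CLEARTEXT_BIND:  # the `when: equals:` guard
        event = dissect_host_port(event, "event_data.param1", "event_data_param1_split")
        event = dns_reverse(event, "event_data_param1_split.host", "event_data_param1_split.hostname",
                            resolver, success_cache, failure_cache, "_dns_reverse_lookup_failed")
    return event


# Constructed sample events and PTR table (not real log data); the fake resolver keeps
# the demo offline. For a live lookup, pass lambda ip: socket.gethostbyaddr(ip)[0].
SAMPLE_EVENTS = [
    {"event_id": 2889, "host": "dc01", "event_data.param1": "10.0.0.25:49152"},
    {"event_id": 2889, "host": "dc01", "event_data.param1": "10.0.0.99:50001"},
    {"event_id": 4624, "host": "dc01", "event_data.param1": "not an ldap bind"},
]
FAKE_PTR = {"10.0.0.25": "app01.example.internal"}


def fake_resolver(ip):
    if ip not in FAKE_PTR:
        raise OSError("no PTR record")
    return FAKE_PTR[ip]


def process_sample(resolver=fake_resolver):
    success, failure = TtlCache(10000), TtlCache(10000, ttl=60)
    return [run_pipeline(dict(e), resolver, success, failure) for e in SAMPLE_EVENTS]


if __name__ == "__main__":
    print(*process_sample(), sep="\n")
```

The first sample event gets its host, port and hostname fields; the second has no PTR record, so it only gains the _dns_reverse_lookup_failed tag; the third (not a 2889) is left alone apart from the dropped "host" field. The failure cache matters on a busy Domain Controller: a client that binds in clear text every few seconds would otherwise trigger a fresh reverse lookup per event.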

As usual, all this could be done in other ways. If you have your Winlogbeat data flowing through Logstash, you can do the parsing there instead. This is just a short example of what can be done with the processors feature.
