Rui F Ribeiro - 16 days ago
Python Question

logstash: Trying to make sense of strings passed by IntelMQ in ElasticSearch

I have been trying to pass data from IntelMQ to Elasticsearch 5.0 using Logstash via Redis.

One of the problems is that, on the Elasticsearch side, I receive this string in bulk in a field called "message".

{u'feed': u'openbl', u'reported_source_ip': u'115.79.215.79', u'source_cymru_cc': u'VN', u'source_time': u'2016-06-25T11:15:14+00:00', u'feed_url': u'http://www.openbl.org/lists/date_all.txt', u'taxonomy': u'Other', u'observation_time': u'2016-11-20T22:51:25', u'source_ip': u'115.79.215.79', u'source_registry': u'apnic', u'source_allocated': u'2008-07-17', u'source_bgp_prefix': u'115.79.192.0/19', u'type': u'blacklist', u'source_as_name': u'VIETEL-AS-AP Viettel Corporation, VN', u'source_asn':u'7552'}


Unfortunately, the particular fork of our CERT community is based on Python 2.7, and there is no easy way around it short of migrating to a new fork based on Python 3.0, which would get rid of the unicode marks before the strings.
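For what it's worth, if a small shim between IntelMQ and Redis is an option, the Python 2 repr can be parsed back into a dict and re-serialized as JSON before Logstash ever sees it, so the existing "json" codec decodes it natively. A minimal sketch (ast.literal_eval accepts the u'...' literals on both Python 2 and Python 3.3+; the shortened message below is an excerpt of the one above):

```python
import ast
import json

# Excerpt of the string received on the Elasticsearch side:
# a Python 2 dict repr with u'...' unicode markers
message = ("{u'feed': u'openbl', u'reported_source_ip': u'115.79.215.79', "
           "u'source_ip': u'115.79.215.79', u'source_asn': u'7552'}")

# literal_eval safely parses the repr back into a dict; unlike eval()
# it only accepts literals, so untrusted input cannot execute code
event = ast.literal_eval(message)

# Re-serialize as JSON, which the logstash "json" codec understands
clean = json.dumps(event)
print(clean)
```

This sidesteps the whole gsub/kv question, at the cost of maintaining a small script outside Logstash.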

My present logstash configuration is as follows:

input {
  redis {
    host => "127.0.0.1"
    key => "iscte-redis-queue"
    data_type => "list"
    codec => "json"
    db => "2"
  }
}

filter {
  geoip {
    source => "source_ip"
  }
}

output {
  elasticsearch { hosts => "127.0.0.1" }
}


Obviously, I need to rework that filter directive. I thought of using the kv logstash filter, however that u mark renders this approach ineffective.

If it weren't from the unicode mark, I would try this:

filter {
  kv {
    source => "message"
    field_split => ", "
    trim => "\'"
  }
}


Grok rules are not entirely ruled out; however, the fields vary, which would oblige me to write a rule for each different bot, based on the 'feed' field.

I am tempted to hack the IntelMQ source to get rid of that unicode mark, but I am tapping into your opinions to see if there is a more effective way to handle the processing of this data.

I am also not entirely familiar with Logstash: how can I avoid it generating a separate entry in Redis for each new event, and put it all back into one message where Elasticsearch will know all the different variables?

So have you any suggestions?

Val
Answer

Here's my suggestion: use the mutate/gsub filter in order to get rid of the u and ' characters. That will produce a clean string that you can run through the kv filter.

filter {
  mutate {
    gsub => [
       "message", "u'", "", 
       "message", "',", ",", 
       "message", "':", ":"
    ]
  }
  kv {
    source => "message"
    field_split => ", "
    value_split => ": "
  }
}

You'll get a nice event like this one:

{
               "message" => "feed: openbl, reported_source_ip: 115.79.215.79, source_cymru_cc: VN, source_time: 2016-06-25T11:15:14+00:00, feed_url: http://www.openbl.org/lists/date_all.txt, taxonomy: Other, observation_time: 2016-11-20T22:51:25, source_ip: 115.79.215.79, source_registry: apnic, source_allocated: 2008-07-17, source_bgp_prefix: 115.79.192.0/19, type: blacklist, source_as_name: VIETEL-AS-AP Viettel Corporation, VN, source_asn:7552'",
              "@version" => "1",
            "@timestamp" => "2016-11-23T15:56:13.338Z",
                  "host" => "iMac.local",
                  "feed" => "openbl",
    "reported_source_ip" => "115.79.215.79",
       "source_cymru_cc" => "VN",
           "source_time" => "2016-06-25T11:15:14+00:00",
              "feed_url" => "http://www.openbl.org/lists/date_all.txt",
              "taxonomy" => "Other",
      "observation_time" => "2016-11-20T22:51:25",
             "source_ip" => "115.79.215.79",
       "source_registry" => "apnic",
      "source_allocated" => "2008-07-17",
     "source_bgp_prefix" => "115.79.192.0/19",
                  "type" => "blacklist",
        "source_as_name" => "VIETEL-AS-AP",
               "Viettel" => "Corporation",
            "source_asn" => "7552'"
}
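One caveat visible in the event above: kv's field_split => ", " is a set of single-character delimiters (comma or space), so any value that itself contains spaces gets broken up, which is why source_as_name loses its tail and a bogus "Viettel" => "Corporation" field appears. A rough Python simulation of that tokenization (the regex is an illustration of the delimiter-set behavior, not logstash's actual implementation) reproduces the artifact:

```python
import re

# The source_as_name portion of the message after the three gsub
# substitutions have stripped the u' markers
cleaned = ("source_as_name: VIETEL-AS-AP Viettel Corporation, VN, "
           "source_asn: 7552'")

# Approximate kv with field_split => ", " and value_split => ": ":
# a key, one delimiter character (":" or " "), then a value that runs
# up to the next comma or space
pairs = dict(re.findall(r"([A-Za-z_]+)[: ]\s*([^, ]+)", cleaned))
print(pairs)
# source_as_name keeps only "VIETEL-AS-AP"; "Viettel" becomes a key
```

If fields with embedded commas or spaces matter, they would need special handling (e.g. a dedicated gsub or grok rule) before the kv filter runs.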