【技术分享】企业级入侵检测系统及实时告警的开源实现

阅读量295372

|

发布时间 : 2016-11-14 11:10:10

作者:qingxp9

稿费:500RMB(不服你也来投稿啊!)

投稿方式:发送邮件至linwei#360.cn,或登陆网页版在线投稿

前言

本文以构建一套完整的入侵检测系统为目的,配合使用多款开源软件快速搭建起包含流量监控、攻击溯源、实时告警等功能的框架。其中告警能高度自由定制,不再局限于传统的Web页面告警,邮件告警。文中,会提供两个例子来帮助大家理解如何去配置告警任务。

1. 入侵行为识别

入侵检测有着使用许多开源的IDS,这里我们以Suricata为例。Suricata是一个高性能的网络IDS,IPS和网络安全监控引擎,它可以兼容snort规则同时是多线程的IDS,我们通过官方提供的文档快速安装上。

https://redmine.openinfosecfoundation.org/projects/suricata/wiki/Quick_Start_Guide

安装完成后,在/etc/suricata/suricata.yaml 配置下:

在15行设置 HOME_NET为本机ip或所在段,是监控的目标网域

在217行把“- flow”注释掉 ,以免数据过多

sudo suricata -c /etc/suricata/suricata.yaml -i eth0

运行后我们将会在 /var/log/suricata/eve.json 文件中得到JSON格式的IDS日志。

2. 日志平台

为了便于后续的攻击日志分析,自然需要对数据进行入库方便我们查询。IDS大都部署在网关上,数据量非常客观,对于日志信息的大数据处理,目前最流行的便是ELK Stack。

http://p1.qhimg.com/t017b1bd48491a38edf.png

Logstash:负责日志的收集,处理和储存

Elasticsearch:负责日志检索和分析

Kibana:负责日志的可视化

http://p8.qhimg.com/t01b092241bd924b7d1.png

写本文时,ELK Stack已经更新了5.0正式版,统一了elasticsearch,logstash,kibana间版本命名问题,同时对Shield, Watcher, Marvel,Graph几个扩展插件经行了封装打包,形成了X-PACK。

安装过程同样略过,在完成ELK及X-PACK的安装后,启动服务:

systemctl start elasticsearch.service
systemctl start kibana.service

注意:安装X-PACK后,elasticsearch和kibana将会开启用户验证,默认用户elastic密码changeme

在/etc/kibana/kibana.yml中需要配置账号并重启kibana服务:

elasticsearch.username: “elastic” 
elasticsearch.password: “changeme”

在服务启动后,通过Logstash对IDS的日志文件进行分析处理,并将数据打入es存储。

新建配置文件/etc/logstash/conf.d/test.conf,键入以下内容:

input {
  file {
    path => "/var/log/suricata/eve.json"
    type => "attack"
  }
}
filter {
  if [type] == "attack" {
    json{
      source => "message"
    }
  }
}
output {
  if [type] == "attack"{
    elasticsearch {
      hosts => ["localhost:9200"]
      user => elastic
      password => changeme
    }
  }
}

随后启动Logstash服务:

systemctl start logstash.service

Kibana便是我们的攻击溯源平台,打开Kibana的Web页面 http://localhost:5601

http://p6.qhimg.com/t013e6210822f40780c.png

可以看到我们的IDS日志已经被结构化的存储在其中,利用检索功能可以快捷且准确的定位到相关日志内容。

比如,可以直接输入“nmap”进行模糊搜索,也可以通过字段匹配语法进行精确匹配“dest_port:80 AND event_type:alert”

以上通过 Suricata 和 ELK Stack 便快速搭建了一套简易但高效的入侵检测溯源系统,使用ELK的优势在于几乎不需要设置便能保证在非常大数据量下的查询速率。而日志的完整性取决于 Suricata 中 攻击识别规则的丰富性,通过官方提供的 snort 规则目前是可以识别常见的流量信息及攻击流量。

3. 高定制化的实时告警

包括很多商业IDS在内做的不好的便是实时告警这一块,有的通过Web页面提供告警,做得好一点则能配置邮件告警。但在我实际的使用中发现,告警策略其实更与系统使用者密切联系,而开发厂商很难能提供周密完善的配置选项,所以一个能高度定制化的告警系统是我们想要的。

ElasticSearch 的 API 接口为我们提供了这种可能,其查询返回的 JSON 格式也非常便于我们进行查询数据的处理。官方提供的 X-PACK 扩展包中便包含了可以提供告警功能的 Watcher 扩展插件。

这里简单介绍下Watcher的配置方法:

PUT _xpack/watcher/watch/xxxx_name
{
  "trigger" : { "schedule" : { "interval" : "10s" }},
  "input" : {},
  "condition" : {"compare" : { "ctx.payload.hits.total" : { "gt" : 0 }}},
  "actions" : {}
}

指定一个告警任务,需要配置4个模块:Trigger、Input、Condition、ACTIONS

Trigger

Trigger控制如何触发一次查询,可以设置间隔时间

每隔10s触发
"trigger" : {
  "schedule" : { "interval" : "10s" } 
}

也支持大家熟悉的cron格式

每天9:05触发
"trigger" :{
  "schedule" : {"cron" : "0 5 9 * * ?"}
}

Input

Input 负责数据的来源,支持以下四个input类型

simple: 读取静态内容

search: 读取查询语句执行后的返回结果

http: 读取HTTP请求的返回结果

chain: 用来自由组合使用上面三种类型组成的多个input

以search为例举个例子

查询“logs”索引,匹配message字段为error的数据
 "input" : {
    "search" : {
      "request" : {
        "indices" : [ "logs" ],
        "body" : {
          "query" : {
            "match" : { "message": "error" }
          }
        }
      }
    }
  }

Condition

condition决定之后的ACTION是否执行

当查询数量大于0时执行ACTION
  "input" : {
    "search" : {
      "request" : {
        "indices" : [ "logs" ],
        "body" : {
          "query" : {
            "match" : { "message": "error" }
          }
        }
      }
    }
  },
  "condition" : {
    "compare" : { "ctx.payload.hits.total" : { "gt" : 0 }} 
  }

Action

当条件符合后,ACTION决定执行的操作即信息内容及发送的对象。

支持email, webhook, index, logging, hipchat, Slack, 和 pagerduty

发送POST到 xxx.com:123/path ,内容为json字串 {"a":"b","total":xxx}
"actions" : {
  "xxx_webhook_name" : { 
    "webhook" : {
      "method" : "POST", 
      "host" : "xxx.com", 
      "port" : 123, 
      "path": ":/path", 
      "body" : "{
        "a": "b'",
        "total": "{{ctx.payload.hits.total}}"
      }"
    }
  }
}

利用webhook可以向任意Web服务发送请求,这是我们想要的。通过这个接口非常便于与企业内的其他已有平台进行交互,或者可以通过如今流行的IM公众号。

这里以蓝信为例,需要先请求获取一个token,然后发送JSON字串到API即可,其中JSON字串的内容为:

http://p2.qhimg.com/t01e1feeaeeb2b2c483.png

了解了基础的Watcher配置方法和蓝信接口后,我们来制作告警任务。拟一个目标:在每天9、14、18点查询当日日志数量,日志类型的统计,来源IP统计。

PUT _xpack/watcher/watch/attack_alert
{
    "trigger" :{"schedule" : {"cron" : "0 0 9,14,18 * * ?"}},
    "input": {
        "chain": {
            "inputs": [
                ##发送HTTP请求到蓝信获取token
                {
                    "first": {
                        "http" :{
                            "request": {
                                "host": "lxopen.api.xxx.com","port": 80,
                                "path":  "/ffff/token",
                                "params": {"grant_type":"client_credential","appid":"12345","secret":"abcde"}
                            }
                        }
                    }                   
                },
                ##进行查询得到目标结果
                {
                    "all": {
                      "search": {
                            "request": {
                                "indices": "<logstash-{now/d}>",
                                "body": {
                                    "query": { 
                                        "bool": {
                                            "must": {"match_all": {}}
                                        }
                                    },
                                    "aggs" : {
                                        "ip_list" : {
                                            "terms" : { "field" : "src_ip.keyword" }
                                        },
                                        "type_list" : {
                                            "terms" : { "field" : "event_type.keyword" }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            ]
        }
    },
    ##只要数量大于0就执行ACTION
    "condition" :{
        "compare" : {"ctx.payload.all.hits.total": {"gt": 0}}
    },
     "actions": {
        "attack_alert" : {
            "throttle_period" : "600s", //每两次告警间的最小时间间隔
            "webhook" : {
                "method" : "POST",
                "headers": {"Content-type": "application/json"},
                "host" : "lxopen.api.xxx.com",
                "port" : 80,
                "path" :"/xxxx/message/send", 
                "params" : {"access_token" : "{{ctx.payload.first.access_token}}"}, //传入前面获取到的 token
                #构造我们的文本消息
                "body" : "{"toall": "true","tousers": ["180******0"],"msgtype": "text","text": {"content": "=======Total {{ctx.payload.all.hits.total}}\n{{#ctx.payload.all.aggregations.type_list.buckets}}{{key}} {{doc_count}}\n{{/ctx.payload.all.aggregations.type_list.buckets}}\n=======Src IP\n{{#ctx.payload.all.aggregations.ip_list.buckets}}{{key}} {{doc_count}}\n{{/ctx.payload.all.aggregations.ip_list.buckets}}"}}"
            }
        }
    }
}

创建成功后,蓝信客户端就可以每天定时收到当天的日志统计报告了。

http://p2.qhimg.com/t01e8aa262b79f15463.png

只需要改下接口,也可以同时发送到其他如微信等平台上。

第二个例子,来个干货。

针对敏感操作的日志记录。对于敏感或者储存重要数据的服务器,如果发生了入侵事件我们可以通过命令记录来查看黑客入侵的方式,但有经验的入侵者往往会删去这些记录。为了防止这一点,我们有必要对命令历史记录做一个增强,让它能较完整的记录命令相关信息(时间、命令、登录用户、当前用户等),同时能实时将日志回传到es。

在/etc/bash.bashrc行末加入以下内容:

HISTDIR='/var/log/command.log'
if [ ! -f $HISTDIR ];then
    touch $HISTDIR
chmod 666 $HISTDIR
    fi
    export HISTTIMEFORMAT="{"TIME":"%F %T","HOSTNAME":"$HOSTNAME","LI":"$(who -u am i 2>/dev/null| awk '{print $NF}'|sed -e 's/[()]//g')","LU":"$(who am i|awk '{print $1}')","NU":"${USER}","CMD":""
    export PROMPT_COMMAND='history 1|tail -1|sed "s/^[ ]+[0-9]+  //"|sed "s/$/"}/">> /var/log/command.log'

执行 source /etc/bash.bashrc,就可以在/var/log/command.log看到日志

http://p6.qhimg.com/t0107bcff5716d5ac55.png

JSON类型的日志,通过logstash中配置日志回传:

input {
  ...
  file {
    path => "/var/log/command.json"
    type => "cmd"
  }
  ...
}
filter {
  ...
  if [type] == "cmd" {
    json{
      source => "message"
    }
  }
  ...
}
output {
  if [type] == "attack" or [type] == "cmd"{
    elasticsearch {
      hosts => ["localhost:9200"]
      user => elastic
      password => changeme
    }
  }
}

这样ES就会实时收集主机的shell日志了。

最后完善下告警任务,在Input里加入:

 {
    "cmd": {
      "search": {
            "request": {
                "indices": "<logstash-{now/d}>",
                "types" : "cmd",
                "body": {
                    "query": { 
                        "bool": {
                            "must": {"match_all": {}}
                        }
                    }
                }
            }
        }
    }
}

在告警内容里面添加:

\n=======CMD\n{{#ctx.payload.cmd.hits.hits}}{{HOSTNAME}} {{LI}}:{{CMD}}\n{{/ctx.payload.cmd.hits.hits}}

效果预览像这样:

http://p2.qhimg.com/t018f7321b528ee08c2.png

一旦服务器上有操作产生,我们便能立马得到通知,进行应急响应。

最后

对于告警,我想说的是一定要记得:满篇的告警是不会有人去看的。重点过多也就没有重点了,告警一定要精而不要多,只针对一些关键的异常告警往往能事半功倍。

参考链接

蓝信开放平台 https://docs.lanxin.cn/

Getting Started with Watcher https://www.elastic.co/guide/en/x-pack/current/watcher-getting-started.html

Suricata IDS https://suricata-ids.org/

本文由360天马安全团队原创发布

转载,请参考转载声明,注明出处: https://www.anquanke.com/post/id/84904

安全KER - 有思想的安全新媒体

分享到:微信
+10赞
收藏
360天马安全团队
分享到:微信

发表评论

Copyright © 北京奇虎科技有限公司 三六零数字安全科技集团有限公司 安全KER All Rights Reserved 京ICP备08010314号-66