作者:qingxp9
稿费:500RMB(不服你也来投稿啊!)
投稿方式:发送邮件至linwei#360.cn,或登陆网页版在线投稿
前言
本文以构建一套完整的入侵检测系统为目的,配合使用多款开源软件快速搭建起包含流量监控、攻击溯源、实时告警等功能的框架。其中告警能高度自由定制,不再局限于传统的Web页面告警,邮件告警。文中,会提供两个例子来帮助大家理解如何去配置告警任务。
1. 入侵行为识别
入侵检测有着使用许多开源的IDS,这里我们以Suricata为例。Suricata是一个高性能的网络IDS,IPS和网络安全监控引擎,它可以兼容snort规则同时是多线程的IDS,我们通过官方提供的文档快速安装上。
https://redmine.openinfosecfoundation.org/projects/suricata/wiki/Quick_Start_Guide
安装完成后,在/etc/suricata/suricata.yaml 配置下:
在15行设置 HOME_NET为本机ip或所在段,是监控的目标网域
在217行把“- flow”注释掉 ,以免数据过多
sudo suricata -c /etc/suricata/suricata.yaml -i eth0
运行后我们将会在 /var/log/suricata/eve.json 文件中得到JSON格式的IDS日志。
2. 日志平台
为了便于后续的攻击日志分析,自然需要对数据进行入库方便我们查询。IDS大都部署在网关上,数据量非常客观,对于日志信息的大数据处理,目前最流行的便是ELK Stack。
Logstash:负责日志的收集,处理和储存
Elasticsearch:负责日志检索和分析
Kibana:负责日志的可视化
写本文时,ELK Stack已经更新了5.0正式版,统一了elasticsearch,logstash,kibana间版本命名问题,同时对Shield, Watcher, Marvel,Graph几个扩展插件经行了封装打包,形成了X-PACK。
安装过程同样略过,在完成ELK及X-PACK的安装后,启动服务:
systemctl start elasticsearch.service
systemctl start kibana.service
注意:安装X-PACK后,elasticsearch和kibana将会开启用户验证,默认用户elastic密码changeme
在/etc/kibana/kibana.yml中需要配置账号并重启kibana服务:
elasticsearch.username: “elastic”
elasticsearch.password: “changeme”
在服务启动后,通过Logstash对IDS的日志文件进行分析处理,并将数据打入es存储。
新建配置文件/etc/logstash/conf.d/test.conf,键入以下内容:
input {
file {
path => "/var/log/suricata/eve.json"
type => "attack"
}
}
filter {
if [type] == "attack" {
json{
source => "message"
}
}
}
output {
if [type] == "attack"{
elasticsearch {
hosts => ["localhost:9200"]
user => elastic
password => changeme
}
}
}
随后启动Logstash服务:
systemctl start logstash.service
Kibana便是我们的攻击溯源平台,打开Kibana的Web页面 http://localhost:5601
可以看到我们的IDS日志已经被结构化的存储在其中,利用检索功能可以快捷且准确的定位到相关日志内容。
比如,可以直接输入“nmap”进行模糊搜索,也可以通过字段匹配语法进行精确匹配“dest_port:80 AND event_type:alert”
以上通过 Suricata 和 ELK Stack 便快速搭建了一套简易但高效的入侵检测溯源系统,使用ELK的优势在于几乎不需要设置便能保证在非常大数据量下的查询速率。而日志的完整性取决于 Suricata 中 攻击识别规则的丰富性,通过官方提供的 snort 规则目前是可以识别常见的流量信息及攻击流量。
3. 高定制化的实时告警
包括很多商业IDS在内做的不好的便是实时告警这一块,有的通过Web页面提供告警,做得好一点则能配置邮件告警。但在我实际的使用中发现,告警策略其实更与系统使用者密切联系,而开发厂商很难能提供周密完善的配置选项,所以一个能高度定制化的告警系统是我们想要的。
ElasticSearch 的 API 接口为我们提供了这种可能,其查询返回的 JSON 格式也非常便于我们进行查询数据的处理。官方提供的 X-PACK 扩展包中便包含了可以提供告警功能的 Watcher 扩展插件。
这里简单介绍下Watcher的配置方法:
PUT _xpack/watcher/watch/xxxx_name
{
"trigger" : { "schedule" : { "interval" : "10s" }},
"input" : {},
"condition" : {"compare" : { "ctx.payload.hits.total" : { "gt" : 0 }}},
"actions" : {}
}
指定一个告警任务,需要配置4个模块:Trigger、Input、Condition、ACTIONS
Trigger
Trigger控制如何触发一次查询,可以设置间隔时间
每隔10s触发
"trigger" : {
"schedule" : { "interval" : "10s" }
}
也支持大家熟悉的cron格式
每天9:05触发
"trigger" :{
"schedule" : {"cron" : "0 5 9 * * ?"}
}
Input
Input 负责数据的来源,支持以下四个input类型
simple: 读取静态内容
search: 读取查询语句执行后的返回结果
http: 读取HTTP请求的返回结果
chain: 用来自由组合使用上面三种类型组成的多个input
以search为例举个例子
查询“logs”索引,匹配message字段为error的数据
"input" : {
"search" : {
"request" : {
"indices" : [ "logs" ],
"body" : {
"query" : {
"match" : { "message": "error" }
}
}
}
}
}
Condition
condition决定之后的ACTION是否执行
当查询数量大于0时执行ACTION
"input" : {
"search" : {
"request" : {
"indices" : [ "logs" ],
"body" : {
"query" : {
"match" : { "message": "error" }
}
}
}
}
},
"condition" : {
"compare" : { "ctx.payload.hits.total" : { "gt" : 0 }}
}
Action
当条件符合后,ACTION决定执行的操作即信息内容及发送的对象。
支持email, webhook, index, logging, hipchat, Slack, 和 pagerduty
发送POST到 xxx.com:123/path ,内容为json字串 {"a":"b","total":xxx}
"actions" : {
"xxx_webhook_name" : {
"webhook" : {
"method" : "POST",
"host" : "xxx.com",
"port" : 123,
"path": ":/path",
"body" : "{
"a": "b'",
"total": "{{ctx.payload.hits.total}}"
}"
}
}
}
利用webhook可以向任意Web服务发送请求,这是我们想要的。通过这个接口非常便于与企业内的其他已有平台进行交互,或者可以通过如今流行的IM公众号。
这里以蓝信为例,需要先请求获取一个token,然后发送JSON字串到API即可,其中JSON字串的内容为:
了解了基础的Watcher配置方法和蓝信接口后,我们来制作告警任务。拟一个目标:在每天9、14、18点查询当日日志数量,日志类型的统计,来源IP统计。
PUT _xpack/watcher/watch/attack_alert
{
"trigger" :{"schedule" : {"cron" : "0 0 9,14,18 * * ?"}},
"input": {
"chain": {
"inputs": [
##发送HTTP请求到蓝信获取token
{
"first": {
"http" :{
"request": {
"host": "lxopen.api.xxx.com","port": 80,
"path": "/ffff/token",
"params": {"grant_type":"client_credential","appid":"12345","secret":"abcde"}
}
}
}
},
##进行查询得到目标结果
{
"all": {
"search": {
"request": {
"indices": "<logstash-{now/d}>",
"body": {
"query": {
"bool": {
"must": {"match_all": {}}
}
},
"aggs" : {
"ip_list" : {
"terms" : { "field" : "src_ip.keyword" }
},
"type_list" : {
"terms" : { "field" : "event_type.keyword" }
}
}
}
}
}
}
}
]
}
},
##只要数量大于0就执行ACTION
"condition" :{
"compare" : {"ctx.payload.all.hits.total": {"gt": 0}}
},
"actions": {
"attack_alert" : {
"throttle_period" : "600s", //每两次告警间的最小时间间隔
"webhook" : {
"method" : "POST",
"headers": {"Content-type": "application/json"},
"host" : "lxopen.api.xxx.com",
"port" : 80,
"path" :"/xxxx/message/send",
"params" : {"access_token" : "{{ctx.payload.first.access_token}}"}, //传入前面获取到的 token
#构造我们的文本消息
"body" : "{"toall": "true","tousers": ["180******0"],"msgtype": "text","text": {"content": "=======Total {{ctx.payload.all.hits.total}}\n{{#ctx.payload.all.aggregations.type_list.buckets}}{{key}} {{doc_count}}\n{{/ctx.payload.all.aggregations.type_list.buckets}}\n=======Src IP\n{{#ctx.payload.all.aggregations.ip_list.buckets}}{{key}} {{doc_count}}\n{{/ctx.payload.all.aggregations.ip_list.buckets}}"}}"
}
}
}
}
创建成功后,蓝信客户端就可以每天定时收到当天的日志统计报告了。
只需要改下接口,也可以同时发送到其他如微信等平台上。
第二个例子,来个干货。
针对敏感操作的日志记录。对于敏感或者储存重要数据的服务器,如果发生了入侵事件我们可以通过命令记录来查看黑客入侵的方式,但有经验的入侵者往往会删去这些记录。为了防止这一点,我们有必要对命令历史记录做一个增强,让它能较完整的记录命令相关信息(时间、命令、登录用户、当前用户等),同时能实时将日志回传到es。
在/etc/bash.bashrc行末加入以下内容:
HISTDIR='/var/log/command.log'
if [ ! -f $HISTDIR ];then
touch $HISTDIR
chmod 666 $HISTDIR
fi
export HISTTIMEFORMAT="{"TIME":"%F %T","HOSTNAME":"$HOSTNAME","LI":"$(who -u am i 2>/dev/null| awk '{print $NF}'|sed -e 's/[()]//g')","LU":"$(who am i|awk '{print $1}')","NU":"${USER}","CMD":""
export PROMPT_COMMAND='history 1|tail -1|sed "s/^[ ]+[0-9]+ //"|sed "s/$/"}/">> /var/log/command.log'
执行 source /etc/bash.bashrc,就可以在/var/log/command.log看到日志
JSON类型的日志,通过logstash中配置日志回传:
input {
...
file {
path => "/var/log/command.json"
type => "cmd"
}
...
}
filter {
...
if [type] == "cmd" {
json{
source => "message"
}
}
...
}
output {
if [type] == "attack" or [type] == "cmd"{
elasticsearch {
hosts => ["localhost:9200"]
user => elastic
password => changeme
}
}
}
这样ES就会实时收集主机的shell日志了。
最后完善下告警任务,在Input里加入:
{
"cmd": {
"search": {
"request": {
"indices": "<logstash-{now/d}>",
"types" : "cmd",
"body": {
"query": {
"bool": {
"must": {"match_all": {}}
}
}
}
}
}
}
}
在告警内容里面添加:
\n=======CMD\n{{#ctx.payload.cmd.hits.hits}}{{HOSTNAME}} {{LI}}:{{CMD}}\n{{/ctx.payload.cmd.hits.hits}}
效果预览像这样:
一旦服务器上有操作产生,我们便能立马得到通知,进行应急响应。
最后
对于告警,我想说的是一定要记得:满篇的告警是不会有人去看的。重点过多也就没有重点了,告警一定要精而不要多,只针对一些关键的异常告警往往能事半功倍。
参考链接
蓝信开放平台 https://docs.lanxin.cn/
Getting Started with Watcher https://www.elastic.co/guide/en/x-pack/current/watcher-getting-started.html
Suricata IDS https://suricata-ids.org/
发表评论
您还未登录,请先登录。
登录