【技术分享】基于Graylog日志安全审计实践

阅读量568219

|

发布时间 : 2017-06-26 16:11:29

https://p0.ssl.qhimg.com/t01b39448bae08e2e85.jpg

日志审计是安全运维中常见的工作, 而审计人员如何面对各个角落里纷至沓来的日志的数据,成了一个公通课题,日志集中收集是提高审计效率的第一步。

现在各在安全厂商都提供自己日志中心产品,并提供可视化监控和审计管理工具,各需求方企业也可以使用ELK这种开源工具定制自己的日志中心,像Splunk这种收费产品也广泛被人们所知,而我们今天要说的是一种集大成的开源日志数据管理解决方案:Graylog, 以及基于Graylog安全审计实践。

Graylog是集Kafka、MongoDB、ElasticSearch、Java Restful、WEB Dashboard为一体的日志数据中心管理解决方案,和需要定制化ELK解决方案相比,ELK能做Graylog基本都能做, 并且逐渐状大起来的Graylog社区,提供各种所需要插件工具来扩展Graylog的功能,来支持各种日志协议, 甚至可通过AlienVault OTX(Open Threat Exchange)提供的Graylog插件,访问OTX上开源威胁情报,Graylog为未来的安全日志管理提供了更多的可能性,在提供后台审计管理的Dashboard外, Graylog通过Restful服务,对所有收集来的数据提供对外REST API接口服务,可以为任何支持HTTP JSON访问的语言工具,提供数据查询接口。

关于Graylog工具本身更多的原理及使用说明,不在此处赘述,大家可以到Graylog官方网站自己查阅,此文重点说明,我们在实际项目用如何使用Graylog进行实时的安全日志审计实践的, 希望实际的应用案例,可以打开使用开源日志工具审计日志的思路,涉及进行数据收集、预警、审计、展现与其它设备协作等相关主题,篇幅有限,点到为止。

功能汇总

运行在内网的各种服务的日志数据,以不同的形式存在于不现的平台上,有Windows、Linux、路由器、IDS、WAF等厂商设备。大多数情况下,我们可通Agent和Syslog这种形式,让数据集中到我们日志中心服务上, 先进行数据持久化,然后进行数据审计分析利用。

http://p6.qhimg.com/t015d35901c3d00315d.png

日志数据背后,映射了某种行为,我们就是通过日志数据来观察存在那些异常的行为。我们通过Graylog进行数据收集并完成以下功能:

1-1. 分级存储

按照需求多级存储数据,保证数据安全,且查询高效。我们通过将日志集中收集到syslog日志服务中心,持久化存储日志数据, 将日志数据转发ElasticSearch集群,根据数据量需求的大小,调整集群规模,通过Graylog提供的后台查询系统进行分词查询。

1-2. 快速弹性扩展

多种日志收集方式,可部署在各种设备、服务器、系统之中,实现快速弹性扩展。Graylog提供了多种Agent部署工具,可以跨平台收集日志,通过Graylog自身插件的扩展,在服务器端接受各种协议数据。

1-3. 实时威胁报警

经过分析过滤之后发现的威胁行为会及时通过邮件发送给相关负责人,并提供REST API产生更多报警类型。REST API为扩展安全预警策略提供各种方便。

1-4. 直观高效的可视化展示

针对不同数据适时展现图表、柱状图、饼图、世界地图等。快速直观地显示实时数据。

http://p7.qhimg.com/t018b0e235d14a933b2.png

1-5. 自动化威胁检测

实时多级联动,快速分析异常行为,降低单个防护设备的漏报和误报率。 通过第三方设备接收,数据碰撞。

物理硬件部署

实践系统结构图:

http://p1.qhimg.com/t01f3264aae0ca01525.png

我们收集的日志,一部分是通过在系统上部署Agent来获取日志数据,比较典型的应用场景就是邮件日志审计,对于使用Exchange Server作为邮件服务器的企业来说,除了使用邮件网关,也可以通过在Windows Server上部署代理来取得IMAP、POP3、IIS、Windows Events等Windows平台的日志,进行服务器安全审计,邮件服务健康检查。

Graylog提供了 NxLog、FilleBeat、Sidercar等Agent服务,取得Windows系统上的日志数据。

2-1:部署Agent,进行邮件服务审计

2-1-1.Windows事件监控:

通过监控多台Windows服务器上EventLog,通过Graylog进行分词,对铭感关键字进行实时审计,对频繁登录失败、匿名登录、高权限操作、关键进程启动成功与失败进行监控,保证Windows Server的安全性与邮件服务的监控性。

2-1-2.Exchange日志审计:

通过Graylog的数据筛选预警功能, 对特定邮件账户实施安全监控,部署安全检查策略,一旦发现账户在异常时常、异常地区登录、爆破被锁等行为、进行实时预警通告。

2-1-3.邮件服务健康监控:

一般企业能都会有多台邮件服务在工作,通过负载均衡的方式,来分散用户请求多单台服务器的访问压力,而细致到对每台邮件服务器上的每种邮件协议(POP、IMAPI、Excahnge)报文监控, 可通过Graylog可视化统计,发现那台协议流量数据异常,无流量,流量过载和等业务级的诡异行为存在。

2-1-4. 邮件管理员审计:

对邮件服务器管理员的日志时行审计,涉及到敏感账号的操作进行预警。

2-2:通过Syslog传输,进行服务审计:

2-2-1.WEB服务日志审计:

企业内部的WEB服务日志,可以通过syslog的形式传送给Graylog, 典型的WEB服务就是nginx, 通过Graylog的GELF(Graylog Extended Log Format )进行日志快速分词,在GROK的基础上又丰富了不少,无需部署logstash在nginx服务器上,直接将日志通过syslog协议推送到Graylog提供的日志接口上。对nginx请求的状态进行统计、对URL中有无注入进行预警、对恶意访问也可及时发现访问异常特征。

2-2-2.VPN日志审计:

企业VPN为员工在非公司办公区访问公司内部资源提供了方便,如何挖掘和发现是公司内部人员正常操作以外的行为,是安全审计的关注点,某些用户ID产生不该产生的行为日志,这种行为发现与回溯,可通过日志graylog对VPN日志审计来做到。

用户通过登录VPN对内网进行描述,可以通过在Graylog上自定义策略辅助二次开发进行监控,某ID对某台机器进行扫描,端口号或是其它数据特征数据会变化明显,我们可通过自动监控数据变化,实时识别扫描行为。

2-2-3.Honeypot日志审计:

Honeypot是部署在内网伪应用服务器, 正常情况下,内部不会去访问Honeypot,一旦Honepot有流量产生,及大的可能是攻击行为出现。Graylog与Honeypot结合使用,可以及时感知威胁,并可视化攻击位置及相关paylog信息.

2-2-4.防火墙与IDS日志审计:

很多的IDS与防火墙都提供了syslog日志吐出功能,将防火墙日志与其它安全检查设备日志,进行对据对撞 ,可以进一步的验证威胁情报的有效性。

数据业务逻辑

实践系统业务逻辑图:

http://p8.qhimg.com/t01e5c70e787ac2589c.png

Graylog与ELK不同的是,在ElasticSearch提供的直接数据索引查询的基础之上,又抽像出一个新的Restfull服务层,通过在内部的Input、stream、pipeline这些抽象概念对具体的各种日志进行了分类,并提供一套REST API,对外提供数据查询、统计相关的API,通过这些API进行自动化审计加工。

REST API服务

Graylog虽然提供了REST API,但在实践中,我们发现Graylog没有直接提供开发SDK, 如果想把Stream、Input这些概念在我们的自动安全检查逻辑中隐藏起来,集中处理和业务相关自动化安全检查逻辑,就要实现SDK,而不是直接使用,暴露出来的REST API。

http://p1.qhimg.com/t01b1a036a890a1adfe.png

Graylog系统架构

4-1. REST API的SDK

我们实践的方案是通过nginx+lua服务器形式,实现用户REST API请求转发,通过自己实现的SDK开发了一套直接和内部业务数据直接相关的查询接口,返回VPN、WEB服务器、邮件Mail等日志数据。

下面是用Moonscript语言实现Graylog的Stream查询的SDK,Moonscript会被翻译成Lua被Nginx Moudle运行。

class GMoonSDK
    pwd: ""
    uname: ""
    headers_info: ""
    endpoints: {
        's_uat':{'/search/universal/absolute/terms':{'field', 'query', 'from', 'to', 'limit'} }
        's_ua':{'/search/universal/absolute':{'fields', 'query', 'from', 'to', 'limit'} }
        's_urt':{'/search/universal/relative/terms':{'field', 'query', 'range'} }
        's_ut':{'/search/universal/relative':{'fields', 'query', 'range'} }
    }
    @build_headers: =>
        auth = "Basic "..encode_base64(self.uname..":"..self.pwd)
        headers= {
            'Authorization': auth, 
            'Accept': 'application/json'
        }
        return headers 
    @auth: (username, password, host, port) =>
        --授权信息检查
        errList = {}
        if type(port) == 'nil'
            table.insert(errList, "port is niln")
        if type(host) == 'nil'
            table.insert(errList, "host is niln")
        if type(password) == 'nil'
            table.insert(errList, "password is niln")
        if type(username) == 'table'
            table.insert(errList, "username is niln")
        num = table.getn(errList) 
        if num > 0 
            return errList
       --设置授权信息
        self.uname = username
        self.pwd = password 
        self.host = host
        self.port = port
        self.url = "http://"..host..":"..port
        self.headers_info = selfbuild_headers()
        return self.url
    @getRequest:(req_url) =>
        body, status_code, headers = http.simple {
            url: req_url 
            method: "GET"
            headers: self.headers_info 
        }
        return body
    @checkParam:(s_type, s_param) =>
        --检查配置信息
        if type(self.url) == "nil"
            return 'auth info err.'
        --检查端末类型
        info = self.endpoints[s_type]
        chk_flg = type(info)
        if chk_flg == "nil"
            return "Input parameter error,unknow type."
        key = ''
        for k,v in pairs info
            key = k 
        --检查查询参数有效性
        str = ''
        for k,v in pairs info[key]
            if type(s_param[v]) == 'nil'
                return info[key][k]..":is nil"
            str = str..s_param[v]
        return "OK", str
    @call: (s_type, s_param) =>
        key = ''
        for k,v in pairs self.endpoints[s_type]
            key = k 
        --参数打包成URL
        url_data = ngx.encode_args(s_param)
        tmp_url = self.url..key.."?"
        req_url = tmp_url..url_data
        --转发用户HTTP请求给GraylogRest服务。
        ret = selfgetRequest req_url
        return ret
    @dealStream: (s_type, s_param) =>
        ret = ''
        status, param_list = GMoonSDKcheckParam s_type, s_param
        if status == "OK"
            ret = GMoonSDKcall s_type, s_param
        else 
            ret = status 
        return ret

SDK完成后,我们在Nginx+Lua上用Lapis创建一个WEB服务,做REST API数据请求转发。

class App extends lapis.Application
  "/testcase": =>
    --准备对应REST的输入参数,如果相应该有的项目没有设定会输出NG原因。
    param_data= {
        fields:'username',
        limit:3,
        query:'*',
        from: '2017-01-05 00:00:00',
        to:'2017-01-06 00:00:00',
        filter:'streams'..':'..'673b1666ca624a6231a460fa'
    }
    --进行鉴权信息设定
    url  = GMoonSDKauth 'supervisor', 'password', '127.0.0.1', '12600'
    
    --调用对应'TYPE'相对应的REST服务,返回结果。
    ret = GMoonSDKdealStream 's_ua', param_data
    ret

上文提到 ‘TYPE’, 其实就是对Endpoints的一种编号,基本上和GrayLog REST API是一对一关系。

endpoints: {
        's_uat':{'/search/universal/absolute/terms':{'field', 'query', 'from', 'to', 'limit'} }
        's_ua':{'/search/universal/absolute':{'fields', 'query', 'from', 'to', 'limit'} }
        's_urt':{'/search/universal/relative/terms':{'field', 'query', 'range'} }
        's_ut':{'/search/universal/relative':{'fields', 'query', 'range'} }
    }

理论上说,可以个修改以上的数据结构,对应各种REST API的服封装(GET),只要知道对应URL与可接收的参数列表。

4-2. Dashboard的Widget数据更新

http://p8.qhimg.com/t017e73bcf9efd83784.png

Graylog数据管理概念图

Graylog抽象出Input、Stream、Dashboard这些自己原生的日志管理概念,是基于对日志数据一种新的组织化分,我们通过Graylog中一个叫Dashboard方法,对某一类日志数据,进行Top10排序,

例如:对5分钟之内端口访问量最多的10个用户进行排序。

 rglog = require "GRestySDK"
  data = '{
   "description": "scan-port",
   "type": "QUICKVALUES",
   "config": {
     "timerange": {
       "type": "relative",
       "range": 123
     },
     "field": "port",
     "stream_id": "56e7ab11fd624ca91defeb11",
     "query": "username: graylog",
     "show_data_table": true,
     "show_pie_chart": true
   },
   "cache_time": 3000
 }
 '
  url  = rglogauth 'admin', 'password', '0.0.0.0', '12345'
  rglogupdateWidget('57a7bc60be624b691feab6f','019bca13-50cf-481e-a123-a0d2e649b41a',data)

Graylog在收到这个请求后,会数某日志数据,进行快速统计排序, 把Top10的统计数据,用饼图的形式画出来。

4-3.REST API定制新审计后台与可视化展示

如果你不想用Kibanna、Grayglog Dashboard,想实现自己的一套审计工具后台,或是情态感知的可视化大屏幕,可通过基于自己习某用开发的语言,开发一套SDK进行接功能扩展,实现自己定制的可视化感知系统。

https://p3.ssl.qhimg.com/t016016c198080b51c7.png

五. 反扫检查

威胁情报可视化,一直以来对安全人员分析安全事件起着有益的作用, 可视化是对分析的结果一种图形化的映射,是威胁行为的一种图形具象化。针对蜜罐日志分析的流程来讲,溯源和展示攻击行为本身也是很重要的,我们可以结合Graylog可视化与REST API自动检测,与honeypot和扫描器结果结合分析,发现来至内部的扫描形为。

蜜罐向类似mysql这种库中写入被访问的IP地址和Port,启动定时任 务读取数据库,取出数据库当条目总数,与之前本地保存的最大数进行比较 发现,数据库 中的日志记录变多了,就将这些数据取出,进行分析和报警。这是一种基于Mysql存储和蜜罐威胁事件结合的方式。

另一种方式是依赖ips,ids这种设备,对网段内的所有蜜罐的流量进行监控,发现有任何 触发蜜罐的访问就进行数据的报警分析,不好的地方是,除了要依赖这些设备 ,ids和ids 本身对蜜罐被访问策略控制比较单一,另外如果想进一步的想取得访问的payload也需要与ids ,ips再次交互取,不同产商的设备特点不统一。

所以产生了,第三种Graylog与Honeypot结合反扫检查的方案,通过Graylog提供RESTAPI,自动化统计日志数据,通过Graylog Dashboard功能统计端口访问量大的ID、IP,将Honepot日志与Graylog数据进行对撞分析,再与扫描器记录下每台服务器的开放的端口指纹做比较,如果访问了端口指纹里没有开放的端口,并且还有触发蜜罐的历史,可以加强定位为是威胁。

http://p8.qhimg.com/t0169dc182b6f287ec9.png

上图就是通过Graylog Dashboard返回的端口访问量前三的用户的可视化图。User1明显为描扫行为,User2是可以行为,User3是正常访问行为。

6.总结

我们将Graylog作为一个可扩展的数据容器来使用,因为Graylog REST的这个基础功能,让他从一个数据管理工具升级为数据平台。日志千变万化,行为迥异不同,Graylog只是众多日志数据管理产品中的一个,Graylog依靠开源免费表现优异的特点,越来越被人们接受。Graylog可能会在一次次产品升级过程中升级完善,也可能被更好的新产品夺走注意力,但是我们基于某种工具上实践思路是可以延续的,名词术语变了,应用模式依然有生命力,开放的思路,更益于工具的使用,借这篇向大家介绍Graylog一点实践应用思路。

本文转载自:

如若转载,请注明出处:

安全KER - 有思想的安全新媒体

分享到:微信
+10赞
收藏
新浪ssrc
分享到:微信

发表评论

Copyright © 北京奇虎科技有限公司 三六零数字安全科技集团有限公司 安全KER All Rights Reserved 京ICP备08010314号-66