使用Cutter和Radare2对APT32恶意程序流程图进行反混淆处理

阅读量811541

|评论4

|

发布时间 : 2019-05-14 11:00:17

x
译文声明

本文是翻译文章,文章原作者 checkpoint,文章来源:research.checkpoint.com

原文地址:https://research.checkpoint.com/deobfuscating-apt32-flow-graphs-with-cutter-and-radare2/

译文仅供参考,具体内容表达以及含义原文为准。

 

OceanLotus(海莲花),也称APT32,攻击目标主要是东亚国家。研究表明该组织一直在持续更新后门、基础设施和感染单元。海莲花攻击的主要目标是东亚国家的企业和政府组织。

APT32的工具集广泛而多样。 它包含高级和简单组件,是手工工具和商业或开源工具的混合物,如Mimikatz和Cobalt Strike。 它通过dropper、shellcode执行恶意代码 ,通过诱饵文档和后门远程投递。 其中许多工具都经过高度混淆和调整,并采用不同的技术进行扩充,使其难以进行逆向分析。

在本文中,我们详细分析海莲花工具中一种代码混淆技术, 并展示如何编写一个简单的脚本绕过这种技术。

deobfuscation插件需要用到Cutter ,开源逆向工具radare2 的官方GUI版本 。

 

下载并安装Cutter

Cutter适用于所有平台(Linux,OS X,Windows)。 您可以在https://github.com/radareorg/cutter/releases下载最新版本。 如果您使用的是Linux,获取Cutter最快方法是使用AppImage文件。

如果您想使用可用的最新版本,新功能和错误修复,您可以从源代码构建Cutter。具体教程参考https://cutter.re/docs/building.html。

img

图1:Cutter界面

 

后门分析

首先,我们先分析一下后门。 相关样本( 486be6b1ec73d98fdd3999abe2fa04368933a2ec )是多阶段感染链的一部分,最近发现其在广泛应用。 所有这些阶段感染链具有Ocean Lotus的典型特征,其中该感染链来源于恶意文档( 115f3cb5bdfb2ffe5168ecb36b9aed54 )。 该文件声称源自中国安全厂商奇虎360,并包含恶意VBA宏代码,该代码将恶意shellcode注入rundll32.exe. shellcode包含解密例程,用于解密并将DLL反射加载至内存。 DLL即为后门。

首先,后门解密从文件资源中提取的配置文件。 配置文件存储命令和控制服务器等信息。 然后,二进制文件尝试使用定制的PE加载程序将辅助DLL加载到内存中,该 DLL名为HTTPProv.dll ,能够与C2服务器通信。 后门可以从命令和控制服务器接收许多不同的命令,包括shellcode执行、新进程的创建、文件和目录的操作等等。

Ocean Lotus使用了许多混淆技术,以使其工具更难以进行逆向分析。 最值得注意的是,Ocean Lotus在其二进制文件中使用了大量的垃圾代码。 垃圾代码使得样本更大更复杂,这分散了研究人员试图解析二进制文件的注意力。 反编译器对这些混淆的函数反编译经常失败,因为这些程序集经常使用堆栈指针,反编译器无法来处理这种病态代码。

 

混淆机制分析

在分析后门时,可以立即注意到一种混淆技术。将垃圾块插入到函数流中来实现控制流混淆, 这些垃圾块只是无意义的噪音,使函数功能变得混乱。

img

图2:垃圾块示例

如上图所示,块中充满了垃圾代码,这与函数的实际功能无关。 最好忽略这些块,但这说起来容易做起来难。 仔细看看这些街区将揭示一些有趣的东西。 这些垃圾块始终通过前一个块的条件跳转进行失败跳转。 此外,这些垃圾块总是以条件跳转结束,且与前一个块的条件跳转相反。 例如,如果垃圾块上方的条件是jo <some_addr> ,则垃圾块很可能以jno <some_addr>结束。 如果上面的块以jne <another_addr>结束,那么垃圾块将以 je <another_addr> 结束。

img

图3:相反的条件跳转

考虑到这一点,我们可以开始构建这些垃圾块的特征。 混淆的第一个特征是出现两个连续的块,这些块以相反的条件跳转结束到同一目标地址 。 另一个特性要求第二个块不包含有意义的指令,如字符串引用或调用

当满足这两个特性时,我们可以很有可能说第二个块是垃圾块。 在这种情况下,我们希望第一个块跳过垃圾块,以便从图中删除垃圾块。 这可以通过使用无条件跳转(也称为简单的JMP指令)修补条件跳转来完成。

img

图4:修改条件跳转到JMP指令将忽略垃圾块

 

编写插件

下面介绍的插件是为Cutter编写的,且与radare2脚本兼容。 这意味着我们将通过r2pipe使用一些巧妙的radare2命令 – 一个Python包装器与radare2交互。 这是编写radare2脚本最有效,最灵活的方法。

让插件同时支持Cutter和radare2并非易事,因为一个是GUI程序,另一个是CLI。 这意味着GUI对象在radare2中将毫无意义。 幸运的是,Cutter支持r2pipe,并且能够从其Python插件中执行radare2命令。

 

编写核心类

我们要做的第一件事是创建一个Python类,它将成为我们的核心类。 该类将包含用于查找和删除垃圾块的逻辑。 让我们从定义其__init__函数开始。 该函数将接收一个管道,该管道将是来自import r2piper2pipe 对象(import r2pipe )或来自Cutter的cutter对象 (import cutter )。

class GraphDeobfuscator:
   def __init__(self, pipe):
       """an initialization function for the class

       Arguments:
           pipe {r2pipe} -- an instance of r2pipe or Cutter's wrapper
       """

       self.pipe = pipe

现在我们可以使用这个管道执行radare2命令。 管道对象包含两种执行r2命令的主要方法。 第一个是pipe.cmd(<command>) ,它将以字符串形式返回命令的结果,第二个是pipe.cmdj(<command>j) ,它将从radare2命令的输出返回一个已解析的JSON对象。

注意: 几乎每个radare2命令都可以附加一个j来获得输出为JSON。

接下来我们要做的是获取当前函数的所有块,然后进行迭代。 我们可以通过使用afbj命令来执行此操作,即Analyze Function Blocks (分析函数块),结果以Json格式返回。

def clean_junk_blocks(self):
       """Search a given function for junk blocks, remove them and fix the flow.
       """
       # Get all the basic blocks of the function
       blocks = self.pipe.cmdj("afbj @ $F")
       if not blocks:
           print("[X] No blocks found. Is it a function?")
           return
       modified = False

       # Iterate over all the basic blocks of the function
       for block in blocks:
           # do something

对于每个块,我们想要知道在不发生条件跳转的情况下是否存在失败的块。 如果包含失败的块,则第二块是作为垃圾块的初始候选。

 def get_fail_block(self, block):
       """Return the block to which a block branches if the condition is fails

       Arguments:
           block {block_context} -- A JSON representation of a block

       Returns:
           block_context -- The block to which the branch fails. If not exists, returns None
       """
       # Get the address of the "fail" branch
       fail_addr = self.get_fail(block)
       if not fail_addr:
           return None
       # Get a block context of the fail address
       fail_block = self.get_block(fail_addr)
       return fail_block if fail_block else None

注意: 由于篇幅有限,不会解释此处出现的所有功能。上面代码片段中使用的get_block (addr)或get_fail_addr (block)函数是我们为使代码更清晰而编写的子例程。函数实现将在最终插件中提供,该插件在本文末尾显示和链接。

接下来,我们想检查我们的垃圾块候选是否在块之后立即出现。 如果不是,这很可能不是垃圾块,因为根据我们检查的情况,垃圾块位于具有条件跳转的块之后的代码中。

def is_successive_fail(self, block_A, block_B):
       """Check if the end address of block_A is the start of block_B

       Arguments:
           block_A {block_context} -- A JSON object to represent the first block
           block_B {block_context} -- A JSON object to represent the second block

       Returns:
           bool -- True if block_B comes immediately after block_A, False otherwise
       """

      return ((block_A["addr"] + block_A["size"]) == block_B["addr"])

然后,我们想要检查块候选是否包含无意义的指令。 例如,垃圾块不太可能包含CALL指令或字符串引用。 为此,我们将使用命令pdsb,即P rint D isassembly S ummary of a B lock(打印代码块反汇编汇总信息)。 我们假设垃圾块不包含有意义的指令。

def contains_meaningful_instructions (self, block):
       '''Check if a block contains meaningful instructions (references, calls, strings,...)

       Arguments:
           block {block_context} -- A JSON object which represents a block

       Returns:
           bool -- True if the block contains meaningful instructions, False otherwise
       '''

       # Get summary of block - strings, calls, references
       summary = self.pipe.cmd("pdsb @ {addr}".format(addr=block["addr"]))
       return summary != ""

最后,我们想检查两个块的条件跳转是否相反。为此,我们需要创建一个相反的条件跳转列表。 x86架构包含许多条件跳转指令,下面仅展示列表的部分内容。 也就是说,从我们的测试中,下面的列表足以覆盖APT32后门中呈现的所有不同对的条件跳转。 如果没有,则很容易添加附加说明。

 jmp_pairs = [
       ['jno', 'jo'],
       ['jnp', 'jp'],
       ['jb',  'jnb'],
       ['jl',  'jnl'],
       ['je',  'jne'],
       ['jns', 'js'],
       ['jnz', 'jz'],
       ['jc',  'jnc'],
       ['ja', 'jbe'],
       ['jae', 'jb'],
       ['je',  'jnz'],
       ['jg',  'jle'],
       ['jge', 'jl'],
       ['jpe', 'jpo'],
       ['jne', 'jz']]

   def is_opposite_conditional(self, cond_A, cond_B):
       """Check if two operands are opposite conditional jump operands

       Arguments:
           cond_A {string} -- the conditional jump operand of the first block
           cond_B {string} -- the conditional jump operand of the second block

       Returns:
           bool -- True if the operands are opposite, False otherwise
       """

       sorted_pair = sorted([cond_A, cond_B])
       for pair in self.jmp_pairs:
           if sorted_pair == pair:
               return True
       return False

现在我们定义了验证函数,我们可以将这些部分附加在我们之前创建的clean_junk_blocks()函数中。

 def clean_junk_blocks(self):
       """Search a given function for junk blocks, remove them and fix the flow.
       """

       # Get all the basic blocks of the function
       blocks = self.pipe.cmdj("afbj @ $F")
       if not blocks:
           print("[X] No blocks found. Is it a function?")
           return
       modified = False

       # Iterate over all the basic blocks of the function
       for block in blocks:
           fail_block = self.get_fail_block(block)
           if not fail_block or 
           not self.is_successive_fail(block, fail_block) or 
           self.contains_meaningful_instructions(fail_block) or 
           not self.is_opposite_conditional(self.get_last_mnem_of_block(block), self.get_last_mnem_of_block(fail_block)):
               continue

如果所有检查都成功通过,则我们很可能发现了一个垃圾块。下一步我们将要修补条件跳转指令为$JUMP$ 指令以跳过垃圾块,从而将垃圾块从图中移除,也即从函数体中移除。

为此,我们使用两个radare2命令。 第一个是aoj @ <addr> ,即A nalyze O pcode,它将为我们提供给定地址中指令的信息。 此命令可用于获取条件跳转的目标地址。 我们使用的第二个命令是wai <instruction> @ <addr> ,它代表W rite A ssembly I nside (写入汇编指令)。 与另一条覆盖指令的命令wa <instruction> @ <addr>不同, wai命令将使用NOP指令填充剩余的字节。 因此,在我们想要使用的JMP <addr>指令比当前条件跳转指令短的情况下,剩余的字节将被替换为NOP

 def overwrite_instruction(self, addr):
       """Overwrite a conditional jump to an address, with a JMP to it

       Arguments:
           addr {addr} -- address of an instruction to be overwritten
       """

       jump_destination = self.get_jump(self.pipe.cmdj("aoj @ {addr}".format(addr=addr))[0])
       if (jump_destination):
           self.pipe.cmd("wai jmp 0x{dest:x} @ {addr}".format(dest=jump_destination, addr=addr))

在覆盖条件跳转指令之后,我们继续遍历函数的所有块并重复上述步骤。 最后,如果在函数中进行了更改,我们将重新分析函数,以便我们所做的更改显示在函数图中。

 def reanalize_function(self):
       """Re-Analyze a function at a given address

       Arguments:
           addr {addr} -- an address of a function to be re-analyze
       """
       # Seek to the function's start
       self.pipe.cmd("s $F")
       # Undefine the function in this address
       self.pipe.cmd("af- $")

       # Define and analyze a function in this address
       self.pipe.cmd("afr @ $")

最后, clean_junk_blocks()函数现在可以使用了。 我们现在还可以创建一个函数clean_graph() ,它可以清除后门的混淆函数。

 def clean_junk_blocks(self):
       """Search a given function for junk blocks, remove them and fix the flow.
       """

       # Get all the basic blocks of the function
       blocks = self.pipe.cmdj("afbj @ $F")
       if not blocks:
           print("[X] No blocks found. Is it a function?")
           return
       # Have we modified any instruction in the function?
       # If so, a reanalyze of the function is required
       modified = False

       # Iterate over all the basic blocks of the function
       for block in blocks:
           fail_block = self.get_fail_block(block)
           # Make validation checks
           if not fail_block or 
           not self.is_successive_fail(block, fail_block) or 
           self.contains_meaningful_instructions(fail_block) or 
           not self.is_opposite_conditional(self.get_last_mnem_of_block(block), self.get_last_mnem_of_block(fail_block)):
               continue
           self.overwrite_instruction(self.get_block_end(block))
           modified = True
       if modified:
           self.reanalize_function()


   def clean_graph(self):
       """the initial function of the class. Responsible to enable cache and start the cleaning
       """

       # Enable cache writing mode. changes will only take place in the session and
       # will not override the binary
       self.pipe.cmd("e io.cache=true")
       self.clean_junk_blocks()

核心类到此结束。

Cutter 还是Radare2?

如前所述,我们的代码将作为Cutter的插件执行,或者直接作为Python脚本从radare2 CLI执行。 这意味着我们需要有一种方法来了解我们的代码是从Cutter还是从radare2执行的。 为此,我们可以使用以下简单技巧。

# Check if we're running from cutter
try:
   import cutter
   from PySide2.QtWidgets import QAction
   pipe = cutter
   cutter_available = True
# If no, assume running from radare2
except:
   import r2pipe
   pipe = r2pipe.open()
   cutter_available = False

上面的代码检查是否可以导入cutter库。 如果可以,我们从Cutter内部运行,可以安全地做一些GUI操作。 否则,我们从radare2内部运行,因此我们选择导入r2pipe 。 在这两个语句中,我们分配了一个名为pipe的变量,该变量稍后将传递给我们创建的GraphDeobfuscator类。

从Radare2运行

这是使用此插件的最简单方法。 检查__name__等于“main”是一种常见的Python习惯用法,用于检查脚本是直接运行还是导入。 如果直接运行此脚本,我们只需执行clean_graph()函数。

if __name__ == "__main__":
   graph_deobfuscator = GraphDeobfuscator(pipe)
   graph_deobfuscator.clean_graph()

从Cutter运行

首先,我们需要确保我们从Cutter内部运行。 我们已经创建了一个名为cutter_variable的布尔变量。 我们只需要检查此变量是否设置为True 。 如果是,我们继续定义我们的插件类。

if cutter_available:
   # This part will be executed only if Cutter is available.
   # This will create the cutter plugin and UI objects for the plugin
   class GraphDeobfuscatorCutter(cutter.CutterPlugin):
       name = "APT32 Graph Deobfuscator"
       description = "Graph Deobfuscator for APT32 Samples"
       version = "1.0"
       author = "Itay Cohen (@Megabeets_)"

       def setupPlugin(self):
           pass

       def setupInterface(self, main):
           pass

   def create_cutter_plugin():
       return GraphDeobfuscatorCutter()

这是Cutter插件的框架- 它根本不包含任何适当的功能。 Cutter在加载时调用create_cutter_plugin()函数。 此时,如果我们将脚本放在Cutter的插件目录中,Cutter会将我们的文件识别为插件。

为了使插件执行我们的功能,我们需要添加一个菜单条目,用户可以按下该条目来触发我们的反混淆器。 我们选择将菜单条目或操作添加到“ Windows – >插件 ”菜单中。

if cutter_available:
   # This part will be executed only if Cutter is available. This will
   # create the cutter plugin and UI objects for the plugin
   class GraphDeobfuscatorCutter(cutter.CutterPlugin):
       name = "APT32 Graph Deobfuscator"
       description = "Graph Deobfuscator for APT32 Samples"
       version = "1.0"
       author = "Megabeets"

       def setupPlugin(self):
           pass

       def setupInterface(self, main):
           # Create a new action (menu item)
           action = QAction("APT32 Graph Deobfuscator", main)
           action.setCheckable(False)
           # Connect the action to a function - cleaner.
           # A click on this action will trigger the function
           action.triggered.connect(self.cleaner)

           # Add the action to the "Windows -> Plugins" menu
           pluginsMenu = main.getMenuByType(main.MenuType.Plugins)
           pluginsMenu.addAction(action)

       def cleaner(self):
           graph_deobfuscator = GraphDeobfuscator(pipe)
           graph_deobfuscator.clean_graph()
           cutter.refresh()


   def create_cutter_plugin():
       return GraphDeobfuscatorCutter()

该脚本现已准备就绪,可以放在Cutter插件目录下的Python文件夹中。 目录的路径显示在“ 编辑 – >首选项 – >插件 ”下的“插件选项”中。 例如,在我们的机器上,路径是:“ 〜/ .local / share / RadareOrg / Cutter / Plugins / Python ”。

现在,在打开Cutter时,我们可以在“ 插件 – >首选项 ”中看到该插件确实已加载。

img

图5:插件已成功加载

我们还可以查看“ Windows – >插件 ”菜单,看看我们创建的菜单项是否存在。 事实上,我们可以看到“APT32 Graph Deobfuscator”项目现在出现在菜单中。

img

图6:我们创建的菜单项已成功添加

我们现在可以选择一些我们怀疑包含垃圾块的函数,并尝试测试我们的插件。 在这个例子中,我们选择了函数fcn.00acc7e0 。 转到Cutter中的功能可以通过从左侧菜单中选择,或者只需按“g”并在导航栏中键入其名称或地址即可。

确保您在图表视图中,并随意四处浏览,试图发现垃圾块。 我们在下图中突出显示了它们,其中显示了Graph Overview(迷你图)窗口。

img

图7: fcn.00acc7e0 突出显示的垃圾块

当遇到候选可疑函数,我们可以触发我们的插件并查看它是否成功删除它们。 为此,请单击“ Windows – >插件 – > APT32图形反混淆器 ”。 一秒钟后,我们可以看到我们的插件成功删除了垃圾块。

img

图8:删除垃圾块后的相同功能

在下图中,您可以在删除垃圾块前后看到更多对函数。

img

图9: fcn.00aa07b0之前和之后

img

图10: fcn.00a8a1a0之前和之后

 

最后的话

Ocean Lotus的混淆技术绝不是最复杂或最难以击败的。 在本文中,我们了解了问题,起草了一个解决方案,最后使用Cutter和Radare2的python脚本功能实现了它。 完整的脚本可以在GitHub上找到,也可以附在本文的底部。

如果您有兴趣阅读有关Ocean Lotus的更多信息,我们推荐ESET的Romain Dumont发布的这篇 。 它包含对Ocean Lotus工具的全面分析,以及对所涉及的混淆技术的一些阐述。

 

附录

示例程序SHA-256值

  • Be6d5973452248cb18949711645990b6a56e7442dc30cc48a607a2afe7d8ec66
  • 8d74d544396b57e6faa4f8fdf96a1a5e30b196d56c15f7cf05767a406708a6b2APT32函数图反混淆器 – 完整代码
""" A plugin for Cutter and Radare2 to deobfuscate APT32 flow graphs
This is a python plugin for Cutter that is compatible as an r2pipe script for
radare2 as well. The plugin will help reverse engineers to deobfuscate and remove
junk blocks from APT32 (Ocean Lotus) samples.
"""

__author__  = "Itay Cohen, aka @megabeets_"
__company__ = "Check Point Software Technologies Ltd"

# Check if we're running from cutter
try:
    import cutter
    from PySide2.QtWidgets import QAction
    pipe = cutter
    cutter_available = True
# If no, assume running from radare2
except:
    import r2pipe
    pipe = r2pipe.open()
    cutter_available = False


class GraphDeobfuscator:
    # A list of pairs of opposite conditional jumps
    jmp_pairs = [
        ['jno', 'jo'],
        ['jnp', 'jp'],
        ['jb',    'jnb'],
        ['jl',    'jnl'],
        ['je',    'jne'],
        ['jns', 'js'],
        ['jnz', 'jz'],
        ['jc',    'jnc'],
        ['ja', 'jbe'],
        ['jae', 'jb'],
        ['je',    'jnz'],
        ['jg',  'jle'],
        ['jge', 'jl'],
        ['jpe', 'jpo'],
        ['jne', 'jz']]

    def __init__(self, pipe, verbose=False):
        """an initialization function for the class

        Arguments:
            pipe {r2pipe} -- an instance of r2pipe or Cutter's wrapper

        Keyword Arguments:
            verbose {bool} -- if True will print logs to the screen (default: {False})
        """

        self.pipe = pipe

        self.verbose = verbose

    def is_successive_fail(self, block_A, block_B):
        """Check if the end address of block_A is the start of block_B

        Arguments:
            block_A {block_context} -- A JSON object to represent the first block
            block_B {block_context} -- A JSON object to represent the second block

        Returns:
            bool -- True if block_B comes immediately after block_A, False otherwise
        """

        return ((block_A["addr"] + block_A["size"]) == block_B["addr"])

    def is_opposite_conditional(self, cond_A, cond_B):
        """Check if two operands are opposite conditional jump operands

        Arguments:
            cond_A {string} -- the conditional jump operand of the first block
            cond_B {string} -- the conditional jump operand of the second block

        Returns:
            bool -- True if the operands are opposite, False otherwise
        """

        sorted_pair = sorted([cond_A, cond_B])
        for pair in self.jmp_pairs:
            if sorted_pair == pair:
                return True
        return False

    def contains_meaningful_instructions (self, block):
        '''Check if a block contains meaningful instructions (references, calls, strings,...)

        Arguments:
            block {block_context} -- A JSON object which represents a block

        Returns:
            bool -- True if the block contains meaningful instructions, False otherwise
        '''

        # Get summary of block - strings, calls, references
        summary = self.pipe.cmd("pdsb @ {addr}".format(addr=block["addr"]))
        return summary != ""

    def get_block_end(self, block):
        """Get the address of the last instruction in a given block

        Arguments:
            block {block_context} -- A JSON object which represents a block

        Returns:
            The address of the last instruction in the block
        """

        # save current seek
        self.pipe.cmd("s {addr}".format(addr=block['addr']))
        # This will return the address of a block's last instruction
        block_end = self.pipe.cmd("?v $ @B:-1")
        return block_end

    def get_last_mnem_of_block(self, block):
        """Get the mnemonic of the last instruction in a block

        Arguments:
            block {block_context} -- A JSON object which represents a block

        Returns:
            string -- the mnemonic of the last instruction in the given block
        """

        inst_info = self.pipe.cmdj("aoj @ {addr}".format(addr=self.get_block_end(block)))[0]
        return inst_info["mnemonic"]

    def get_jump(self, block):
        """Get the address to which a block jumps

        Arguments:
            block {block_context} -- A JSON object which represents a block

        Returns:
            addr -- the address to which the block jumps to. If such address doesn't exist, returns False 
        """

        return block["jump"] if "jump" in block else None

    def get_fail_addr(self, block):
        """Get the address to which a block fails

        Arguments:
            block {block_context} -- A JSON object which represents a block

        Returns:
            addr -- the address to which the block fail-branches to. If such address doesn't exist, returns False 
        """
        return block["fail"] if "fail" in block else None

    def get_block(self, addr):
        """Get the block context in a given address

        Arguments:
            addr {addr} -- An address in a block

        Returns:
            block_context -- the block to which the address belongs
        """

        block = self.pipe.cmdj("abj. @ {offset}".format(offset=addr))
        return block[0] if block else None

    def get_fail_block(self, block):
        """Return the block to which a block branches if the condition is fails

        Arguments:
            block {block_context} -- A JSON representation of a block

        Returns:
            block_context -- The block to which the branch fails. If not exists, returns None
        """
        # Get the address of the "fail" branch
        fail_addr = self.get_fail_addr(block)
        if not fail_addr:
            return None
        # Get a block context of the fail address
        fail_block = self.get_block(fail_addr)
        return fail_block if fail_block else None

    def reanalize_function(self):
        """Re-Analyze a function at a given address

        Arguments:
            addr {addr} -- an address of a function to be re-analyze
        """
        # Seek to the function's start
        self.pipe.cmd("s $F")
        # Undefine the function in this address
        self.pipe.cmd("af- $")

        # Define and analyze a function in this address
        self.pipe.cmd("afr @ $")       

    def overwrite_instruction(self, addr):
        """Overwrite a conditional jump to an address, with a JMP to it

        Arguments:
            addr {addr} -- address of an instruction to be overwritten
        """

        jump_destination = self.get_jump(self.pipe.cmdj("aoj @ {addr}".format(addr=addr))[0])
        if (jump_destination):
            self.pipe.cmd("wai jmp 0x{dest:x} @ {addr}".format(dest=jump_destination, addr=addr))

    def get_current_function(self):
        """Return the start address of the current function

        Return Value:
            The address of the current function. None if no function found.
        """
        function_start = int(self.pipe.cmd("?vi $FB"))
        return function_start if function_start != 0 else None

    def clean_junk_blocks(self):
        """Search a given function for junk blocks, remove them and fix the flow.
        """

        # Get all the basic blocks of the function
        blocks = self.pipe.cmdj("afbj @ $F")
        if not blocks:
            print("[X] No blocks found. Is it a function?")
            return
        # Have we modified any instruction in the function?
        # If so, a reanalyze of the function is required
        modified = False

        # Iterate over all the basic blocks of the function
        for block in blocks:
            fail_block = self.get_fail_block(block)
            # Make validation checks
            if not fail_block or 
            not self.is_successive_fail(block, fail_block) or 
            self.contains_meaningful_instructions(fail_block) or 
            not self.is_opposite_conditional(self.get_last_mnem_of_block(block), self.get_last_mnem_of_block(fail_block)):
                continue
            if self.verbose:
                print ("Potential junk: 0x{junk_block:x} (0x{fix_block:x})".format(junk_block=fail_block["addr"], fix_block=block["addr"]))
            self.overwrite_instruction(self.get_block_end(block))
            modified = True
        if modified:
            self.reanalize_function()

    def clean_graph(self):
        """the initial function of the class. Responsible to enable cache and start the cleaning
        """

        # Enable cache writing mode. changes will only take place in the session and
        # will not override the binary
        self.pipe.cmd("e io.cache=true")
        self.clean_junk_blocks()


if cutter_available:
    # This part will be executed only if Cutter is available. This will
    # create the cutter plugin and UI objects for the plugin
    class GraphDeobfuscatorCutter(cutter.CutterPlugin):
        name = "APT32 Graph Deobfuscator"
        description = "Graph Deobfuscator for APT32 Samples"
        version = "1.0"
        author = "Itay Cohen (@Megabeets_)"

        def setupPlugin(self):
            pass

        def setupInterface(self, main):
            # Create a new action (menu item)
            action = QAction("APT32 Graph Deobfuscator", main)
            action.setCheckable(False)
            # Connect the action to a function - cleaner.
            # A click on this action will trigger the function
            action.triggered.connect(self.cleaner)

            # Add the action to the "Windows -> Plugins" menu
            pluginsMenu = main.getMenuByType(main.MenuType.Plugins)
            pluginsMenu.addAction(action)

        def cleaner(self):
            graph_deobfuscator = GraphDeobfuscator(pipe)
            graph_deobfuscator.clean_graph()
            cutter.refresh()


    def create_cutter_plugin():
        return GraphDeobfuscatorCutter()


if __name__ == "__main__":
    graph_deobfuscator = GraphDeobfuscator(pipe)
    graph_deobfuscator.clean_graph()
本文翻译自research.checkpoint.com 原文链接。如若转载请注明出处。
分享到:微信
+13赞
收藏
CryptoPentest
分享到:微信

发表评论

内容需知
合作单位
  • 安全客
  • 安全客
Copyright © 北京奇虎科技有限公司 三六零数字安全科技集团有限公司 安全客 All Rights Reserved 京ICP备08010314号-66