view paraspace/injection.py @ 109:835336632aba

Add collect_typeidxs_in_method()
author Thinker K.F. Li <thinker@codemud.net>
date Mon, 01 Aug 2011 14:37:04 +0800
parents 18be67af7f1e
children 6380730a80b4
line wrap: on
line source


## \brief List the relocatable children of an object of a DEX tree.
#
# For an array, the candidates are its items and every item is named
# 'items.<index>'.  For any other object, the candidates are the
# children named by obj.children() and read with
# _dex_tree_get_child().
#
# \param obj is the object to inspect.
# \return a list of (name, child) pairs, holding only the children
#         that are instances of relocatable.
#
def _relocatable_children(obj):
    from paraspace.dex_deptracker import _dex_tree_get_child
    from paraspace.dexfile import relocatable, array

    if isinstance(obj, array):
        candidates = [('items.' + str(pos), item)
                      for pos, item in enumerate(obj.items)]
    else:
        candidates = [(name, _dex_tree_get_child(obj, name))
                      for name in obj.children()]

    return [(name, child) for name, child in candidates
            if isinstance(child, relocatable)]


## \brief Travel relocatable descendants.
#
# \param cloner is the function to return a clone.
# \param adjuster is called to adjust the clone.
# \param visit_log is a dictionary to keep clones.
#
def _travel_desc_relocatable(obj, worker, visit_log):
    if id(obj) in visit_log:
        return visit_log[id(obj)]

    result = worker(obj)
    visit_log[id(obj)] = result
    
    rel_children = _relocatable_children(obj)
    for attr, value in rel_children:
        _travel_desc_relocatable(value, worker, visit_log)
        pass
    pass


## \brief Return the name string of a linked class definition item.
#
# The name is found by following classIdx, descriptorIdx and
# stringDataOff of the class definition; every one of them must
# already refer to an object.
#
# \param classdef is the class definition item.
# \return the data attribute of the object found at the end.
#
def classdef_name(classdef):
    class_type = classdef.classIdx
    descriptor = class_type.descriptorIdx
    name_holder = descriptor.stringDataOff
    return name_holder.data


## \brief Return a map from types of items to names of arrays of DEXFile.
#
# Every attribute of the DEXFile class that is an array, after
# _marker wrappers are stripped, is recorded: the key is the child
# type of the array (also stripped of _marker wrappers) and the value
# is the name of the attribute.
#
# The map is built by the first call.  That call also rebinds the
# global name of this function to a function that returns the map
# that was built, so later calls through the global name do not
# build it again.
#
# \return a dictionary that maps types to attribute names.
#
def dex_type_2_array_attr_map():
    global dex_type_2_array_attr_map
    from paraspace.dexfile import DEXFile, array
    from paraspace.dex_deptracker import _marker

    def strip_markers(decl):
        # Follow back_type until what is left is not a _marker.
        while isinstance(decl, _marker):
            decl = decl.back_type
        return decl

    type_2_attr = {}
    for attr in dir(DEXFile):
        decl = strip_markers(getattr(DEXFile, attr))
        if isinstance(decl, array):
            type_2_attr[strip_markers(decl.child_type)] = attr

    def built_map():
        return type_2_attr

    # From now on, the global name returns the map built above.
    dex_type_2_array_attr_map = built_map

    return type_2_attr


# Keep a reference to the original function.  The first call of
# dex_type_2_array_attr_map() rebinds that global name to a function
# that returns the map it built, so the original is reachable only
# through this name afterwards.
# NOTE(review): presumably kept so the original can be restored and
# the map built again -- confirm against the users of this name.
_saved_dex_type_2_array_attr_map = dex_type_2_array_attr_map


## \brief Append an object to the appropriate array of a DEXFile object.
#
# The array is chosen by the class of the object, as told by
# dex_type_2_array_attr_map().  When the array has a count_name, the
# child of dex with that name is increased by one.
#
# The object is skipped silently when no array is found for its
# class.
#
# \param dex is the DEXFile object that owns the arrays.
# \param obj is the object to append.
#
def dex_append_obj_list(dex, obj):
    from paraspace.dex_deptracker import _dex_tree_get_child
    from paraspace.dex_deptracker import _dex_tree_set_child

    type_2_attr = dex_type_2_array_attr_map()
    obj_type = obj.__class__
    if obj_type not in type_2_attr:
        # No array holds objects of this class.
        return

    target = getattr(dex, type_2_attr[obj_type])
    target.items.append(obj)

    counter = target.count_name
    if not counter:
        return
    # Keep the stored count in step with the array.
    previous = _dex_tree_get_child(dex, counter)
    _dex_tree_set_child(dex, counter, previous + 1)


## \brief Clone a class definition item
#
# The class definition and every relocatable object reachable from it
# are copied with copy.copy() (shallow copies).  The copies are then
# relinked to each other and appended to the arrays of dex with
# dex_append_obj_list().
#
# String data items, string IDs and type IDs are kept unique: a copy
# whose key is already used by an item of dex is dropped, and the item
# of dex is linked instead.
#
# \param dex is the DEXFile that clazz is cloning for.
# \param clazz is the class definition item that is cloning.
# \return the clone of clazz.
# \exception RuntimeError if the type ID of clazz is already the type
#            ID of a class definition of dex.  It is raised before
#            dex is changed.
#
def _clone_classdef(dex, clazz):
    from copy import copy
    from paraspace.dexfile import _DEX_StringDataItem, _DEX_StringId
    from paraspace.dexfile import _DEX_TypeId
    from paraspace.dex_deptracker import _dex_tree_set_child

    # Maps id() of a source object to the object that replaces it in
    # dex: its copy or, after merging, an item that dex already had.
    visit_log = {}

    # Make the relocatable children of a copy refer to replacements
    # instead of source objects.
    def relink_dependencies(clone):
        for attr, value in _relocatable_children(clone):
            _dex_tree_set_child(clone, attr, visit_log[id(value)])

    # Merge the copies of one type into an array of dex.  A copy whose
    # key is already used in the array is replaced by the item of the
    # array; any other copy is appended to dex.
    def merge_unique(item_type, dex_array, key_of, relink):
        clones = [(obj_id, obj)
                  for obj_id, obj in visit_log.items()
                  if isinstance(obj, item_type)]
        if relink:
            # Keys of these copies are objects merged by an earlier
            # step, so they must be relinked before they are compared.
            for obj_id, clone in clones:
                relink_dependencies(clone)

        # Only items that were in dex before this step are candidates.
        existing = dict([(key_of(item), item) for item in dex_array.items])
        for obj_id, clone in clones:
            key = key_of(clone)
            if key in existing:
                visit_log[obj_id] = existing[key]
            else:
                dex_append_obj_list(dex, clone)

    # Tell whether dex has a class definition with the type ID of clazz.
    def has_classdef(clazz):
        class_typeIds = set([classdef.classIdx
                             for classdef in dex.classDefs.items])
        return clazz.classIdx in class_typeIds

    # Refuse before anything is copied or appended, so a failure does
    # not leave a partial clone in dex.
    if has_classdef(clazz):
        raise RuntimeError(
            'clone a class \'%s\'that is already in the DEXFile' %
            classdef_name(clazz))

    _travel_desc_relocatable(clazz, copy, visit_log)

    # The order matters: string IDs are keyed by string data items and
    # type IDs are keyed by string IDs.
    merge_unique(_DEX_StringDataItem, dex.stringDataItems,
                 lambda strdata: strdata.data.data, False)
    merge_unique(_DEX_StringId, dex.stringIds,
                 lambda strid: strid.stringDataOff, True)
    merge_unique(_DEX_TypeId, dex.typeIds,
                 lambda typeid: typeid.descriptorIdx, True)

    merged_types = (_DEX_StringDataItem, _DEX_StringId, _DEX_TypeId)
    for obj in list(visit_log.values()):
        if isinstance(obj, merged_types):
            # Relinked and appended (or replaced) by the merges above.
            continue
        relink_dependencies(obj)
        dex_append_obj_list(dex, obj)

    return visit_log[id(clazz)]


## \brief Clone a class definition and insert the clone into a DEXFile.
#
# The class definition is taken from a linked DEXFile and its clone
# is inserted into another one.  All the work is done by
# _clone_classdef().
#
# \param dex is a DEXFile_linked to insert the clone.
# \param class_def is a class definition going to be cloned.
# \return the clone that was inserted.
#
def dexfile_insert_class(dex, class_def):
    return _clone_classdef(dex, class_def)


## \brief Redirect types and methods for the code of given method.
#
# The instructions of the method are decoded, rewritten and encoded
# again; the result replaces the instructions of the method in place.
# Only two opcodes are rewritten:
#
#  - OP_NEW_INSTANCE: the type index (args[1]) is replaced when it is
#    a key of types_redir.
#  - OP_INVOKE_DIRECT: the method index (args[2]) is replaced when it
#    is a key of methods_redir.
#
# Every other instruction is kept as it is.  Nothing is done for a
# method without code.
#
# \param dex is not used by this function; it is kept for the callers.
# \param method is the method whose code is rewritten.
# \param types_redir is a map from type indices to type indices.
# \param methods_redir is a map from method indices to method indices.
#
def method_redirect_types(dex, method, types_redir, methods_redir):
    from paraspace.dalvik_opcodes import decode_insn_blk, all_opcodes
    from paraspace.dalvik_opcodes import encode_opcode_vectors

    if not method.codeOffRef.is_true:
        return

    code = method.codeOffRef.value
    op_vectors = decode_insn_blk(code.insns.data)

    def redirect(opcode, args):
        if opcode == all_opcodes.OP_NEW_INSTANCE:
            typeidx = args[1]
            if typeidx in types_redir:
                return opcode, (args[0], types_redir[typeidx])
        elif opcode == all_opcodes.OP_INVOKE_DIRECT:
            methodidx = args[2]
            if methodidx in methods_redir:
                return opcode, (args[0], args[1], methods_redir[methodidx],
                                args[3], args[4], args[5], args[6])
        # Not an instruction to redirect, or no redirection for it.
        return opcode, args

    new_op_vectors = [redirect(opcode, args) for opcode, args in op_vectors]
    code.insns.data = encode_opcode_vectors(new_op_vectors)


## \brief Map methods of a source type to methods of a destination type.
#
# For every method ID of dex_src whose classIdx equals typeid_src, a
# method ID with the same name and the same protoIdx is looked up in
# dex_dst for typeid_dst.  Methods without a counterpart (the lookup
# raises ValueError) are left out of the map.
#
# \param dex_src is the DEXFile that owns the source method IDs.
# \param typeid_src is the type ID of the source methods.
# \param dex_dst is the DEXFile to search for the counterparts.
# \param typeid_dst is the type ID of the counterparts.
# \return a dictionary that maps indices of method IDs of dex_src to
#         indices of method IDs of dex_dst.
#
def make_redir_classes_methods_map(dex_src, typeid_src,
                                   dex_dst, typeid_dst):
    from paraspace.dexfile import DEXFile_linked

    methods_map = {}
    for methodidx_src, methodid_src in enumerate(dex_src.methodIds.items):
        if not (methodid_src.classIdx == typeid_src):
            continue

        name = DEXFile_linked.get_methodid_name(methodid_src)
        proto = methodid_src.protoIdx
        try:
            methodid_dst = \
                dex_dst.find_methodid_name_proto(name, proto, typeid_dst)
        except ValueError:
            # No counterpart in the destination type.
            continue

        methodidx_dst = dex_dst.get_idx_methodid(methodid_dst)
        # An index of -1 is never recorded.
        if methodidx_dst != -1:
            methods_map[methodidx_src] = methodidx_dst

    return methods_map


## \brief Redirect types and methods mentioned in the code of a class.
#
# Every direct method and then every virtual method of the class is
# handed to method_redirect_types(), which rewrites the types and
# methods mentioned in its code according to types_redir and
# methods_redir.  Nothing is done for a class without class data.
#
# \param dex is a DEXFile_linked.
# \param classdef is a class definition.
# \param types_redir is a map of types.
# \param methods_redir is a map of methods.
#
def class_redirect_types(dex, classdef, types_redir, methods_redir):
    classdata_ref = classdef.classDataOffRef
    if not classdata_ref.is_true:
        return

    classdata = classdata_ref.value
    for methods in (classdata.directMethods, classdata.virtualMethods):
        for method in methods.items:
            method_redirect_types(dex, method, types_redir, methods_redir)


## \brief Make a map to map methods from source types to destinate types.
#
# This function create a map to map methods from source types to
# methods from destinate types in \ref types_redir.
#
# \param dex is a DEXFile_linked that owns source and destinate types.
# \param types_redir is a map of types for redirecting types.
# \return a map of method indices.
#
def _make_methods_redir_for_types_redir(dex, types_redir):
    methods_map = {}
    for typeidx_src, typeidx_dst in types_redir.items():
        typeid_src = dex.find_typeid_idx(typeidx_src)
        typeid_dst = dex.find_typeid_idx(typeidx_dst)
        class_methods_map =  make_redir_classes_methods_map(dex,
                                                            typeid_src,
                                                            dex,
                                                            typeid_dst)
        methods_map.update(class_methods_map)
        pass
    return methods_map


## \brief Redirect types of all code in given DEXFile_linked.
#
# The map of methods is derived from types_redir with
# _make_methods_redir_for_types_redir().  Then the code of every
# class definition of dex is redirected with class_redirect_types(),
# except for the classes whose type index is in excludes.
#
# \param dex is a DEXFile_linked.
# \param types_redir is a map of type indices.
# \param excludes is a collection of type indices of classes to skip;
#        it is only read.
#
def dexfile_redirect_types(dex, types_redir, excludes=set([])):
    methods_redir = _make_methods_redir_for_types_redir(dex, types_redir)

    for classdef in dex.classDefs.items:
        class_typeidx = dex.get_idx_typeid(classdef.classIdx)
        if class_typeidx not in excludes:
            class_redirect_types(dex, classdef, types_redir, methods_redir)


## \brief Collect all type indices mentioned in the code of given method.
#
# Two kinds of instructions contribute a type index:
#
#  - OP_NEW_INSTANCE contributes its args[1].
#  - The invoke instructions contribute the index of the type ID that
#    is the classIdx of the invoked method ID.  The index of the
#    method ID is args[2], or args[1] for the range variants.
#
# \param dex is used to find method IDs and indices of type IDs.
# \param method is a \ref _DEX_Method.
# \return a list of type indices in the order of the instructions;
#         an index is repeated when it is mentioned more than once.
#
def collect_typeidxs_in_method(dex, method):
    from paraspace.dexfile import _DEX_Method, DEXFile_linked
    from paraspace.dalvik_opcodes import decode_insn_blk, all_opcodes

    assert isinstance(method, _DEX_Method)

    # Invokes that carry the index of the method ID in args[2].
    plain_invokes = (all_opcodes.OP_INVOKE_DIRECT,
                     all_opcodes.OP_INVOKE_VIRTUAL,
                     all_opcodes.OP_INVOKE_SUPER,
                     all_opcodes.OP_INVOKE_STATIC,
                     all_opcodes.OP_INVOKE_INTERFACE)
    # Invokes that carry the index of the method ID in args[1].
    range_invokes = (all_opcodes.OP_INVOKE_VIRTUAL_RANGE,
                     all_opcodes.OP_INVOKE_DIRECT_RANGE,
                     all_opcodes.OP_INVOKE_SUPER_RANGE,
                     all_opcodes.OP_INVOKE_STATIC_RANGE,
                     all_opcodes.OP_INVOKE_INTERFACE_RANGE)

    def typeidx_of_method(methodidx):
        methodid = dex.find_methodid_idx(methodidx)
        return dex.get_idx_typeid(methodid.classIdx)

    def typeidxs_of_insn(opcode, args):
        if opcode == all_opcodes.OP_NEW_INSTANCE:
            return (args[1],)
        if opcode in plain_invokes:
            return (typeidx_of_method(args[2]),)
        if opcode in range_invokes:
            return (typeidx_of_method(args[1]),)
        return ()

    code_blk = DEXFile_linked.get_code_block_method(method)
    typeidxs = []
    for opcode, args in decode_insn_blk(code_blk):
        typeidxs.extend(typeidxs_of_insn(opcode, args))
    return typeidxs