view paraspace/injection.py @ 113:ee13c86d84f2

Let user specify redirection map for methods
author Thinker K.F. Li <thinker@codemud.net>
date Tue, 02 Aug 2011 18:10:48 +0800
parents 650dcb9c01ee
children 867184e01852
line wrap: on
line source


## \brief List the relocatable children of a DEX tree node.
#
# For an array the candidates are its items, named 'items.<index>'; for
# any other node they are the attributes reported by obj.children().
#
# \param obj is the node whose children are inspected.
# \return a list of (attribute path, child) pairs, keeping only the
#         children that are instances of relocatable.
#
def _relocatable_children(obj):
    from paraspace.dex_deptracker import _dex_tree_get_child
    from paraspace.dexfile import relocatable, array

    if isinstance(obj, array):
        candidates = [('items.' + str(pos), item)
                      for pos, item in enumerate(obj.items)]
    else:
        candidates = [(name, _dex_tree_get_child(obj, name))
                      for name in obj.children()]

    return [(name, child) for name, child in candidates
            if isinstance(child, relocatable)]


## \brief Travel relocatable descendants.
#
# \param cloner is the function to return a clone.
# \param adjuster is called to adjust the clone.
# \param visit_log is a dictionary to keep clones.
#
def _travel_desc_relocatable(obj, worker, visit_log):
    if id(obj) in visit_log:
        return visit_log[id(obj)]

    result = worker(obj)
    visit_log[id(obj)] = result
    
    rel_children = _relocatable_children(obj)
    for attr, value in rel_children:
        _travel_desc_relocatable(value, worker, visit_log)
        pass
    pass


## \brief Return name string of a linked class definition item.
#
# \param classdef is a linked class definition; the name is reached by
#        following classIdx, descriptorIdx and stringDataOff in turn.
# \return the data attribute of the object reached that way.
#
def classdef_name(classdef):
    typeid = classdef.classIdx
    strid = typeid.descriptorIdx
    strdata = strid.stringDataOff
    return strdata.data


## \brief Return a map from an item type to the name of the DEXFile
#         attribute that is an array of that type.
#
# The map is computed on the first call only: the module-level name of
# this function is then rebound to a function that returns the computed
# map directly.
#
# \return a dictionary {child type of the array: attribute name}.
#
def dex_type_2_array_attr_map():
    global dex_type_2_array_attr_map
    from paraspace.dexfile import DEXFile, array
    from paraspace.dex_deptracker import _marker

    def unwrap(declared):
        # Follow back_type until the value is no longer a _marker.
        while isinstance(declared, _marker):
            declared = declared.back_type
        return declared

    type_2_attr = {}
    for name in dir(DEXFile):
        declared = unwrap(getattr(DEXFile, name))
        if isinstance(declared, array):
            # A later attribute with the same child type replaces an
            # earlier one.
            type_2_attr[unwrap(declared.child_type)] = name

    # Later calls through the module-level name skip the computation.
    dex_type_2_array_attr_map = lambda: type_2_attr

    return type_2_attr


# Keep a reference to the original function object: its first call
# rebinds the module-level name dex_type_2_array_attr_map to a lambda
# that returns the computed map.
# NOTE(review): presumably kept so that the uncached function can be
# restored (e.g. by tests) -- confirm against the users of this name.
_saved_dex_type_2_array_attr_map = dex_type_2_array_attr_map


## \brief Append an object to the appropriate list of a DEXFile object.
#
# The list is chosen by the class of the object, using the map returned
# by dex_type_2_array_attr_map().  The object is skipped silently when
# its class is not in that map.  When the chosen list has a non-empty
# count_name, the child of dex with that name is increased by one.
#
# \param dex is the DEXFile object receiving the object.
# \param obj is the object to append.
#
def dex_append_obj_list(dex, obj):
    from paraspace.dex_deptracker import _dex_tree_get_child
    from paraspace.dex_deptracker import _dex_tree_set_child

    type_2_attr = dex_type_2_array_attr_map()
    obj_type = obj.__class__
    if obj_type not in type_2_attr:
        return

    target = getattr(dex, type_2_attr[obj_type])
    target.items.append(obj)

    counter = target.count_name
    if not counter:
        return
    current = _dex_tree_get_child(dex, counter)
    _dex_tree_set_child(dex, counter, current + 1)


## \brief Clone a composite object.
#
# The object and all of its relocatable descendants are copied with
# copy.copy().  Copies of string data items, string IDs and type IDs
# are merged with matching items already in dex; every other copy is
# re-linked to the logged entries of its children and handed to
# dex_append_obj_list().
#
# \param dex is the DEXFile that the composite object is cloning for.
# \param comobj is composite object that is cloning.
# \return the entry logged for comobj: its copy, or the matching item
#         already in dex when comobj itself was merged.
#
def _clone_composite(dex, comobj):
    from copy import copy
    from paraspace.dexfile import _DEX_StringDataItem, _DEX_StringId
    from paraspace.dexfile import _DEX_TypeId
    from paraspace.dex_deptracker import _dex_tree_set_child
    from paraspace.dex_deptracker import _dex_tree_get_child

    # Maps id() of every visited original to its copy.  The merge steps
    # below may replace an entry by an object that is already in dex.
    visit_log = {}

    # Worker for _travel_desc_relocatable(): a shallow copy of obj.
    def cloner(obj):
        clone = copy(obj)
        return clone

    # Replace every relocatable child of clone by the entry logged for
    # that child.  This looks children up by id(), so they must be
    # objects that were visited by the travel below.
    # NOTE(review): copy() is shallow; if a copy shares a mutable
    # container with its original (e.g. the items of an array), setting
    # a child here may also change the original -- confirm how the
    # relocatable types are copied.
    def relink_dependencies(clone):
        rel_children = _relocatable_children(clone)
        for attr, value in rel_children:
            clone_value = visit_log[id(value)]
            _dex_tree_set_child(clone, attr, clone_value)
            pass
        pass

    # Merge copied string data items by content (data.data): reuse the
    # item of dex with the same content, else append the copy to dex.
    def merge_unique_strdata():
        strdatas = [(obj_id, obj)
                    for obj_id, obj in visit_log.items()
                    if isinstance(obj, _DEX_StringDataItem)]
        dex_str_2_strdata = dict([(strdata.data.data, strdata)
                                  for strdata in dex.stringDataItems.items])
        for obj_id, strdata in strdatas:
            if strdata.data.data in dex_str_2_strdata:
                visit_log[obj_id] = dex_str_2_strdata[strdata.data.data]
            else:
                dex_append_obj_list(dex, strdata)
                pass
            pass
        pass

    # Merge copied string IDs by the string data item they refer to.
    # Must run after merge_unique_strdata(): relinking makes each copy
    # refer to the merged string data, which is the lookup key below.
    def merge_unique_strid():
        strids = [(obj_id, obj)
                  for obj_id, obj in visit_log.items()
                  if isinstance(obj, _DEX_StringId)]

        for obj_id, strid in strids:
            relink_dependencies(strid)
            pass

        # Keyed by the string data objects themselves.
        strdata_2_strid = dict([(strid.stringDataOff, strid)
                                for strid in dex.stringIds.items])
        for obj_id, strid in strids:
            if strid.stringDataOff in strdata_2_strid:
                visit_log[obj_id] = strdata_2_strid[strid.stringDataOff]
            else:
                dex_append_obj_list(dex, strid)
                pass
            pass
        pass

    # Merge copied type IDs by the string ID they refer to
    # (descriptorIdx).  Must run after merge_unique_strid() for the same
    # reason as above.
    def merge_unique_typeid():
        typeids = [(obj_id, obj)
                   for obj_id, obj in visit_log.items()
                   if isinstance(obj, _DEX_TypeId)]

        for obj_id, typeid in typeids:
            relink_dependencies(typeid)
            pass

        # Keyed by the string ID objects themselves.
        strid_2_typeid = dict([(typeid.descriptorIdx, typeid)
                               for typeid in dex.typeIds.items])
        for obj_id, typeid in typeids:
            if typeid.descriptorIdx in strid_2_typeid:
                visit_log[obj_id] = strid_2_typeid[typeid.descriptorIdx]
            else:
                dex_append_obj_list(dex, typeid)
                pass
            pass
        pass

    # Phase 1: copy comobj and everything relocatable reachable from it.
    _travel_desc_relocatable(comobj, cloner, visit_log)

    # Phase 2: merge bottom-up, each step relying on the previous one.
    merge_unique_strdata()
    merge_unique_strid()
    merge_unique_typeid()

    # Phase 3: the three merged kinds were already relinked and appended
    # (or replaced by items of dex) above; handle all remaining copies.
    for obj in visit_log.values():
        if isinstance(obj, (_DEX_StringDataItem,
                            _DEX_StringId,
                            _DEX_TypeId)):
            continue
        relink_dependencies(obj)
        dex_append_obj_list(dex, obj)
        pass

    clone = visit_log[id(comobj)]
    return clone


## \brief Clone a class definition item
#
# The class must not be defined in dex yet; the check is made by name,
# through dex.find_class_name().
#
# \param dex is the DEXFile that clazz is cloning for.
# \param clazz is the class definition item that is cloning.
# \return the cloning _DEX_ClassDef.
#
# Raises AssertionError if clazz is not a _DEX_ClassDef, and
# RuntimeError if dex already has a class of the same name.
#
def _clone_classdef(dex, clazz):
    from paraspace.dexfile import DEXFile_linked, _DEX_ClassDef

    def has_classdef(classdef):
        classname = DEXFile_linked.get_classdef_name(classdef)
        try:
            dex.find_class_name(classname)
        except ValueError:
            # find_class_name() reports a missing class with ValueError.
            return False
        return True

    assert isinstance(clazz, _DEX_ClassDef)

    if has_classdef(clazz):
        # The call form of raise is valid on both Python 2 and Python 3.
        raise RuntimeError(
            'clone a class \'%s\' that is already in the DEXFile' %
            classdef_name(clazz))

    return _clone_composite(dex, clazz)


## \brief Clone a class definition and insert it into a DEXFile.
#
# The class definition comes from a linked DEXFile and its clone is
# inserted into another one.  The work is delegated to
# _clone_classdef().
#
# \param dex is a DEXFile_linked to insert the clone.
# \param classdef is a class definition going to be cloned.
# \return the value returned by _clone_classdef() for the class.
#
def dexfile_insert_class(dex, classdef):
    return _clone_classdef(dex, classdef)


## \brief Collect info of classes mentioned by the code of given class.
#
# \param dex is the DEXFile queried for type IDs and class definitions.
# \param classdef is the class definition whose code is inspected.
# \return a pair (classdefs, typeids): the class definitions that
#         dex.find_class_typeid() finds for the mentioned types, and
#         the type IDs for which it raises ValueError.
#
def _find_class_relative(dex, classdef):
    mentioned_idxs = collect_typeidxs_mentioned_by_class(dex, classdef)
    mentioned = [dex.find_typeid_idx(typeidx)
                 for typeidx in mentioned_idxs]

    defined = []
    undefined = []
    for typeid in mentioned:
        try:
            found = dex.find_class_typeid(typeid)
        except ValueError:
            undefined.append(typeid)
            continue
        defined.append(found)

    return defined, undefined


## \brief Clone and insert a list of class definitions into a DEXFile.
#
# \param dex_dst is the DEXFile where the clones are inserted.
# \param dex_src is not used by this function.
# \param classdefs is an iterable of class definitions to clone.
#
def dexfile_insert_classdefs(dex_dst, dex_src, classdefs):
    for one_classdef in classdefs:
        dexfile_insert_class(dex_dst, one_classdef)


## \brief Clone and insert a _DEX_TypeId to another DEXFile_linked.
#
# Besides the type ID itself, every method ID that
# dex_src.find_methodids_typeid() returns for it is cloned into dex_dst.
#
# \param dex_dst is a DEXFile_linked where the cloning one is inserted.
# \param dex_src is a DEXFile_linked where the cloned one is from.
# \param typeid is a _DEX_TypeId that is cloned.
# \return the cloning _DEX_TypeId.
#
def dexfile_insert_typeid(dex_dst, dex_src, typeid):
    from paraspace.dexfile import _DEX_TypeId, DEXFile_linked

    assert isinstance(typeid, _DEX_TypeId)

    typeid_clone = _clone_composite(dex_dst, typeid)

    # NOTE(review): dex_src is passed as an argument in addition to
    # being the receiver of the call -- confirm this matches the
    # signature of find_methodids_typeid().
    for methodid in dex_src.find_methodids_typeid(dex_src, typeid):
        _clone_composite(dex_dst, methodid)

    return typeid_clone


## \brief Clone and insert a list of _DEX_TypeId objects to a DEXFile_linked.
#
# \param dex_dst is a DEXFile_linked where the clones are inserted.
# \param dex_src is a DEXFile_linked where the type IDs are from.
# \param typeids is an iterable of _DEX_TypeId objects to clone.
#
def dexfile_insert_typeids(dex_dst, dex_src, typeids):
    for one_typeid in typeids:
        dexfile_insert_typeid(dex_dst, dex_src, one_typeid)


## \brief Collects relative type IDs and classes definition for given class.
#
# Starting from classdef, follows the classes mentioned by the code of
# each class breadth-first, using _find_class_relative(), until no new
# class definition shows up.
#
# \param dex is the DEXFile_linked that owns classdef.
# \param classdef is the class definition to start from.
# \return a pair (rel_classdefs, rel_typeids): the set of reachable
#         class definitions, classdef included, and the set of type IDs
#         that _find_class_relative() reported as not defined in dex.
#
def collect_classdef_relative(dex, classdef):
    from collections import deque

    rel_classdefs = set([classdef])
    rel_typeids = set()

    classdef_queue = deque([classdef])
    while classdef_queue:
        cur_classdef = classdef_queue.popleft()

        # Inspect the class just taken from the queue.  Inspecting the
        # root class every time would never reach indirect relatives.
        classdefs, typeids = _find_class_relative(dex, cur_classdef)
        rel_typeids.update(typeids)

        # Queue only the classes that have not been seen before, so
        # that every class is inspected once and cycles terminate.
        new_classdefs = set(classdefs) - rel_classdefs
        rel_classdefs.update(new_classdefs)
        classdef_queue.extend(new_classdefs)

    return rel_classdefs, rel_typeids


## \brief Clone and insert given and relative classes into another DEXFile.
#
# The relatives come from collect_classdef_relative().  Only the class
# definitions and type IDs whose names are not found in dex_dst are
# inserted.
#
# \param dex_dst is a DEXFile_linked where the class will be inserted.
# \param dex_src is a DEXFile_linked where the cloned class is from.
# \param classdef is a _DEX_ClassDef that will be cloned.
# \return what dex_dst.find_class_name() finds for the name of classdef
#         after the insertion.
#
def dexfile_insert_class_relative(dex_dst, dex_src, classdef):
    from paraspace.dexfile import DEXFile_linked

    def lookup_fails(finder, name):
        # The finders of dex_dst raise ValueError for an unknown name.
        try:
            finder(name)
        except ValueError:
            return True
        return False

    relative_classdefs, relative_typeids = \
        collect_classdef_relative(dex_src, classdef)

    inserting_classdefs = [
        candidate for candidate in relative_classdefs
        if lookup_fails(dex_dst.find_class_name,
                        DEXFile_linked.get_classdef_name(candidate))]
    inserting_typeids = [
        candidate for candidate in relative_typeids
        if lookup_fails(dex_dst.find_typeid_name,
                        DEXFile_linked.get_typeid_name(candidate))]

    dexfile_insert_classdefs(dex_dst, dex_src, inserting_classdefs)
    dexfile_insert_typeids(dex_dst, dex_src, inserting_typeids)

    return dex_dst.find_class_name(
        DEXFile_linked.get_classdef_name(classdef))


## \brief Redirect types and methods for the code of given method.
#
# Only two opcodes are rewritten: OP_NEW_INSTANCE (operand 1, looked up
# in types_redir) and OP_INVOKE_DIRECT (operand 2, looked up in
# methods_redir).  Everything else is re-encoded unchanged.  A method
# without code (codeOffRef.is_true is false) is left alone.
#
# \param dex is not used by this function.
# \param method is the method whose code is rewritten in place.
# \param types_redir is a map of type indices.
# \param methods_redir is a map of method indices.
#
def method_redirect_types(dex, method, types_redir, methods_redir):
    from paraspace.dalvik_opcodes import decode_insn_blk, all_opcodes
    from paraspace.dalvik_opcodes import encode_opcode_vectors
    from paraspace.dexfile import DEXFile_linked

    code_ref = method.codeOffRef
    if not code_ref.is_true:
        return

    code = code_ref.value

    rewritten = []
    for opcode, args in decode_insn_blk(code.insns.data):
        if opcode == all_opcodes.OP_NEW_INSTANCE:
            typeidx = args[1]
            if typeidx in types_redir:
                args = (args[0], types_redir[typeidx])
        elif opcode == all_opcodes.OP_INVOKE_DIRECT:
            methodidx = args[2]
            if methodidx in methods_redir:
                args = (args[0], args[1], methods_redir[methodidx],
                        args[3], args[4], args[5], args[6])
        rewritten.append((opcode, args))

    code.insns.data = encode_opcode_vectors(rewritten)


## \brief Make a map for methods from a source type ID to the ones of a
#         destination type ID.
#
# A method of typeid_src is matched by name and prototype with
# dex_dst.find_methodid_name_proto(); methods for which that lookup
# raises ValueError are left out of the map.
#
# \param dex_src is the DEXFile owning typeid_src.
# \param typeid_src is the type ID whose methods are mapped.
# \param dex_dst is the DEXFile owning typeid_dst.
# \param typeid_dst is the type ID providing the target methods.
# \return a dictionary {method index in dex_src: method index in dex_dst}.
#
def make_redir_classes_methods_map(dex_src, typeid_src,
                                   dex_dst, typeid_dst):
    from paraspace.dexfile import DEXFile_linked

    owned = [(idx, methodid)
             for idx, methodid in enumerate(dex_src.methodIds.items)
             if methodid.classIdx == typeid_src]

    methods_map = {}
    for methodidx_src, methodid_src in owned:
        name = DEXFile_linked.get_methodid_name(methodid_src)
        proto = methodid_src.protoIdx
        try:
            methodid_dst = \
                dex_dst.find_methodid_name_proto(name, proto, typeid_dst)
        except ValueError:
            continue

        methodidx_dst = dex_dst.get_idx_methodid(methodid_dst)
        # An index of -1 is treated as "no counterpart" as well.
        if methodidx_dst != -1:
            methods_map[methodidx_src] = methodidx_dst

    return methods_map


## \brief Redirect types and methods mentioned in the code of a class.
#
# Every direct method and then every virtual method of the class is
# passed to method_redirect_types() with the two maps.  A class without
# class data (classDataOffRef.is_true is false) is left alone.
#
# \param dex is a DEXFile_linked.
# \param classdef is a class definition.
# \param types_redir is a map of types.
# \param methods_redir is a map of methods.
#
def class_redirect_types(dex, classdef, types_redir, methods_redir):
    data_ref = classdef.classDataOffRef
    if not data_ref.is_true:
        return

    classdata = data_ref.value
    for group in ('directMethods', 'virtualMethods'):
        for method in getattr(classdata, group).items:
            method_redirect_types(dex, method, types_redir, methods_redir)


## \brief Make a map to map methods from source types to destination types.
#
# For every pair of types_redir, the methods of the source type are
# mapped to the methods of the destination type with
# make_redir_classes_methods_map(), and the results are merged.
#
# \param dex_src is the DEXFile in which source type indices are resolved.
# \param dex_dst is the DEXFile in which destination type indices are
#        resolved.
# \param types_redir is a map of types for redirecting types.
# \return a map of method indices.
#
def make_methods_redir_for_types_redir(dex_src, dex_dst, types_redir):
    methods_map = {}
    for src_typeidx, dst_typeidx in types_redir.items():
        src_typeid = dex_src.find_typeid_idx(src_typeidx)
        dst_typeid = dex_dst.find_typeid_idx(dst_typeidx)
        methods_map.update(
            make_redir_classes_methods_map(dex_src, src_typeid,
                                           dex_dst, dst_typeid))
    return methods_map


## \brief Redirect types of all code in given DEXFile_linked.
#
# \param dex is the DEXFile_linked whose class definitions are rewritten.
# \param types_redir is a map of types.
# \param methods_redir is a map of methods.
# \param excludes holds the type indices of the classes to skip.  The
#        default is only tested for membership, never modified.
#
def dexfile_redirect_types(dex, types_redir, methods_redir, excludes=set([])):
    for classdef in dex.classDefs.items:
        if dex.get_idx_typeid(classdef.classIdx) not in excludes:
            class_redirect_types(dex, classdef, types_redir, methods_redir)


## \brief Collect all type indices mentioned in the code of given method.
#
# A type is counted as mentioned when it is operand 1 of
# OP_NEW_INSTANCE, or when it is the class of the method called by one
# of the invoke opcodes (method index in operand 2, or in operand 1 for
# the range variants).
#
# \param dex is the DEXFile used to resolve method and type indices.
# \param method is a \ref _DEX_Method.
# \return a list of type indices mentioned in the code, one entry per
#         mentioning instruction (duplicates are kept).
#
def collect_typeidxs_in_method(dex, method):
    from paraspace.dexfile import _DEX_Method, DEXFile_linked
    from paraspace.dalvik_opcodes import decode_insn_blk, all_opcodes
    from itertools import chain

    assert isinstance(method, _DEX_Method)

    def owner_typeidx(methodidx):
        # Type index of the class that the called method belongs to.
        owner = dex.find_methodid_idx(methodidx).classIdx
        return dex.get_idx_typeid(owner)

    def typeidxs_of(op_vector):
        opcode, operands = op_vector

        if opcode == all_opcodes.OP_NEW_INSTANCE:
            return (operands[1],)

        if opcode in (all_opcodes.OP_INVOKE_DIRECT,
                      all_opcodes.OP_INVOKE_VIRTUAL,
                      all_opcodes.OP_INVOKE_SUPER,
                      all_opcodes.OP_INVOKE_STATIC,
                      all_opcodes.OP_INVOKE_INTERFACE):
            methodidx_pos = 2
        elif opcode in (all_opcodes.OP_INVOKE_VIRTUAL_RANGE,
                        all_opcodes.OP_INVOKE_DIRECT_RANGE,
                        all_opcodes.OP_INVOKE_SUPER_RANGE,
                        all_opcodes.OP_INVOKE_STATIC_RANGE,
                        all_opcodes.OP_INVOKE_INTERFACE_RANGE):
            methodidx_pos = 1
        else:
            return ()

        return (owner_typeidx(operands[methodidx_pos]),)

    insns = DEXFile_linked.get_code_block_method(method)
    per_insn = [typeidxs_of(op_vector)
                for op_vector in decode_insn_blk(insns)]

    return list(chain.from_iterable(per_insn))


## \brief Collect all type indices mentioned by the code of given class.
#
# \param dex is a DEXFile_linked (checked with an assertion).
# \param classdef is the class definition whose methods are inspected.
# \return a list of the distinct type indices that
#         collect_typeidxs_in_method() reports for the methods of the
#         class, in no particular order.
#
def collect_typeidxs_mentioned_by_class(dex, classdef):
    from paraspace.dexfile import DEXFile_linked

    assert isinstance(dex, DEXFile_linked)

    mentioned = set()
    for method in DEXFile_linked.get_methods_classdef(classdef):
        mentioned.update(collect_typeidxs_in_method(dex, method))

    return list(mentioned)