view paraspace/injection.py @ 104:61cef1662035

Redirect types
author Thinker K.F. Li <thinker@codemud.net>
date Thu, 28 Jul 2011 00:06:54 +0800
parents 355986e5cfbd
children f14c32108164
line wrap: on
line source


## \brief Return the relocatable children of an object.
#
# For an array, the children are its relocatable items, each paired
# with repr() of its index.  For any other object, the children are
# the attributes listed by dir() whose values are relocatable, each
# paired with the attribute name.
#
# \param obj is the object to inspect.
# \return a list of (name, child) pairs.
#
def _relocatable_children(obj):
    from paraspace.dexfile import relocatable, array

    found = []

    if isinstance(obj, array):
        # An array without items (None or empty) has no children.
        if not obj.items:
            return []
        for pos, item in enumerate(obj.items):
            if isinstance(item, relocatable):
                found.append((repr(pos), item))
        return found

    for name in dir(obj):
        member = getattr(obj, name)
        if isinstance(member, relocatable):
            found.append((name, member))
    return found


## \brief Travel relocatable descendants.
#
# \param cloner is the function to return a clone.
# \param adjuster is called to adjust the clone.
# \param visit_log is a dictionary to keep clones.
#
def _travel_desc_relocatable(obj, worker, visit_log):
    if id(obj) in visit_log:
        return visit_log[id(obj)]

    result = worker(obj)
    visit_log[id(obj)] = result
    
    rel_children = _relocatable_children(obj)
    for attr, value in rel_children:
        _travel_desc_relocatable(value, worker, visit_log)
        pass
    pass


## \brief Return the name of a linked class definition item.
#
# The name is the data attribute of the object reached through
# classIdx, descriptorIdx and stringDataOff of the class definition.
#
# \param classdef is the linked class definition item.
#
def classdef_name(classdef):
    typeid = classdef.classIdx
    descriptor = typeid.descriptorIdx
    name_item = descriptor.stringDataOff
    return name_item.data


## \brief Return a map from item types to array attribute names of DEXFile.
#
# Every attribute of DEXFile that is an array, once _marker wrappers
# are stripped, adds one entry: the key is the child type of the array
# (markers stripped as well) and the value is the attribute name.
#
# The map is built by the first call only.  That call rebinds the
# module-level name to a function that returns the same map.
#
def dex_type_2_array_attr_map():
    global dex_type_2_array_attr_map
    from paraspace.dexfile import DEXFile, array
    from paraspace.dex_deptracker import _marker

    def unwrap(kind):
        # Follow back_type until something that is not a marker shows up.
        while isinstance(kind, _marker):
            kind = kind.back_type
        return kind

    type_2_attr = {}
    for name in dir(DEXFile):
        member = unwrap(getattr(DEXFile, name))
        if isinstance(member, array):
            type_2_attr[unwrap(member.child_type)] = name

    # Later calls through the module-level name skip the scan above.
    dex_type_2_array_attr_map = lambda: type_2_attr

    return type_2_attr


## \brief Append an object to the appropriate list of a DEXFile object.
#
# The list is chosen by the exact class of the object.  The object is
# skipped if no list is found for its class.  If the list has a
# count_name, the value stored under that name in dex is increased by
# one.
#
# \param dex is the DEXFile object to append to.
# \param obj is the object to append.
#
def dex_append_obj_list(dex, obj):
    from paraspace.dex_deptracker import _dex_tree_get_child
    from paraspace.dex_deptracker import _dex_tree_set_child

    type_2_attr = dex_type_2_array_attr_map()
    obj_class = obj.__class__
    if obj_class not in type_2_attr:
        return

    target = getattr(dex, type_2_attr[obj_class])
    target.items.append(obj)

    count_name = target.count_name
    if not count_name:
        return

    old_count = _dex_tree_get_child(dex, count_name)
    _dex_tree_set_child(dex, count_name, old_count + 1)


## \brief Clone a class definition item
#
# Every relocatable object reachable from clazz is copied with
# copy.copy().  Copies of string data items, string ids and type ids
# are merged with dex: a copy is replaced by the object already in dex
# when one with the same key exists (string data: data.data, string
# id: stringDataOff, type id: descriptorIdx), otherwise it is appended
# to dex.  All remaining copies are relinked to the copies (or merged
# objects) of their children and appended to dex.
#
# \param dex is the DEXFile that clazz is cloning for.
# \param clazz is the class definition item that is cloning.
# \return the clone of clazz.
# \exception RuntimeError if the classIdx of clazz is found among the
#            class definitions of dex.
#
def _clone_classdef(dex, clazz):
    from copy import copy
    from paraspace.dexfile import _DEX_StringDataItem, _DEX_StringId
    from paraspace.dexfile import _DEX_TypeId

    # Maps id() of an original object to its clone or, after merging,
    # to the object of dex that replaces the clone.
    visit_log = {}

    def cloner(obj):
        return copy(obj)

    # Point every relocatable child of a clone at the logged
    # replacement of that child.
    #
    # NOTE(review): for an array _relocatable_children() names the
    # children by repr() of their index, so setattr() below stores the
    # replacement in an attribute such as '0' instead of in items --
    # confirm this is intended.
    def relink_dependencies(clone):
        for attr, value in _relocatable_children(clone):
            setattr(clone, attr, visit_log[id(value)])

    def merge_unique_strdata():
        strdatas = [(obj_id, obj)
                    for obj_id, obj in visit_log.items()
                    if isinstance(obj, _DEX_StringDataItem)]
        dex_str_2_strdata = dict([(strdata.data.data, strdata)
                                  for strdata in dex.stringDataItems.items])
        for obj_id, strdata in strdatas:
            if strdata.data.data in dex_str_2_strdata:
                visit_log[obj_id] = dex_str_2_strdata[strdata.data.data]
            else:
                dex_append_obj_list(dex, strdata)

    def merge_unique_strid():
        strids = [(obj_id, obj)
                  for obj_id, obj in visit_log.items()
                  if isinstance(obj, _DEX_StringId)]

        # Relink first: the lookup below is keyed by the merged string
        # data object.
        for obj_id, strid in strids:
            relink_dependencies(strid)

        strdata_2_strid = dict([(strid.stringDataOff, strid)
                                for strid in dex.stringIds.items])
        for obj_id, strid in strids:
            if strid.stringDataOff in strdata_2_strid:
                visit_log[obj_id] = strdata_2_strid[strid.stringDataOff]
            else:
                dex_append_obj_list(dex, strid)

    def merge_unique_typeid():
        typeids = [(obj_id, obj)
                   for obj_id, obj in visit_log.items()
                   if isinstance(obj, _DEX_TypeId)]

        # Relink first: the lookup below is keyed by the merged string id.
        for obj_id, typeid in typeids:
            relink_dependencies(typeid)

        strid_2_typeid = dict([(typeid.descriptorIdx, typeid)
                               for typeid in dex.typeIds.items])
        for obj_id, typeid in typeids:
            if typeid.descriptorIdx in strid_2_typeid:
                visit_log[obj_id] = strid_2_typeid[typeid.descriptorIdx]
            else:
                dex_append_obj_list(dex, typeid)

    def has_classdef(clazz):
        class_typeIds = set([classdef.classIdx
                             for classdef in dex.classDefs.items])
        return clazz.classIdx in class_typeIds

    _travel_desc_relocatable(clazz, cloner, visit_log)

    # The order matters: string ids are keyed by string data and type
    # ids are keyed by string ids.
    merge_unique_strdata()
    merge_unique_strid()
    merge_unique_typeid()

    for obj in visit_log.values():
        # These three kinds were relinked and appended while merging.
        if isinstance(obj, (_DEX_StringDataItem,
                            _DEX_StringId,
                            _DEX_TypeId)):
            continue
        relink_dependencies(obj)
        dex_append_obj_list(dex, obj)

    # NOTE(review): this check runs after dex has been modified above
    # and tests the classIdx of the original clazz, not the merged
    # one -- confirm that this order and operand are intended.
    if has_classdef(clazz):
        # Call form of raise: the same behaviour as the statement
        # form, and it is also accepted by Python 3.
        raise RuntimeError(
            'clone a class \'%s\' that is already in the DEXFile' %
            classdef_name(clazz))

    return visit_log[id(clazz)]


## \brief Clone a class definition and insert it into a DEXFile.
#
# The class definition is taken from a linked DEXFile; its clone is
# inserted into another one.
#
# \param dex is a DEXFile_linked to insert the clone.
# \param class_def is a class definition going to be cloned.
# \return the clone made by _clone_classdef().
#
def dexfile_insert_class(dex, class_def):
    return _clone_classdef(dex, class_def)


## \brief Redirect type references in the code of a method.
#
# The instruction block of the method is decoded, every instruction is
# passed through the rules below, and the re-encoded block is stored
# back into the code item.  A method without code is skipped.
#
#  - OP_NEW_INSTANCE: the type index (args[1]) is replaced by its entry
#    in redirect_map, if there is one.
#  - OP_INVOKE_DIRECT: if the index of the type owning the invoked
#    method has an entry in redirect_map, the method index (args[2]) is
#    replaced by the index of the method with the same name and
#    prototype in the class of the replacing type.  The instruction is
#    kept as it is when that lookup fails.
#
# \param dex provides the index and lookup functions used below.
# \param method is the method whose code is rewritten in place.
# \param redirect_map maps a type index to the type index replacing it.
#
def method_redirect_types(dex, method, redirect_map):
    from paraspace.dalvik_opcodes import decode_insn_blk, all_opcodes
    from paraspace.dalvik_opcodes import encode_opcode_vectors
    from paraspace.dexfile import DEXFile_linked

    if not method.codeOffRef.is_true:
        return

    code = method.codeOffRef.value
    op_vectors = decode_insn_blk(code.insns.data)

    def redirect(opcode, args):
        if opcode == all_opcodes.OP_NEW_INSTANCE:
            typeidx = args[1]
            if typeidx in redirect_map:
                return opcode, (args[0], redirect_map[typeidx])
            return opcode, args

        if opcode != all_opcodes.OP_INVOKE_DIRECT:
            return opcode, args

        methodid = dex.find_methodid_idx(args[2])
        method_typeid = methodid.classIdx
        # The index is looked up from the type id object.  (This used
        # to pass the not yet assigned method_typeidx and failed with
        # UnboundLocalError on every invoke-direct.)
        # NOTE(review): dexfile_redirect_types() spells this lookup
        # get_index_typeid() -- confirm which name DEXFile_linked has.
        method_typeidx = dex.get_idx_typeid(method_typeid)
        if method_typeidx not in redirect_map:
            return opcode, args

        new_method_typeid = dex.find_typeid_idx(redirect_map[method_typeidx])
        classdef = dex.find_class_typeid(new_method_typeid)
        method_name = DEXFile_linked.get_methodid_name(methodid)
        method_proto = methodid.protoIdx

        try:
            new_method = dex.find_method_name_proto(method_name,
                                                    method_proto,
                                                    classdef)
        except Exception:
            # Best effort: the replacing class has no matching method,
            # so keep the original call.  KeyboardInterrupt and
            # SystemExit are not swallowed.
            return opcode, args

        new_method_idx = dex.get_index_method(new_method)
        return opcode, (args[0], args[1], new_method_idx,
                        args[3], args[4], args[5], args[6])

    new_op_vectors = [redirect(opcode, args) for opcode, args in op_vectors]
    code.insns.data = encode_opcode_vectors(new_op_vectors)


## \brief Redirect type references in all methods of a class.
#
# method_redirect_types() is applied to every direct method and then
# to every virtual method of the class data.  A class definition
# without class data is skipped.
#
# \param dex is passed on to method_redirect_types().
# \param classdef is the class definition whose methods are processed.
# \param redirect_map is passed on to method_redirect_types().
#
def class_redirect_types(dex, classdef, redirect_map):
    data_ref = classdef.classDataOffRef
    if not data_ref.is_true:
        return

    classdata = data_ref.value
    # Direct methods first, then virtual methods.
    for group in ('directMethods', 'virtualMethods'):
        for method in getattr(classdata, group).items:
            method_redirect_types(dex, method, redirect_map)


## \brief Redirect type references in the classes of a DEXFile.
#
# class_redirect_types() is applied to every class definition of dex
# except those whose type index is in excludes.
#
# \param dex is the DEXFile whose class definitions are processed.
# \param redirect_map is passed on to class_redirect_types().
# \param excludes holds the type indices of the classes to skip.
#
def dexfile_redirect_types(dex, redirect_map, excludes=set([])):
    for classdef in dex.classDefs.items:
        type_index = dex.get_index_typeid(classdef.classIdx)
        if type_index not in excludes:
            class_redirect_types(dex, classdef, redirect_map)