view version.py @ 343:394b4e849c1b

added test for ds.subset
author Frederic Bastien <bastienf@iro.umontreal.ca>
date Tue, 17 Jun 2008 14:45:22 -0400
parents 7a734dba4cac
children
line wrap: on
line source

import subprocess as _subprocess
import imp as _imp
import sys
import os


_cache = dict()

def src_version(module_name):
    """Return compact identifier of module code.

    @return: compact identifier of module code.
    @rtype: string

    @note: This function tries to establish that the source files and the repo
    are syncronized.  It raises an Exception if there are un-tracked '.py'
    files, or if there are un-committed modifications.  This implementation uses
    "hg id" to establish this.  The code returned by "hg id" is not affected by
    hg pull, but pulling might remove the " tip" string which might have
    appeared.  This implementation ignores the  " tip" information, and only
    uses the code.

    @note: This implementation is assumes that the import directory is under
    version control by mercurial.

    """

    if module_name not in _cache:

        try :
            location = _imp.find_module(module_name)[1]
        except ImportError:
            _cache[module_name] = None
            return None
        #print 'location:', location
        isdir = False
        if os.path.isdir(location) :
            isdir = True
        elif os.path.isfile(location) :
            isdir = False
        else :
            # SEEMS THIS CASE EXIST, FOR WEIRD BUILTIN FUNCTIONS
            #print location,": it's 'not a dir, it's not a file, it's superman!"
            #raise Exception('Unknown location or file type')
            _cache[module_name] = None
            return None


        # we're dealing with a dir
        if isdir :

            # under hg?
            if not os.path.exists( os.path.join( location , '.hg') ) :
                _cache[module_name] = None
                return None

            status = _subprocess.Popen(('hg','st'),cwd=location,stdout=_subprocess.PIPE).communicate()[0]
            #print 'status =', status
            #TODO: check that the process return code is 0 (ticket #45)

            #status_codes = [line[0] for line in  if line and line[0] != '?']
            for line in status.split('\n'):
                if not line: continue
                if line[0] != '?':
                    raise Exception('Uncommitted modification to "%s" in %s (%s)'
                        %(line[2:], __name__,location))
                if line[0] == '?' and line[-3:] == '.py':
                    raise Exception('Untracked file "%s" in %s (%s)'
                        %(line[2:], __name__, location))

            hg_id = _subprocess.Popen(('hg','id'),cwd=location,stdout=_subprocess.PIPE).communicate()[0]

            # This asserts my understanding of hg id return values
            # There is mention in the doc that it might return two parent hash codes
            # but I've never seen it, and I dont' know what it means or how it is
            # formatted.
            tokens = hg_id.split(' ')
            assert len(tokens) <= 2
            assert len(tokens) >= 1
            assert tokens[0][-1] != '+' # the trailing + indicates uncommitted changes
            if len(tokens) == 2:
                assert tokens[1] == 'tip\n'

            _cache[module_name] = tokens[0]

        # we're dealing with a file
        if not isdir :

            folder = os.path.split( os.path.abspath(location) )[0]
            # under hg?
            if not os.path.exists( os.path.join( folder , '.hg') ) :
                _cache[module_name] = None
                return None

            status = _subprocess.Popen(('hg','st',location),cwd=folder,stdout=_subprocess.PIPE).communicate()[0]
            #print 'status =', status

            #status_codes = [line[0] for line in  if line and line[0] != '?']
            for line in status.split('\n'):
                if not line: continue
                if line[0] != '?':
                    raise Exception('Uncommitted modification to "%s" in %s (%s)'
                        %(line[2:], location,folder))
                if line[0] == '?' and line[-3:] == '.py':
                    raise Exception('Untracked file "%s" in %s (%s)'
                        %(line[2:], location, folder))

            hg_id = _subprocess.Popen(('hg','id'),cwd=folder,stdout=_subprocess.PIPE).communicate()[0]

            # This asserts my understanding of hg id return values
            # There is mention in the doc that it might return two parent hash codes
            # but I've never seen it, and I dont' know what it means or how it is
            # formatted.
            tokens = hg_id.split(' ')
            assert len(tokens) <= 2
            assert len(tokens) >= 1
            if tokens[0][-1] == '+' :
                tokens[0] = tokens[0][:-1] # the change was not on this file
            if len(tokens) == 2:
                assert tokens[1] == 'tip\n'

            _cache[module_name] = tokens[0]


    return _cache[module_name]

_unknown_version = 'unknown version'

def hg_version(dirname, filenames=None):
    """Return current changeset of directory I{dirname}.

    @type filename: list of str (or default: None)
    @param filename: if specified, we ignore modifications to other files.

    @rtype: tuple (last changeset, modified)

    """
    if type(filenames) not in (list, tuple, type(None)):
        raise TypeError(filenames) 

    #may raise exception, for example if hg is not visible via PATH
    status_proc = _subprocess.Popen(('hg','st'), cwd=dirname, 
            stdout=_subprocess.PIPE, stderr=_subprocess.PIPE)
    status = status_proc.communicate()[0] #read stdout into buffer
    if status_proc.returncode != 0:
        raise OSError('hg returned %i, maybe %s is not under hg control?',
                (status_proc.returncode, dirname))

    #may raise exception, for example if hg is not visible via PATH
    id_proc = _subprocess.Popen(('hg','id', '-i'), cwd=dirname,
            stdout=_subprocess.PIPE, stderr=_subprocess.PIPE)
    id_stdout = id_proc.communicate()[0]
    if id_proc.returncode != 0:
        raise OSError('hg returned %i, maybe %s is not under hg control?', 
                (id_proc.returncode, dirname))

    care_about = (lambda some_file : True) if filenames is None \
            else (lambda some_file : some_file in filenames)

    # parse status codes for what we care about
    care_about_mod = False
    for line in status.split('\n'):
        if not line:  #empty lines happen
            continue
        line_file = line[2:]
        if line[0] != '?' and care_about(line_file): 
            care_about_mod = True
            #raise Exception('Uncommitted modification', 
                    #os.path.join(dirname, line_file))
        if line[0] == '?' and line[-3:] == '.py':
            print >> sys.stderr, 'WARNING: untracked file', os.path.join(dirname, line_file)

    # id_stdout is 12 hex digits followed by '+\n' or '\n'
    # return the trailing '+' character only if there were changes to files that
    # the caller cares about (named in filenames)
    modified = (id_stdout[12] == '+')
    assert len(id_stdout) in (13, 14) #sanity check
    if modified and care_about_mod :
        return id_stdout[:13]
    else:
        return id_stdout[:12]

def _import_id_py_source(location):
    try:
        dirname = os.path.dirname(location[1])
        basename = os.path.basename(location[1])
        return hg_version(dirname, [basename])
    except OSError, e:
        print >> sys.stderr, 'IGNORNING', e
        return _unknown_version + ' PY_SOURCE'

def _import_id_py_compiled(location):
    #a .pyc file was found, but no corresponding .py
    return _unknown_version + ' PYC_COMPILED'

def _import_id_pkg_directory(location):
    try:
        return hg_version(location[1])
    except OSError, e:
        print >> sys.stderr, 'IGNORNING', e
        return _unknown_version + ' PKG_DIRECTORY'

def _import_id(tag):
    try :
        location = _imp.find_module(tag)
    except ImportError, e: #raise when tag is not found
        return e #put this in the cache, import_id will raise it

    #the find_module was successful, location is valid
    resource_type = location[2][2]

    if resource_type == _imp.PY_SOURCE:
        return _import_id_py_source(location)
    if resource_type == _imp.PY_COMPILED:
        return _import_id_py_compiled(location)
    if resource_type == _imp.C_EXTENSION:
        raise NoteImplementedError
    if resource_type == _imp.PY_RESOURCE:
        raise NoteImplementedError
    if resource_type == _imp.PKG_DIRECTORY:
        return _import_id_pkg_directory(location)
    if resource_type == _imp.C_BUILTIN:
        raise NoteImplementedError
    if resource_type == _imp.PY_FROZEN:
        raise NoteImplementedError

    assert False #the list of resource types above should be exhaustive

def import_id(tag):
    """Return an identifier of the code imported by 'import <tag>'.

    @param tag: a module or file name
    @type tag: string

    @rtype: string
    @return: identifier of the code imported by 'import <tag>'.

    This high-level function might do different things depending on, for
    example, whether I{tag} identifies a file or a directory, or whether the
    named entity is under some sort of version/revision control.

    Versions are sought in the following order:
    0. If I{tag} is 'python' then sys.version will be returned
    1. If I{tag} names a file or folder under revision control, this function
    will attempt to guess which one, and return a string that identifies the
    running code (a revision id, not the whole file!)
    2.  If I{tag} names a module with a __version__ attribute, then that
    attribute will be returned as a string.
    3. The string starting with 'unknown version' will be returned for other valid modules.
    4. An exception will be raise for non-existent modules.

    @note: This function may import the named entity in order to return a
    __version__ module attribute.

    """
    if tag not in import_id.cache:
        import_id.cache[tag] = _import_id(tag)

    #in the case of bad module names, we cached the ImportError exception
    rval = import_id.cache[tag]
    if isinstance(rval, Exception):
        raise rval
    return rval
import_id.cache = {'python':sys.version}

def get_all_src_versions() :
    """
    Get the version of all loaded module.
    Calls src_version on all loaded modules. These modules are found
    using sys.modules.

    Returns a dictionnary: name->version.
    
    @RETURN dict Dictionnary (module's name) -> (version)
    @SEE src_version
    """
    allmodules = sys.modules
    d = dict()
    for m in allmodules :
        try:
            d[m] = import_id(m)
        except:
            pass
    return d


if __name__ == "__main__" :

    if len(sys.argv) == 2 :
        print 'testing on', sys.argv[1]
        print import_id(sys.argv[1])