Mercurial > pylearn
view version.py @ 454:6e7509acb1c0
Merged
author | delallea@valhalla.apstat.com |
---|---|
date | Thu, 02 Oct 2008 13:41:43 -0400 |
parents | 7a734dba4cac |
children |
line wrap: on
line source
import subprocess as _subprocess
import sys
import os

try:
    import imp as _imp
except ImportError:
    # The ``imp`` module was removed in Python 3.12; a small importlib-based
    # stand-in (defined below) is installed in its place.
    _imp = None


class _ImportlibShim(object):
    """Stand-in for the parts of ``imp`` this module uses.

    Only C{find_module} and the resource-type constants are provided.  The
    constants carry the same values the ``imp`` module used.
    """

    PY_SOURCE = 1
    PY_COMPILED = 2
    C_EXTENSION = 3
    PY_RESOURCE = 4
    PKG_DIRECTORY = 5
    C_BUILTIN = 6
    PY_FROZEN = 7

    def find_module(self, name):
        """Return C{(None, pathname, (suffix, mode, type))} for I{name}.

        @raise ImportError: if the module cannot be found or classified.
        """
        import importlib.machinery as machinery
        import importlib.util as util

        # imp.find_module never handled dotted names; refusing them here
        # also avoids importing parent packages as a side effect.
        if '.' in name:
            raise ImportError('No module named %s' % name)
        try:
            spec = util.find_spec(name)
        except ValueError as e:
            # raised for modules in sys.modules without a usable __spec__
            raise ImportError('No module named %s (%s)' % (name, e))
        if spec is None:
            raise ImportError('No module named %s' % name)

        origin = spec.origin
        if origin == 'built-in':
            return None, name, ('', '', self.C_BUILTIN)
        if origin == 'frozen':
            return None, name, ('', '', self.PY_FROZEN)
        if spec.submodule_search_locations is not None:
            if origin and os.path.isfile(origin):
                pathname = os.path.dirname(origin)
            else:
                locations = list(spec.submodule_search_locations)
                pathname = locations[0] if locations else name
            return None, pathname, ('', '', self.PKG_DIRECTORY)
        if origin:
            candidates = (
                (machinery.SOURCE_SUFFIXES, 'r', self.PY_SOURCE),
                (machinery.BYTECODE_SUFFIXES, 'rb', self.PY_COMPILED),
                (machinery.EXTENSION_SUFFIXES, 'rb', self.C_EXTENSION),
            )
            for suffixes, mode, kind in candidates:
                for suffix in suffixes:
                    if origin.endswith(suffix):
                        return None, origin, (suffix, mode, kind)
        raise ImportError('Cannot classify module %s' % name)


if _imp is None:
    _imp = _ImportlibShim()

# module name -> changeset id (or None), filled in by src_version()
_cache = dict()


def _find_module(name):
    """Call C{_imp.find_module(name)} and close the file it may have opened.

    @return: the C{(file, pathname, description)} tuple from find_module; the
        file element, if any, is already closed.
    @raise ImportError: if the module is not found.
    """
    found = _imp.find_module(name)
    if found[0] is not None:
        found[0].close()
    return found


def _run_hg(args, cwd):
    """Run C{hg <args>} in directory I{cwd} and return its stdout as text.

    @raise OSError: if hg cannot be started (e.g. not on PATH) or exits with
        a non-zero status.
    """
    proc = _subprocess.Popen(('hg',) + tuple(args), cwd=cwd,
                             stdout=_subprocess.PIPE,
                             stderr=_subprocess.PIPE,
                             universal_newlines=True)
    stdout = proc.communicate()[0]
    if proc.returncode != 0:
        raise OSError('hg returned %i, maybe %s is not under hg control?'
                      % (proc.returncode, cwd))
    return stdout


def _compute_src_version(module_name):
    """Do the uncached work of L{src_version}; see there for the contract."""
    try:
        location = _find_module(module_name)[1]
    except ImportError:
        return None

    if os.path.isdir(location):
        is_dir = True
        repo_dir = location
        status_args = ('st',)
        label = module_name
    elif os.path.isfile(location):
        is_dir = False
        repo_dir = os.path.split(os.path.abspath(location))[0]
        # absolute path: hg runs with cwd=repo_dir, so a relative location
        # would otherwise be resolved against the wrong directory
        status_args = ('st', os.path.abspath(location))
        label = location
    else:
        # neither a directory nor a file: happens for builtin modules
        return None

    # under hg?  (only recognised when '.hg' sits directly in repo_dir)
    if not os.path.exists(os.path.join(repo_dir, '.hg')):
        return None

    for line in _run_hg(status_args, repo_dir).split('\n'):
        if not line:
            continue
        if line[0] != '?':
            raise Exception('Uncommitted modification to "%s" in %s (%s)'
                            % (line[2:], label, repo_dir))
        if line.endswith('.py'):
            raise Exception('Untracked file "%s" in %s (%s)'
                            % (line[2:], label, repo_dir))

    # Expected "hg id" output: "<hash>[+][ tip]".  The documentation mentions
    # that two parent hashes may be printed; that format is not handled.
    hg_id = _run_hg(('id',), repo_dir)
    tokens = hg_id.split()
    if not 1 <= len(tokens) <= 2 or (len(tokens) == 2 and tokens[1] != 'tip'):
        raise ValueError('Unexpected output from "hg id": %r' % (hg_id,))
    changeset = tokens[0]
    if changeset.endswith('+'):  # trailing + indicates uncommitted changes
        if is_dir:
            raise ValueError('"hg id" reports uncommitted changes in %s'
                             % (repo_dir,))
        # single file: status was clean, so the change is in another file
        changeset = changeset[:-1]
    return changeset


def src_version(module_name):
    """Return compact identifier of module code.

    @param module_name: name of a top-level module or package.
    @type module_name: string

    @return: the changeset hash printed by "hg id" (without any trailing
        "+", " tip" or newline), or None if the module cannot be found, is
        neither a file nor a directory, or its directory does not contain a
        '.hg' folder.
    @rtype: string or None

    @raise Exception: if "hg st" reports uncommitted modifications or
        untracked '.py' files.
    @raise OSError: if hg cannot be run or exits with a non-zero status.
    @raise ValueError: if the output of "hg id" is not understood.

    @note: This function tries to establish that the source files and the
    repo are synchronized.  The code returned by "hg id" is not affected by
    hg pull, but pulling might remove the " tip" string which might have
    appeared.  This implementation ignores the " tip" information, and only
    uses the code.

    @note: Results (including None) are cached per module name; nothing is
    cached when an exception is raised.
    """
    if module_name not in _cache:
        _cache[module_name] = _compute_src_version(module_name)
    return _cache[module_name]


_unknown_version = 'unknown version'


def hg_version(dirname, filenames=None):
    """Return current changeset of directory I{dirname}.

    @type filenames: list or tuple of str (or default: None)
    @param filenames: if specified, we ignore modifications to other files.

    @rtype: string
    @return: the 12-character changeset id printed by "hg id -i", followed
        by '+' only if hg reports the working copy as modified and "hg st"
        lists a modification to a file the caller cares about.

    @raise TypeError: if I{filenames} is not a list, a tuple or None.
    @raise OSError: if hg cannot be run or exits with a non-zero status.
    @raise ValueError: if the output of "hg id -i" is not understood.
    """
    if filenames is not None and not isinstance(filenames, (list, tuple)):
        raise TypeError(filenames)

    # both calls may raise OSError, for example if hg is not visible via PATH
    status = _run_hg(('st',), dirname)
    id_stdout = _run_hg(('id', '-i'), dirname)

    def care_about(some_file):
        return filenames is None or some_file in filenames

    # parse status codes for what we care about
    care_about_mod = False
    for line in status.split('\n'):
        if not line:  # empty lines happen
            continue
        line_file = line[2:]
        if line[0] != '?' and care_about(line_file):
            care_about_mod = True
        if line[0] == '?' and line.endswith('.py'):
            sys.stderr.write('WARNING: untracked file %s\n'
                             % os.path.join(dirname, line_file))

    # id_stdout is 12 hex digits followed by '+\n' or '\n'; validate before
    # slicing so that unexpected output gives a clear error
    changeset = id_stdout.strip()
    modified = changeset.endswith('+')
    if modified:
        changeset = changeset[:-1]
    if len(changeset) != 12:
        raise ValueError('Unexpected output from "hg id -i": %r'
                         % (id_stdout,))

    # keep the trailing '+' only if there were changes to files that the
    # caller cares about (named in filenames)
    if modified and care_about_mod:
        return changeset + '+'
    return changeset


def _import_id_py_source(location):
    """Identify a module found as a '.py' file.

    @param location: find_module result; C{location[1]} is the file path.
    """
    try:
        dirname = os.path.dirname(location[1])
        basename = os.path.basename(location[1])
        return hg_version(dirname, [basename])
    except OSError as e:
        sys.stderr.write('IGNORNING %s\n' % (e,))
        return _unknown_version + ' PY_SOURCE'


def _import_id_py_compiled(location):
    """Identify a module for which only a compiled file (no '.py') was found."""
    return _unknown_version + ' PYC_COMPILED'


def _import_id_pkg_directory(location):
    """Identify a package directory.

    @param location: find_module result; C{location[1]} is the directory.
    """
    try:
        return hg_version(location[1])
    except OSError as e:
        sys.stderr.write('IGNORNING %s\n' % (e,))
        return _unknown_version + ' PKG_DIRECTORY'


def _import_id(tag):
    """Compute the value that L{import_id} caches for I{tag}.

    @return: a version string, or the ImportError instance if I{tag} is not
        found (import_id caches it and raises it).
    @raise NotImplementedError: for C extensions, resources, builtin and
        frozen modules.
    """
    try:
        location = _find_module(tag)
    except ImportError as e:
        # raised when tag is not found
        return e  # put this in the cache, import_id will raise it

    # the find_module was successful, location is valid
    resource_type = location[2][2]
    if resource_type == _imp.PY_SOURCE:
        return _import_id_py_source(location)
    if resource_type == _imp.PY_COMPILED:
        return _import_id_py_compiled(location)
    if resource_type == _imp.PKG_DIRECTORY:
        return _import_id_pkg_directory(location)
    unsupported = (_imp.C_EXTENSION, _imp.PY_RESOURCE,
                   _imp.C_BUILTIN, _imp.PY_FROZEN)
    if resource_type in unsupported:
        raise NotImplementedError(
            'no version identifier for %s (resource type %s)'
            % (tag, resource_type))
    # the list of resource types above should be exhaustive
    raise AssertionError('unknown resource type %r' % (resource_type,))


def import_id(tag):
    """Return an identifier of the code imported by 'import <tag>'.

    @param tag: a module or file name
    @type tag: string

    @rtype: string
    @return: identifier of the code imported by 'import <tag>'.

    Versions are sought in the following order:

    0. If I{tag} is 'python' then sys.version will be returned

    1. If I{tag} names a source file or package directory, the Mercurial
    changeset of its directory is returned (a revision id, not the whole
    file!), see L{hg_version}.

    2. A string starting with 'unknown version' is returned when hg cannot
    provide an id, or when only a compiled file was found.

    3. An ImportError is raised for non-existent modules.

    @raise NotImplementedError: for C extensions, builtin and frozen modules.

    @note: Results are cached in C{import_id.cache}, including the
    ImportError of a bad module name.

    @note: Looking up a module's __version__ attribute is not implemented.
    """
    if tag not in import_id.cache:
        import_id.cache[tag] = _import_id(tag)

    # in the case of bad module names, we cached the ImportError exception
    rval = import_id.cache[tag]
    if isinstance(rval, Exception):
        raise rval
    return rval


import_id.cache = {'python': sys.version}


def get_all_src_versions():
    """Get the version of all loaded modules.

    Calls L{import_id} on every name in sys.modules.  Modules for which no
    identifier can be obtained are left out of the result.

    @return: dictionary (module's name) -> (version)
    @rtype: dict
    """
    versions = dict()
    # iterate over a snapshot so a change to sys.modules cannot break the loop
    for name in list(sys.modules):
        try:
            versions[name] = import_id(name)
        except Exception:
            # deliberate best effort: skip modules that cannot be identified
            # (not found, unsupported module kind, unexpected hg output...)
            continue
    return versions


if __name__ == "__main__":
    if len(sys.argv) == 2:
        print('testing on %s' % sys.argv[1])
        print(import_id(sys.argv[1]))