

from lookup_list import LookupList
Example = LookupList
from misc import *
import copy
import numpy

class AbstractFunction (Exception): """Derived class must override this function"""
class NotImplementedYet (NotImplementedError): """Work in progress, this should eventually be implemented"""

class DataSet(object):
    """A virtual base class for datasets.

    A DataSet can be seen as a generalization of a matrix, meant to be used in conjunction
    with learning algorithms (for training and testing them): rows/records are called examples, and
    columns/attributes are called fields. The field value for a particular example can be an arbitrary
    python object, which depends on the particular dataset.
    
    We call a DataSet a 'stream' when its length is unbounded (len(dataset)==float("infinity")).

    A DataSet is a generator of iterators; these iterators can run through the
    examples or the fields in a variety of ways.  A DataSet need not necessarily have a finite
    or known length, so this class can be used to interface to a 'stream' which
    feeds on-line learning (however, as noted below, some operations are not
    feasible or not recommended on streams).

    To iterate over examples, there are several possibilities:
    * for example in dataset([field1, field2,field3, ...]):
    * for val1,val2,val3 in dataset([field1, field2,field3]):
    * for minibatch in dataset.minibatches([field1, field2, ...],minibatch_size=N):
    * for example in dataset:
    Each of these is documented below. All of these iterators are expected
    to provide, in addition to the usual 'next()' method, a 'next_index()' method
    which returns a non-negative integer pointing to the position of the next
    example that will be returned by 'next()' (or of the first example in the
    next minibatch returned). This is important because these iterators
    can wrap around the dataset in order to do multiple passes through it,
    in possibly irregular ways if the minibatch size is not a divisor of the
    dataset length.
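
    For instance, a minimal sketch (assuming a finite dataset with fields 'x'
    and 'y', and a user-defined function do_something):

      for example in dataset:
          do_something(example['x'],example['y'])
      for x_batch,y_batch in dataset.minibatches(['x','y'],minibatch_size=10):
          do_something(x_batch,y_batch)   # x_batch and y_batch each hold 10 values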
    
    To iterate over fields, one can do
    * for fields in dataset.fields()
    * for fields in dataset(field1,field2,...).fields() to select a subset of fields
    * for fields in dataset.fields(field1,field2,...) to select a subset of fields
    and each of these fields is iterable over the examples:
    * for field_examples in dataset.fields():
        for example_value in field_examples:
           ...
    but when the dataset is a stream (unbounded length), it is not recommended to do
    such things because the underlying dataset may refuse to access the different fields in
    an unsynchronized way. Hence the fields() method is illegal for streams, by default.
    The result of fields() is a DataSetFields object, which iterates over fields,
    and whose elements are iterable over examples. A DataSetFields object can
    be turned back into a DataSet with its examples() method:
      dataset2 = dataset1.fields().examples()
    and dataset2 should behave exactly like dataset1 (in fact by default dataset2==dataset1).
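
    For instance, a minimal sketch (assuming a finite dataset with a numeric
    field 'y'; each element yielded by fields() is the container of one field's
    values over all examples):

      n_positive = 0
      for field_examples in dataset.fields('y'):
          for y in field_examples:
              if y>0:
                  n_positive += 1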
    
    Note: Fields are not mutually exclusive, i.e. two fields can overlap in their actual content.

    Note: The content of a field can be of any type. Field values can also be 'missing'
    (e.g. to handle semi-supervised learning), and in the case of numeric (numpy array)
    fields (i.e. an ArrayFieldsDataSet), NaN plays the role of a missing value.

    Dataset elements can be indexed and sub-datasets (with a subset
    of examples) can be extracted. These operations are not supported
    by default in the case of streams.

    * dataset[:n] returns a dataset with the n first examples.

    * dataset[i1:i2:s] returns a dataset with the examples i1,i1+s,i1+2*s,..., up to but excluding i2.

    * dataset[i] returns an Example.

    * dataset[[i1,i2,...in]] returns a dataset with examples i1,i2,...in.

    Datasets can be concatenated either vertically (increasing the length) or
    horizontally (augmenting the set of fields), if they are compatible, using
    the following operations (with the same basic semantics as numpy.hstack
    and numpy.vstack):

    * dataset1 | dataset2 | dataset3 == dataset.hstack([dataset1,dataset2,dataset3])

    creates a new dataset whose list of fields is the concatenation of the list of
    fields of the argument datasets. This only works if they all have the same length.

    * dataset1 + dataset2 + dataset3 == dataset.vstack([dataset1,dataset2,dataset3])

    creates a new dataset that concatenates the examples from the argument datasets
    (and whose length is the sum of the length of the argument datasets). This only
    works if they all have the same fields.

    Following the same logic, and viewing a DataSetFields object associated with
    a DataSet as a kind of transpose of it, fields1 + fields2 concatenates the fields of
    the DataSetFields fields1 and fields2, and fields1 | fields2 concatenates their
    examples.
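
    For instance, a minimal sketch (assuming train_inputs and train_targets are
    two datasets of the same length holding the 'input' and 'target' fields
    respectively, and extra_data has the same fields as their combination):

      train = train_inputs | train_targets   # same examples, union of fields
      train = train + extra_data             # same fields, more examples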


    A DataSet sub-class should always redefine the following methods:
      * __len__ if it is not a stream
      * __getitem__ may not be feasible with some streams
      * fieldNames
      * minibatches
      * valuesHStack
      * valuesVStack
    For efficiency of implementation, a sub-class might also want to redefine
      * hasFields
    """

    infinity = float("infinity")
    
    def __init__(self):
        pass
    
    class MinibatchToSingleExampleIterator(object):
        """
        Converts the result of minibatch iterator with minibatch_size==1 into
        single-example values in the result. Therefore the result of
        iterating on the dataset itself gives a sequence of single examples
        (whereas the result of iterating over minibatches gives in each
        Example field an iterable object over the individual examples in
        the minibatch).
        """
        def __init__(self, minibatch_iterator):
            self.minibatch_iterator = minibatch_iterator
        def __iter__(self): #makes for loop work
            return self
        def next(self):
            return self.minibatch_iterator.next()[0]
        def next_index(self):
            return self.minibatch_iterator.next_index()

    def __iter__(self):
        """Supports the syntax "for i in dataset: ..."

        Using this syntax, "i" will be an Example instance (or equivalent) with
        all the fields of DataSet self.  Every field of "i" will give access to
        a field of a single example.  Fields should be accessible via
        i["fielname"] or i[3] (in the order defined by the elements of the
        Example returned by this iterator), but the derived class is free
        to accept any type of identifier, and add extra functionality to the iterator.

        The default implementation calls the minibatches iterator and extracts the first example of each field.
        """
        return DataSet.MinibatchToSingleExampleIterator(self.minibatches(None, minibatch_size = 1))

    minibatches_fieldnames = None
    minibatches_minibatch_size = 1
    minibatches_n_batches = None
    def minibatches(self,
            fieldnames = minibatches_fieldnames,
            minibatch_size = minibatches_minibatch_size,
            n_batches = minibatches_n_batches):
        """
        Return an iterator that supports three forms of syntax:

            for i in dataset.minibatches(None,**kwargs): ...

            for i in dataset.minibatches([f1, f2, f3],**kwargs): ...

            for i1, i2, i3 in dataset.minibatches([f1, f2, f3],**kwargs): ...

        Using the first two syntaxes, "i" will be an indexable object, such as a list,
        tuple, or Example instance. In both cases, i[k] is a list-like container holding
        one field of the current batch of examples. In the second case, i[0] is
        a list-like container of the f1 field of the current batch of examples, i[1] is
        a list-like container of the f2 field, etc.

        Using the first syntax, all the fields will be returned in "i".
        Beware that some datasets may not support this syntax, if the number
        of fields is infinite (i.e. field values may be computed "on demand").

        Using the third syntax, i1, i2, i3 will be list-like containers of the
        f1, f2, and f3 fields of a batch of examples on each loop iteration.

        The minibatches iterator is expected to return upon each call to next()
        a DataSetFields object, which is a LookupList (indexed by the field names) whose
        elements are iterable over the minibatch examples, and which keeps a pointer to
        a sub-dataset that can be used to iterate over the individual examples
        in the minibatch. Hence a minibatch can be converted back to a regular
        dataset or its fields can be looked at individually (and possibly iterated over).

        PARAMETERS
        - fieldnames (list of any type, default None):
        The loop variables i1, i2, i3 (in the example above) should contain the
        f1, f2, and f3 fields of the current batch of examples.  If None, the
        derived class can choose a default, e.g. all fields.

        - minibatch_size (integer, default 1)
        On every iteration, the variables i1, i2, i3 will have
        exactly minibatch_size elements. e.g. len(i1) == minibatch_size

        - n_batches (integer, default None)
        The iterator will loop exactly this many times, and then stop.  If None,
        the derived class can choose a default.  If (-1), then the returned
        iterator should support looping indefinitely.

        Note: A list-like container is something like a tuple, list, numpy.ndarray or
        any other object that supports integer indexing and slicing.
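
        For instance, a minimal sketch (assuming fields 'x' and 'y' and a
        user-defined function update_model):

            for x_batch,y_batch in dataset.minibatches(['x','y'],minibatch_size=32):
                update_model(x_batch,y_batch)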

        """
        raise AbstractFunction()


    def __len__(self):
        """
        len(dataset) returns the number of examples in the dataset.
        By default, a DataSet is a 'stream', i.e. it has an unbounded (infinite) length.
        Sub-classes which implement finite-length datasets should redefine this method.
        Some methods only make sense for finite-length datasets, and will perform
           assert len(dataset)<DataSet.infinity
        in order to check the finiteness of the dataset.
        """
        return DataSet.infinity

    def hasFields(self,*fieldnames):
        """
        Return True if the given field name is recognized by the DataSet (i.e. can be
        used as a field name in one of the iterators). If multiple field names are
        given, return True only if all of them are recognized.

        The default implementation may be inefficient (O(# fields in dataset)), as it calls the fieldNames()
        method. Many datasets may store their field names in a dictionary, which would allow more efficiency.
        """
        return len(unique_elements_list_intersection(fieldnames,self.fieldNames()))==len(fieldnames)
        
    def fieldNames(self):
        """
        Return the list of field names that are supported by the iterators,
        and for which hasFields(fieldname) would return True.
        """
        raise AbstractFunction()

    def __call__(self,*fieldnames):
        """
        Return a dataset that sees only the fields whose names are specified.
        """
        assert self.hasFields(*fieldnames)
        return self.fields(*fieldnames).examples()

    def fields(self,*fieldnames):
        """
        Return a DataSetFields object associated with this dataset.
        """
        return DataSetFields(self,*fieldnames)

    def __getitem__(self,i):
        """
        dataset[i] returns the (i+1)-th example of the dataset.
        dataset[i:j] returns the subdataset with examples i,i+1,...,j-1.
        dataset[i:j:s] returns the subdataset with examples i,i+s,i+2*s,..., up to but excluding j.
        dataset[[i1,i2,..,in]] returns the subdataset with examples i1,i2,...,in.

        Note that some stream datasets may be unable to implement slicing/indexing
        because they can only iterate through examples one or a minibatch at a time
        and do not actually store or keep past (or future) examples.
        """
        raise NotImplementedError()

    def valuesHStack(self,fieldnames,fieldvalues):
        """
        Return a value that corresponds to concatenating (horizontally) several field values.
        This can be useful to merge some fields. The implementation of this operation is likely
        to involve a copy of the original values. When the values are numpy arrays, the
        result should be numpy.hstack(values). If it makes sense, this operation should
        work as well when each value corresponds to multiple examples in a minibatch
        e.g. if each value is a Ni-vector and a minibatch of length L is a LxNi matrix,
        then the result should be a Lx(N1+N2+..) matrix equal to numpy.hstack(values).
        The default is to use numpy.hstack for numpy.ndarray values, and a list
        pointing to the original values for other data types.
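
        For instance (a sketch): hstacking a minibatch value of shape (L,3) with one
        of shape (L,2) should yield a single (L,5) array, i.e. numpy.hstack(values).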
        """
        all_numpy=True
        for value in fieldvalues:
            if not type(value) is numpy.ndarray:
                all_numpy=False
        if all_numpy:
            return numpy.hstack(fieldvalues)
        # the default implementation of horizontal stacking is to put values in a list
        return fieldvalues


    def valuesVStack(self,fieldname,values):
        """
        Return a value that corresponds to concatenating (vertically) several values of the
        same field. This can be important to build a minibatch out of individual examples. This
        is likely to involve a copy of the original values. When the values are numpy arrays, the
        result should be numpy.vstack(values).
        The default is to use numpy.vstack for numpy.ndarray values, and a list
        pointing to the original values for other data types.
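
        For instance (a sketch): vstacking L individual 3-vectors (numpy arrays of
        shape (3,)) should yield an (L,3) matrix, one row per example.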
        """
        all_numpy=True
        for value in values:
            if not type(value) is numpy.ndarray:
                all_numpy=False
        if all_numpy:
            return numpy.vstack(values)
        # the default implementation of vertical stacking is to put values in a list
        return values

    def __or__(self,other):
        """
        dataset1 | dataset2 returns a dataset whose list of fields is the concatenation of the list of
        fields of the argument datasets. This only works if they all have the same length.
        """
        return HStackedDataSet([self,other])

    def __add__(self,other):
        """
        dataset1 + dataset2 is a dataset that concatenates the examples from the argument datasets
        (and whose length is the sum of the length of the argument datasets). This only
        works if they all have the same fields.
        """
        return VStackedDataSet([self,other])

def hstack(datasets):
    """
    hstack(dataset1,dataset2,...) returns dataset1 | dataset2 | ...
    which is a dataset whose fields list is the concatenation of the fields
    of the individual datasets.
    """
    assert len(datasets)>0
    if len(datasets)==1:
        return datasets[0]
    return HStackedDataSet(datasets)

def vstack(datasets):
    """
    vstack(dataset1,dataset2,...) returns dataset1 + dataset2 + ...
    which is a dataset which iterates first over the examples of dataset1, then
    over those of dataset2, etc.
    """
    assert len(datasets)>0
    if len(datasets)==1:
        return datasets[0]
    return VStackedDataSet(datasets)
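
# For instance, a minimal sketch (assuming d1 and d2 are compatible finite datasets):
#   wide = hstack([d1,d2])  # same examples, concatenated fields, like d1 | d2
#   tall = vstack([d1,d2])  # same fields, examples of d1 followed by those of d2, like d1 + d2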


class DataSetFields(LookupList):
    """
    Although a DataSet iterates over examples (like rows of a matrix), an associated
    DataSetFields iterates over fields (like columns of a matrix), and can be understood
    as a transpose of the associated dataset.

    To iterate over fields, one can do
    * for fields in dataset.fields()
    * for fields in dataset(field1,field2,...).fields() to select a subset of fields
    * for fields in dataset.fields(field1,field2,...) to select a subset of fields
    and each of these fields is iterable over the examples:
    * for field_examples in dataset.fields():
        for example_value in field_examples:
           ...
    but when the dataset is a stream (unbounded length), it is not recommended to do
    such things because the underlying dataset may refuse to access the different fields in
    an unsynchronized way. Hence the fields() method is illegal for streams, by default.
    The result of fields() is a DataSetFields object, which iterates over fields,
    and whose elements are iterable over examples. A DataSetFields object can
    be turned back into a DataSet with its examples() method:
      dataset2 = dataset1.fields().examples()
    and dataset2 should behave exactly like dataset1 (in fact by default dataset2==dataset1).
    """
    def __init__(self,dataset,*fieldnames):
        self.dataset=dataset
        if len(fieldnames)==0:
            fieldnames=dataset.fieldNames()
        assert dataset.hasFields(*fieldnames)
        LookupList.__init__(self,fieldnames,
                            dataset.minibatches(fieldnames,
                                                minibatch_size=len(dataset)).next())
    def examples(self):
        return self.dataset
    
    def __or__(self,other):
        """
        fields1 | fields2 is a DataSetFields whose list of examples is the concatenation
        of the lists of examples of the DataSetFields fields1 and fields2.
        """
        return (self.examples() + other.examples()).fields()

    def __add__(self,other):
        """
        fields1 + fields2 is a DataSetFields whose list of fields is the concatenation
        of the fields of the DataSetFields fields1 and fields2.
        """
        return (self.examples() | other.examples()).fields()

class MinibatchDataSet(DataSet):
    """
    Turn a LookupList of same-length fields into an example-iterable dataset.
    Each element of the lookup-list should be an iterable and sliceable, all of the same length.
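
    For instance, a minimal sketch (assuming numpy arrays of matching length as
    the field values):

      fields = LookupList(['x','y'],[numpy.ones((10,5)),numpy.zeros((10,1))])
      dataset = MinibatchDataSet(fields)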
    """
    def __init__(self,fields_lookuplist,values_vstack=DataSet().valuesVStack,
                 values_hstack=DataSet().valuesHStack):
        """
        The user can (and generally should) also provide values_vstack(fieldname,fieldvalues)
        and a values_hstack(fieldnames,fieldvalues) functions behaving with the same
        semantics as the DataSet methods of the same name (but without the self argument).
        """
        self.fields=fields_lookuplist
        assert len(fields_lookuplist)>0
        self.length=len(fields_lookuplist[0])
        for field in fields_lookuplist[1:]:
            assert self.length==len(field)
        self.values_vstack=values_vstack
        self.values_hstack=values_hstack

    def __len__(self):
        return self.length

    def __getitem__(self,i):
        return Example(self.fields.keys(),[field[i] for field in self.fields])

    def fieldNames(self):
        return self.fields.keys()

    def hasFields(self,*fieldnames):
        for fieldname in fieldnames:
            if fieldname not in self.fields:
                return False
        return True

    def minibatches(self,
                    fieldnames = DataSet.minibatches_fieldnames,
                    minibatch_size = DataSet.minibatches_minibatch_size,
                    n_batches = DataSet.minibatches_n_batches):
        class Iterator(object):
            def __init__(self,ds):
                self.ds=ds
                self.next_example=0
                self.n_batches_done=0
                assert minibatch_size > 0
                if minibatch_size > ds.length:
                    raise NotImplementedError()
            def __iter__(self):
                return self
            def next_index(self):
                return self.next_example
            def next(self):
                # stop before computing a minibatch that should not be returned
                if n_batches:
                    if self.n_batches_done==n_batches:
                        raise StopIteration
                elif self.next_example>=self.ds.length:
                    raise StopIteration
                upper = self.next_example+minibatch_size
                if upper<=self.ds.length:
                    minibatch = Example(self.ds.fields.keys(),
                                        [field[self.next_example:upper]
                                         for field in self.ds.fields])
                else: # we must concatenate (vstack) the bottom and top parts of our minibatch
                    minibatch = Example(self.ds.fields.keys(),
                                        [self.ds.valuesVStack(name,[value[self.next_example:],
                                                                    value[0:upper-self.ds.length]])
                                         for name,value in self.ds.fields.items()])
                self.next_example+=minibatch_size
                self.n_batches_done+=1
                if n_batches and self.next_example>=self.ds.length:
                    # wrap around to support multiple passes through the dataset
                    self.next_example-=self.ds.length
                return DataSetFields(MinibatchDataSet(minibatch),
                                     *(fieldnames if fieldnames else self.ds.fieldNames()))

        return Iterator(self)

    def valuesVStack(self,fieldname,fieldvalues):
        return self.values_vstack(fieldname,fieldvalues)
    
    def valuesHStack(self,fieldnames,fieldvalues):
        return self.values_hstack(fieldnames,fieldvalues)
    
class HStackedDataSet(DataSet):
    """
    A DataSet that wraps several datasets and shows a view that includes all their fields,
    i.e. whose list of fields is the concatenation of their lists of fields.

    If a field name is found in more than one of the datasets, then either an error is
    raised or the fields are renamed (either by prefixing with the dataset's __name__ attribute
    and ".", if it exists, or by suffixing with the dataset's index in the argument list).

    TODO: automatically detect a chain of stacked datasets due to A | B | C | D ...
    """
    def __init__(self,datasets,accept_nonunique_names=False):
        DataSet.__init__(self)
        self.datasets=datasets
        self.accept_nonunique_names=accept_nonunique_names
        self.fieldname2dataset={}

        def rename_field(fieldname,dataset,i):
            if hasattr(dataset,"__name__"):
                return dataset.__name__ + "." + fieldname
            return fieldname+"."+str(i)
            
        # make sure all datasets have the same length and unique field names
        self.length=None
        names_to_change=[]
        for i in xrange(len(datasets)):
            dataset = datasets[i]
            length=len(dataset)
            if self.length:
                assert self.length==length
            else:
                self.length=length
            for fieldname in dataset.fieldNames():
                if fieldname in self.fieldname2dataset: # name conflict!
                    if accept_nonunique_names:
                        # remember to rename the previously registered field with this name
                        names_to_change.append((fieldname,self.fieldname2dataset[fieldname]))
                        # and rename the current dataset's field right away
                        fieldname=rename_field(fieldname,dataset,i)
                    else:
                        raise ValueError("Incompatible datasets: non-unique field name = "+fieldname)
                self.fieldname2dataset[fieldname]=i
        for fieldname,i in names_to_change:
            del self.fieldname2dataset[fieldname]
            self.fieldname2dataset[rename_field(fieldname,self.datasets[i],i)]=i
            
    def hasFields(self,*fieldnames):
        for fieldname in fieldnames:
            if not fieldname in self.fieldname2dataset:
                return False
        return True

    def fieldNames(self):
        return self.fieldname2dataset.keys()
            
    def minibatches(self,
            fieldnames = DataSet.minibatches_fieldnames,
            minibatch_size = DataSet.minibatches_minibatch_size,
            n_batches = DataSet.minibatches_n_batches):

        class Iterator(object):
            def __init__(self,hsds,iterators):
                self.hsds=hsds
                self.iterators=iterators
            def __iter__(self):
                return self
            def next_index(self):
                return self.iterators[0].next_index()
            def next(self):
                # concatenate all the fields of the minibatches
                minibatch = reduce(LookupList.__add__,[iterator.next() for iterator in self.iterators])
                # and return a DataSetFields whose dataset is the transpose (=examples()) of this minibatch
                return DataSetFields(MinibatchDataSet(minibatch,self.hsds.valuesVStack,
                                                      self.hsds.valuesHStack),
                                     *(fieldnames if fieldnames else self.hsds.fieldNames()))
                                     
        assert fieldnames is None or self.hasFields(*fieldnames)
        # find out which underlying datasets are necessary to service the required fields
        # and construct corresponding minibatch iterators
        if fieldnames:
            datasets=set([])
            fields_in_dataset={}
            for fieldname in fieldnames:
                dataset=self.datasets[self.fieldname2dataset[fieldname]]
                datasets.add(dataset)
                fields_in_dataset.setdefault(dataset,[]).append(fieldname)
            datasets=list(datasets)
            iterators=[dataset.minibatches(fields_in_dataset[dataset],minibatch_size,n_batches)
                       for dataset in datasets]
        else:
            datasets=self.datasets
            iterators=[dataset.minibatches(None,minibatch_size,n_batches) for dataset in datasets]
        return Iterator(self,iterators)


    def valuesVStack(self,fieldname,fieldvalues):
        return self.datasets[self.fieldname2dataset[fieldname]].valuesVStack(fieldname,fieldvalues)
    
    def valuesHStack(self,fieldnames,fieldvalues):
        """
        We will use the sub-dataset associated with the first fieldname in the fieldnames list
        to do the work, hoping that it can cope with the other values (i.e. won't care
        about the incompatible fieldnames). Hence this heuristic will always work if
        all the fieldnames are of the same sub-dataset.
        """
        return self.datasets[self.fieldname2dataset[fieldnames[0]]].valuesHStack(fieldnames,fieldvalues)

class VStackedDataSet(DataSet):
    """
    A DataSet that wraps several datasets and shows a view that includes all their examples,
    in the order provided. This clearly assumes that they all have the same field names
    and all (except possibly the last one) are of finite length.

    TODO: automatically detect a chain of stacked datasets due to A + B + C + D ...
    """
    def __init__(self,datasets):
        self.datasets=datasets
        self.length=0
        self.index2dataset={}
        # we use this map from row index to dataset index for constant-time random access of examples,
        # to avoid having to search for the appropriate dataset each time an example or slice is asked for
        for k,dataset in enumerate(datasets[0:-1]):
            L=len(dataset)
            assert L<DataSet.infinity
            for i in xrange(L):
                self.index2dataset[self.length+i]=k
            self.length+=L
        self.last_start=self.length
        self.length+=len(datasets[-1])
        
            
def supervised_learning_dataset(src_dataset,input_fields,target_fields,weight_field=None):
    """
    Wraps an arbitrary DataSet into one for supervised learning tasks by forcing the
    user to define a set of fields as the 'input' field and a set of fields
    as the 'target' field. Optionally, a single weight_field can also be defined.
    """
    args = ((input_fields,'input'),(target_fields,'target'))
    if weight_field: args+=(([weight_field],'weight'),)
    return src_dataset.merge_fields(*args)
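
# For instance, a minimal sketch (assuming raw_data has fields 'x1','x2' and 'y',
# and supports the merge_fields() method used above):
#   trainset = supervised_learning_dataset(raw_data,['x1','x2'],['y'])
#   for input,target in trainset.minibatches(['input','target'],minibatch_size=16):
#       ...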