view _test_dataset.py @ 453:ce6b4fd3ab29

Fixed typo in help
author delallea@valhalla.apstat.com
date Thu, 04 Sep 2008 13:48:47 -0400
parents 18702ceb2096
children 82da179d95b2
line wrap: on
line source

#!/bin/env python
from dataset import *
from math import *
import numpy, unittest, sys
#from misc import *
from lookup_list import LookupList

def have_raised(to_eval, **var):
    have_thrown = False
    try:
        eval(to_eval)
    except :
        have_thrown = True
    return have_thrown

def have_raised2(f, *args, **kwargs):
    have_thrown = False
    try:
        f(*args, **kwargs)
    except :
        have_thrown = True
    return have_thrown

def test1():
    print "test1"
    global a,ds
    a = numpy.random.rand(10,4)
    print a
    ds = ArrayDataSet(a,{'x':slice(3),'y':3,'z':[0,2]})
    print "len(ds)=",len(ds)
    assert(len(ds)==10)
    print "example 0 = ",ds[0]
#    assert
    print "x=",ds["x"]
    print "x|y"
    for x,y in ds("x","y"):
        print x,y
    minibatch_iterator = ds.minibatches(fieldnames=['z','y'],n_batches=1,minibatch_size=3,offset=4)
    minibatch = minibatch_iterator.__iter__().next()
    print "minibatch=",minibatch
    for var in minibatch:
        print "var=",var
    print "take a slice and look at field y",ds[1:6:2]["y"]

    del a,ds,x,y,minibatch_iterator,minibatch,var

def test_iterate_over_examples(array,ds):
#not in doc!!!
    i=0
    for example in range(len(ds)):
        wanted = array[example][:3]
        returned = ds[example]['x']
        if (wanted != returned).all():
            print 'returned:', returned
            print 'wanted:', wanted
        assert (ds[example]['x']==array[example][:3]).all()
        assert ds[example]['y']==array[example][3]
        assert (ds[example]['z']==array[example][[0,2]]).all()
        i+=1
    assert i==len(ds)
    del example,i

#     - for example in dataset:
    i=0
    for example in ds:
        assert len(example)==3
        assert (example['x']==array[i][:3]).all()
        assert example['y']==array[i][3]
        assert (example['z']==array[i][0:3:2]).all()
        assert (numpy.append(example['x'],example['y'])==array[i]).all()
        i+=1
    assert i==len(ds)
    del example,i

#     - for val1,val2,... in dataset:
    i=0
    for x,y,z in ds:
        assert (x==array[i][:3]).all()
        assert y==array[i][3]
        assert (z==array[i][0:3:2]).all()
        assert (numpy.append(x,y)==array[i]).all()
        i+=1
    assert i==len(ds)
    del x,y,z,i

#     - for example in dataset(field1, field2,field3, ...):
    i=0
    for example in ds('x','y','z'):
        assert len(example)==3
        assert (example['x']==array[i][:3]).all()
        assert example['y']==array[i][3]
        assert (example['z']==array[i][0:3:2]).all()
        assert (numpy.append(example['x'],example['y'])==array[i]).all()
        i+=1
    assert i==len(ds)
    del example,i
    i=0
    for example in ds('y','x'):
        assert len(example)==2
        assert (example['x']==array[i][:3]).all()
        assert example['y']==array[i][3]
        assert (numpy.append(example['x'],example['y'])==array[i]).all()
        i+=1
    assert i==len(ds)
    del example,i

#     - for val1,val2,val3 in dataset(field1, field2,field3):
    i=0
    for x,y,z in ds('x','y','z'):
        assert (x==array[i][:3]).all()
        assert y==array[i][3]
        assert (z==array[i][0:3:2]).all()
        assert (numpy.append(x,y)==array[i]).all()
        i+=1
    assert i==len(ds)
    del x,y,z,i
    i=0
    for y,x in ds('y','x',):
        assert (x==array[i][:3]).all()
        assert y==array[i][3]
        assert (numpy.append(x,y)==array[i]).all()
        i+=1
    assert i==len(ds)
    del x,y,i

    def test_minibatch_size(minibatch,minibatch_size,len_ds,nb_field,nb_iter_finished):
        ##full minibatch or the last minibatch
        for idx in range(nb_field):
            test_minibatch_field_size(minibatch[idx],minibatch_size,len_ds,nb_iter_finished)
        del idx
    def test_minibatch_field_size(minibatch_field,minibatch_size,len_ds,nb_iter_finished):
        assert len(minibatch_field)==minibatch_size or ((nb_iter_finished*minibatch_size+len(minibatch_field))==len_ds and len(minibatch_field)<minibatch_size)

#     - for minibatch in dataset.minibatches([field1, field2, ...],minibatch_size=N):
    i=0
    mi=0
    size=3
    m=ds.minibatches(['x','z'], minibatch_size=size)
    assert hasattr(m,'__iter__')
    for minibatch in m:
        assert isinstance(minibatch,LookupList)
        assert len(minibatch)==2
        test_minibatch_size(minibatch,size,len(ds),2,mi)
        if type(ds)==ArrayDataSet:
            assert (minibatch[0][:,::2]==minibatch[1]).all()
        else:
            for j in xrange(len(minibatch[0])):
                (minibatch[0][j][::2]==minibatch[1][j]).all()
        mi+=1
        i+=len(minibatch[0])
    assert i==(len(ds)/size)*size
    assert mi==(len(ds)/size)
    del minibatch,i,m,mi,size

    i=0
    mi=0
    size=3
    m=ds.minibatches(['x','y'], minibatch_size=size)
    assert hasattr(m,'__iter__')
    for minibatch in m:
        assert isinstance(minibatch,LookupList)
        assert len(minibatch)==2
        test_minibatch_size(minibatch,size,len(ds),2,mi)
        mi+=1
        for id in range(len(minibatch[0])):
            assert (numpy.append(minibatch[0][id],minibatch[1][id])==array[i]).all()
            i+=1
    assert i==(len(ds)/size)*size
    assert mi==(len(ds)/size)
    del minibatch,i,id,m,mi,size

#     - for mini1,mini2,mini3 in dataset.minibatches([field1, field2, field3], minibatch_size=N):
    i=0
    mi=0
    size=3
    m=ds.minibatches(['x','z'], minibatch_size=size)
    assert hasattr(m,'__iter__')
    for x,z in m:
        test_minibatch_field_size(x,size,len(ds),mi)
        test_minibatch_field_size(z,size,len(ds),mi)
        for id in range(len(x)):
            assert (x[id][::2]==z[id]).all()
            i+=1
        mi+=1
    assert i==(len(ds)/size)*size
    assert mi==(len(ds)/size)
    del x,z,i,m,mi,size

    i=0
    mi=0
    size=3
    m=ds.minibatches(['x','y'], minibatch_size=3)
    assert hasattr(m,'__iter__')
    for x,y in m:
        assert len(x)==size
        assert len(y)==size
        test_minibatch_field_size(x,size,len(ds),mi)
        test_minibatch_field_size(y,size,len(ds),mi)
        mi+=1
        for id in range(len(x)):
            assert (numpy.append(x[id],y[id])==array[i]).all()
            i+=1
    assert i==(len(ds)/size)*size
    assert mi==(len(ds)/size)
    del x,y,i,id,m,mi,size

#not in doc
    i=0
    size=3
    m=ds.minibatches(['x','y'],n_batches=1,minibatch_size=size,offset=4)
    assert hasattr(m,'__iter__')
    for x,y in m:
        assert len(x)==size
        assert len(y)==size
        for id in range(size):
            assert (numpy.append(x[id],y[id])==array[i+4]).all()
            i+=1
    assert i==size
    del x,y,i,id,m,size

    i=0
    size=3
    m=ds.minibatches(['x','y'],n_batches=2,minibatch_size=size,offset=4)
    assert hasattr(m,'__iter__')
    for x,y in m:
        assert len(x)==size
        assert len(y)==size
        for id in range(size):
            assert (numpy.append(x[id],y[id])==array[i+4]).all()
            i+=1
    assert i==2*size
    del x,y,i,id,m,size

    i=0
    size=3
    m=ds.minibatches(['x','y'],n_batches=20,minibatch_size=size,offset=4)
    assert hasattr(m,'__iter__')
    for x,y in m:
        assert len(x)==size
        assert len(y)==size
        for id in range(size):
            assert (numpy.append(x[id],y[id])==array[(i+4)%array.shape[0]]).all()
            i+=1
    assert i==2*size # should not wrap
    del x,y,i,id,size

    assert have_raised2(ds.minibatches,['x','y'],n_batches=1,minibatch_size=len(array)+1,offset=0)
    assert not have_raised2(ds.minibatches,['x','y'],n_batches=1,minibatch_size=len(array),offset=0)

def test_ds_iterator(array,iterator1,iterator2,iterator3):
    l=len(iterator1)
    i=0
    for x,y in iterator1:
        assert (x==array[i][:3]).all()
        assert y==array[i][3]
        assert (numpy.append(x,y)==array[i]).all()
        i+=1
    assert i==l
    i=0
    for y,z in iterator2:
        assert y==array[i][3]
        assert (z==array[i][0:3:2]).all()
        i+=1
    assert i==l
    i=0
    for x,y,z in iterator3:
        assert (x==array[i][:3]).all()
        assert y==array[i][3]
        assert (z==array[i][0:3:2]).all()
        assert (numpy.append(x,y)==array[i]).all()
        i+=1
    assert i==l
    
def test_getitem(array,ds):
    def test_ds(orig,ds,index):
        i=0
        assert isinstance(ds,LookupList)
        assert len(ds)==3
        assert len(ds[0])==len(index)
#        for x,z,y in ds('x','z','y'):
        for idx in index:
            assert (orig[idx]['x']==array[idx][:3]).all()
            assert (orig[idx]['x']==ds['x'][i]).all()
            assert orig[idx]['y']==array[idx][3]
            assert (orig[idx]['y']==ds['y'][i]).all() # why does it crash sometimes?
            assert (orig[idx]['z']==array[idx][0:3:2]).all()
            assert (orig[idx]['z']==ds['z'][i]).all()
            i+=1
        del i
        ds[0]
        if len(ds)>2:
            ds[:1]
            ds[1:1]
            ds[1:1:1]
        if len(ds)>5:
            ds[[1,2,3]]
        for x in ds:
            pass

#ds[:n] returns a LookupList with the n first examples.
    ds2=ds[:3]
    test_ds(ds,ds2,index=[0,1,2])
    del ds2

#ds[i:j] returns a LookupList with examples i,i+1,...,j-1.
    ds2=ds[1:3]
    test_ds(ds,ds2,index=[1,2])
    del ds2

#ds[i1:i2:s] returns a LookupList with the examples i1,i1+s,...i2-s.
    ds2=ds[1:7:2]
    test_ds(ds,ds2,[1,3,5])
    del ds2

#ds[i] returns the (i+1)-th example of the dataset.
    ds2=ds[5]
    assert isinstance(ds2,Example)
    assert have_raised("var['ds']["+str(len(ds))+"]",ds=ds)  # index not defined
    assert not have_raised("var['ds']["+str(len(ds)-1)+"]",ds=ds)
    del ds2

#ds[[i1,i2,...in]]# returns a ds with examples i1,i2,...in.
    ds2=ds[[4,7,2,8]]
#    assert isinstance(ds2,DataSet)
    test_ds(ds,ds2,[4,7,2,8])
    del ds2

    #ds.<property># returns the value of a property associated with
      #the name <property>. The following properties should be supported:
      #    - 'description': a textual description or name for the ds
      #    - 'fieldtypes': a list of types (one per field)

    #* ds1 | ds2 | ds3 == ds.hstack([ds1,ds2,ds3])#????
        #assert hstack([ds('x','y'),ds('z')])==ds
        #hstack([ds('z','y'),ds('x')])==ds
    assert have_raised2(hstack,[ds('x'),ds('x')])
    assert have_raised2(hstack,[ds('y','x'),ds('x')])
    assert not have_raised2(hstack,[ds('x'),ds('y')])
        
    #        i=0
    #        for example in hstack([ds('x'),ds('y'),ds('z')]):
    #            example==ds[i]
    #            i+=1 
    #        del i,example
    #* ds1 & ds2 & ds3 == ds.vstack([ds1,ds2,ds3])#????

def test_subset(array,ds):
    def test_ds(orig,ds,index):
        i=0
        assert isinstance(ds2,DataSet)
        assert len(ds)==len(index)
        for x,z,y in ds('x','z','y'):
            assert (orig[index[i]]['x']==array[index[i]][:3]).all()
            assert (orig[index[i]]['x']==x).all()
            assert orig[index[i]]['y']==array[index[i]][3]
            assert orig[index[i]]['y']==y
            assert (orig[index[i]]['z']==array[index[i]][0:3:2]).all()
            assert (orig[index[i]]['z']==z).all()
            i+=1
        del i
        ds[0]
        if len(ds)>2:
            ds[:1]
            ds[1:1]
            ds[1:1:1]
        if len(ds)>5:
            ds[[1,2,3]]
        for x in ds:
            pass

#ds[:n] returns a dataset with the n first examples.
    ds2=ds.subset[:3]
    test_ds(ds,ds2,index=[0,1,2])
#    del ds2

#ds[i1:i2:s]# returns a ds with the examples i1,i1+s,...i2-s.
    ds2=ds.subset[1:7:2]
    test_ds(ds,ds2,[1,3,5])
#     del ds2

# #ds[i]
#     ds2=ds.subset[5]
#     assert isinstance(ds2,Example)
#     assert have_raised("var['ds']["+str(len(ds))+"]",ds=ds)  # index not defined
#     assert not have_raised("var['ds']["+str(len(ds)-1)+"]",ds=ds)
#     del ds2

#ds[[i1,i2,...in]]# returns a ds with examples i1,i2,...in.
    ds2=ds.subset[[4,7,2,8]]
    test_ds(ds,ds2,[4,7,2,8])
#     del ds2

#ds.<property># returns the value of a property associated with
  #the name <property>. The following properties should be supported:
  #    - 'description': a textual description or name for the ds
  #    - 'fieldtypes': a list of types (one per field)

#* ds1 | ds2 | ds3 == ds.hstack([ds1,ds2,ds3])#????
    #assert hstack([ds('x','y'),ds('z')])==ds
    #hstack([ds('z','y'),ds('x')])==ds
    assert have_raised2(hstack,[ds('x'),ds('x')])
    assert have_raised2(hstack,[ds('y','x'),ds('x')])
    assert not have_raised2(hstack,[ds('x'),ds('y')])
    
#        i=0
#        for example in hstack([ds('x'),ds('y'),ds('z')]):
#            example==ds[i]
#            i+=1 
#        del i,example
#* ds1 & ds2 & ds3 == ds.vstack([ds1,ds2,ds3])#????

def test_fields_fct(ds):
    #@todo, fill correctly
    assert len(ds.fields())==3
    i=0
    v=0
    for field in ds.fields():
        for field_value in field: # iterate over the values associated to that field for all the ds examples
            v+=1
        i+=1
    assert i==3
    assert v==3*10
    del i,v
    
    i=0
    v=0
    for field in ds('x','z').fields():
        i+=1
        for val in field:
            v+=1
    assert i==2
    assert v==2*10
    del i,v
    
    i=0
    v=0
    for field in ds.fields('x','y'):
        i+=1
        for val in field:
            v+=1
    assert i==2
    assert v==2*10
    del i,v
    
    i=0
    v=0
    for field_examples in ds.fields():
        for example_value in field_examples:
            v+=1
        i+=1
    assert i==3
    assert v==3*10
    del i,v
    
    assert ds == ds.fields().examples()
    assert len(ds('x','y').fields()) == 2
    assert len(ds('x','z').fields()) == 2
    assert len(ds('y').fields()) == 1

    del field

def test_overrides(ds) :
    """ Test for examples that an override __getitem__ acts as the one in DataSet """
    def ndarray_list_equal(nda,l) :
        """ 
        Compares if a ndarray is the same as the list. Do it by converting the list into
        an numpy.ndarray, if possible
        """
        try :
            l = numpy.asmatrix(l)
        except :
            return False
        return smart_equal(nda,l)
        
    def smart_equal(a1,a2) :
        """
        Handles numpy.ndarray, LookupList, and basic containers
        """
        if not isinstance(a1,type(a2)) and not isinstance(a2,type(a1)):
            #special case: matrix vs list of arrays
            if isinstance(a1,numpy.ndarray) :
                return ndarray_list_equal(a1,a2)
            elif isinstance(a2,numpy.ndarray) :
                return ndarray_list_equal(a2,a1)
            return False
        # compares 2 numpy.ndarray
        if isinstance(a1,numpy.ndarray):
            if len(a1.shape) != len(a2.shape):
                return False
            for k in range(len(a1.shape)) :
                if a1.shape[k] != a2.shape[k]:
                    return False
            return (a1==a2).all()
        # compares 2 lookuplists
        if isinstance(a1,LookupList) :
            if len(a1._names) != len(a2._names) :
                return False
            for k in a1._names :
                if k not in a2._names :
                    return False
                if not smart_equal(a1[k],a2[k]) :
                    return False
            return True
        # compares 2 basic containers
        if hasattr(a1,'__len__'):
            if len(a1) != len(a2) :
                return False
            for k in range(len(a1)) :
                if not smart_equal(a1[k],a2[k]):
                    return False
            return True
        # try basic equals
        return a1 is a2

    def mask(ds) :
        class TestOverride(type(ds)):
            def __init__(self,ds) :
                self.ds = ds
            def __getitem__(self,key) :
                res1 = self.ds[key]
                res2 = DataSet.__getitem__(ds,key)
                assert smart_equal(res1,res2)
                return res1
        return TestOverride(ds)
    # test getitem
    ds2 = mask(ds)
    for k in range(10):
        res = ds2[k]
    res = ds2[1:len(ds):3]
    
        

    


def test_all(array,ds):
    assert len(ds)==10
    test_iterate_over_examples(array, ds)
    test_overrides(ds)
    test_getitem(array, ds)
    test_subset(array, ds)
    test_ds_iterator(array,ds('x','y'),ds('y','z'),ds('x','y','z'))
    test_fields_fct(ds)


class T_DataSet(unittest.TestCase):
    def test_ArrayDataSet(self):
        #don't test stream
        #tested only with float value
        #don't always test with y
        #don't test missing value
        #don't test with tuple
        #don't test proterties
        a2 = numpy.random.rand(10,4)
        ds = ArrayDataSet(a2,{'x':slice(3),'y':3,'z':[0,2]})###???tuple not tested
        ds = ArrayDataSet(a2,Example(['x','y','z'],[slice(3),3,[0,2]]))###???tuple not tested
        #assert ds==a? should this work?

        test_all(a2,ds)

        del a2, ds

    def test_CachedDataSet(self):
        a = numpy.random.rand(10,4)
        ds1 = ArrayDataSet(a,Example(['x','y','z'],[slice(3),3,[0,2]]))###???tuple not tested
        ds2 = CachedDataSet(ds1)
        ds3 = CachedDataSet(ds1,cache_all_upon_construction=True)

        test_all(a,ds2)
        test_all(a,ds3)

        del a,ds1,ds2,ds3


    def test_DataSetFields(self):
        raise NotImplementedError()

    def test_ApplyFunctionDataSet(self):
        a = numpy.random.rand(10,4)
        a2 = a+1
        ds1 = ArrayDataSet(a,Example(['x','y','z'],[slice(3),3,[0,2]]))###???tuple not tested

        ds2 = ApplyFunctionDataSet(ds1,lambda x,y,z: (x+1,y+1,z+1), ['x','y','z'],minibatch_mode=False)
        ds3 = ApplyFunctionDataSet(ds1,lambda x,y,z: (numpy.array(x)+1,numpy.array(y)+1,numpy.array(z)+1),
                                   ['x','y','z'],
                                   minibatch_mode=True)

        test_all(a2,ds2)
        test_all(a2,ds3)

        del a,ds1,ds2,ds3

    def test_FieldsSubsetDataSet(self):
        a = numpy.random.rand(10,4)
        ds = ArrayDataSet(a,Example(['x','y','z','w'],[slice(3),3,[0,2],0]))
        ds = FieldsSubsetDataSet(ds,['x','y','z'])

        test_all(a,ds)

        del a, ds

    def test_RenamedFieldsDataSet(self):
        a = numpy.random.rand(10,4)
        ds = ArrayDataSet(a,Example(['x1','y1','z1','w1'],[slice(3),3,[0,2],0]))
        ds = RenamedFieldsDataSet(ds,['x1','y1','z1'],['x','y','z'])

        test_all(a,ds)

        del a, ds

    def test_MinibatchDataSet(self):
        raise NotImplementedError()
    def test_HStackedDataSet(self):
        raise NotImplementedError()
    def test_VStackedDataSet(self):
        raise NotImplementedError()
    def test_ArrayFieldsDataSet(self):
        raise NotImplementedError()


class T_Exotic1(unittest.TestCase):
    class DataSet(DataSet):
            """ Dummy dataset, where one field is a ndarray of variables size. """
            def __len__(self) :
                return 100
            def fieldNames(self) :
                return 'input','target','name'
            def minibatches_nowrap(self,fieldnames,minibatch_size,n_batches,offset):
                class MultiLengthDataSetIterator(object):
                    def __init__(self,dataset,fieldnames,minibatch_size,n_batches,offset):
                        if fieldnames is None: fieldnames = dataset.fieldNames()
                        self.minibatch = Example(fieldnames,range(len(fieldnames)))
                        self.dataset, self.minibatch_size, self.current = dataset, minibatch_size, offset
                    def __iter__(self):
                            return self
                    def next(self):
                        for k in self.minibatch._names :
                            self.minibatch[k] = []
                        for ex in range(self.minibatch_size) :
                            if 'input' in self.minibatch._names:
                                self.minibatch['input'].append( numpy.array( range(self.current + 1) ) )
                            if 'target' in self.minibatch._names:
                                self.minibatch['target'].append( self.current % 2 )
                            if 'name' in self.minibatch._names:
                                self.minibatch['name'].append( str(self.current) )
                            self.current += 1
                        return self.minibatch
                return MultiLengthDataSetIterator(self,fieldnames,minibatch_size,n_batches,offset)
    
    def test_ApplyFunctionDataSet(self):
        ds = T_Exotic1.DataSet()
        dsa = ApplyFunctionDataSet(ds,lambda x,y,z: (x[-1],y*10,int(z)),['input','target','name'],minibatch_mode=False) #broken!!!!!!
        for k in range(len(dsa)):
            res = dsa[k]
            self.failUnless(ds[k]('input')[0][-1] == res('input')[0] , 'problem in first applied function')
        res = dsa[33:96:3]
          
    def test_CachedDataSet(self):
        ds = T_Exotic1.DataSet()
        dsc = CachedDataSet(ds)
        for k in range(len(dsc)) :
            self.failUnless(numpy.all( dsc[k]('input')[0] == ds[k]('input')[0] ) , (dsc[k],ds[k]) )
        res = dsc[:]

if __name__=='__main__':
    tests = []
    debug=False
    if len(sys.argv)==1:
        unittest.main()
    else:
        assert sys.argv[1]=="--debug"
        for arg in sys.argv[2:]:
            tests.append(arg)
        if tests:
            unittest.TestSuite(map(T_DataSet, tests)).debug()
        else:
            module = __import__("_test_dataset")
            tests = unittest.TestLoader().loadTestsFromModule(module)
            tests.debug()