view pylearn/shared/layers/tests/test_kouh2008.py @ 1447:fbe470217937

Use .get_value() and .set_value() of shared instead of the .value property
author Pascal Lamblin <lamblinp@iro.umontreal.ca>
date Wed, 16 Mar 2011 20:20:02 -0400
parents c635d1df51a1
children
line wrap: on
line source

import numpy
import theano.compile.debugmode
from theano.compile.debugmode import DebugMode
from theano import tensor
from theano.compile import pfunc
from pylearn.shared.layers import LogisticRegression, Kouh2008

def test_dtype():
    """Kouh2008.new_filters_expbounds must propagate the requested dtype
    through to the layer's output variable, for both float64 and float32.
    """
    n_in = 10
    n_out = 10
    n_terms = 3
    rng = numpy.random.RandomState(23455)
    for make_input, want_dtype in ((tensor.dmatrix, 'float64'),
                                   (tensor.fmatrix, 'float32')):
        layer = Kouh2008.new_filters_expbounds(
                rng, make_input(), n_in, n_out, n_terms, dtype=want_dtype)
        assert layer.output.dtype == want_dtype

def run_w_random(bsize=10, n_iter=200, n_in = 1024, n_out = 100, n_terms=2, dtype='float64'):
    if isinstance(theano.compile.mode.get_default_mode(),DebugMode):
        n_iter=2
        
    x = tensor.dmatrix()
    y = tensor.lvector()
    rng = numpy.random.RandomState(23455)

    layer = Kouh2008.new_filters_expbounds(rng, x, n_in, n_out, n_terms, dtype='float64')
    out = LogisticRegression.new(layer.output, n_out, 2)
    cost = out.nll(y).sum()

    #isolated optimization
    for ii in xrange(len(layer.params)):
        params = out.params+ [layer.params[ii]]
        print 'PARAMS', params
        updates = [(p, p - numpy.asarray(0.001, dtype=dtype)*gp) for p,gp in zip(params, tensor.grad(cost, params)) ]
        print 'COMPILING'
        f = pfunc([x, y], cost, updates=updates)
        print 'DONE'
        if False:
            for i, n in enumerate(f.maker.env.toposort()):
                print i, n

        xval = numpy.asarray(rng.rand(bsize, n_in), dtype=dtype)
        yval = numpy.asarray(rng.randint(0,2,bsize), dtype='int64')
        f0 = f(xval, yval)
        for i in xrange(n_iter):
            fN = f(xval, yval)
            assert fN  < f0
            f0 = fN
            #if 0 ==  i % 5: print i, 'rval', fN

    return fN

def test_A(bsize=10, n_iter=2, n_in = 10, n_out = 10, n_terms=2, dtype='float64'):

    x = tensor.dmatrix()
    y = tensor.lvector()
    rng = numpy.random.RandomState(23455)

    layer = Kouh2008.new_filters_expbounds(rng, x, n_in, n_out, n_terms, dtype='float64')
    out = LogisticRegression.new(layer.output, n_out, 2)
    cost = out.nll(y).sum()
    #joint optimization except for one of the linear filters
    out.w.set_value((out.w.get_value(borrow=True) +
                     0.1 * rng.rand(*out.w.get_value(borrow=True).shape)),
            borrow=True)
    params = layer.params[:-2]
    mode = None
    updates = [(p, p - numpy.asarray(0.001, dtype=dtype)*gp) for p,gp in zip(params, tensor.grad(cost, params)) ]
    for p, newp in updates:
        if p is layer.r:
            theano.compile.debugmode.debugprint(newp, depth=5)
    f = pfunc([x, y], [cost], mode, updates=updates)
    env_r = f.maker.env.inputs[9]
    order = f.maker.env.toposort()

    assert str(f.maker.env.outputs[6].owner.inputs[0]) == 'r'
    assert str(f.maker.env.inputs[9]) == 'r'
    assert f.maker.env.outputs[6].owner.inputs[0] is env_r
    assert (f.maker.env.outputs[6].owner,0) in env_r.clients

    if False:
        for i, n in enumerate(f.maker.env.toposort()):
            print i, n, n.inputs

    xval = numpy.asarray(rng.rand(bsize, n_in), dtype=dtype)
    yval = numpy.asarray(rng.randint(0,2,bsize), dtype='int64')
    for i in xrange(n_iter):
        fN = f(xval, yval)
        if 0 == i:
            f0 = fN
        #if 0 ==  i % 5: print i, 'rval', fN
        print i, 'rval', fN
        for p0 in params:
            for p1 in params:
                assert p0 is p1 or not numpy.may_share_memory(p0.value, p1.value)
        assert not numpy.may_share_memory(layer.r.value, xval)
    print 'XVAL SUM', xval.sum(), layer.r.value.sum()

    assert f0 > 6
    assert fN < f0 # TODO: assert more improvement

# NOTE(review): this guard sits mid-file, which is unconventional.  Running
# the file as a script executes only test_A(); the test_smaller*/test_big
# functions defined below are merely defined afterwards, never called here.
if __name__ == '__main__':
    test_A()

def test_smaller():
    """A small float64 model trained by run_w_random should reach a low
    final cost (skipped when DebugMode truncates the iteration count)."""
    final_cost = run_w_random(n_in=10, n_out=8)
    default_mode = theano.compile.mode.get_default_mode()
    if isinstance(default_mode, DebugMode):
        return  # too few iterations under DebugMode to judge convergence
    assert final_cost < 6.1

def test_smaller32():
    """Same as test_smaller but requesting float32 precision."""
    final_cost = run_w_random(n_in=10, n_out=8, dtype='float32')
    default_mode = theano.compile.mode.get_default_mode()
    if isinstance(default_mode, DebugMode):
        return  # too few iterations under DebugMode to judge convergence
    assert final_cost < 6.1

def test_big():
    """Full-size model (default run_w_random arguments) should converge to
    a near-zero cost (skipped when DebugMode truncates the iterations)."""
    final_cost = run_w_random()
    default_mode = theano.compile.mode.get_default_mode()
    if isinstance(default_mode, DebugMode):
        return  # too few iterations under DebugMode to judge convergence
    assert final_cost < 0.1