view pylearn/io/audio.py @ 1274:9d5905d6d879

hmc - changed updates to member fn from lambda for pickling
author James Bergstra <bergstrj@iro.umontreal.ca>
date Wed, 08 Sep 2010 13:17:45 -0400
parents 3901d06e2d96
children
line wrap: on
line source

import subprocess, sys
import numpy
import theano

from wavread import WavRead, wav_read_int16, wav_read_double
import mad

def gen_mp3(madfile, dtype, scale):
    printed = False

    while True:
        b = madfile.read()
        if b is None:
            break
        b = numpy.frombuffer(b, dtype='int16')
        #print len(b), b.min(), b.max()
        if not printed:
            bb = b.reshape((len(b)/2,2))
            print bb[1000:1020]
            #print 'first 10 mp3samples', b[:10]
            #print b[:10] * (1.0 / 2**15)
            printed = True
        n = len(b)
        assert not (n%2)
        yield scale*numpy.asarray(b, dtype=dtype).reshape((n/2, 2)) #cast and reshape

class AudioRead(theano.Op):
    #TODO: add the samplerate as an output
    """Read an mp3 (other formats not implemented yet)

    Depends on 'madplay' being on system path.

    input - filename
    output - the contents of the audiofile in pcm format
    
    """
    def __init__(self, channels=2, sr=22050, dtype=theano.config.floatX):
        """
        :param channels: output this many channels
        :param sr: output will be encoded at this samplerate
        :param dtype: output will have this dtype
        """
        self.dtype = dtype
        if dtype not in ('float32', 'float64', 'int16'):
            raise NotImplementedError('dtype', dtype)
        self.channels = channels
        self.sr = sr

    def __eq__(self, other):
        return (type(self) == type(other)) and self.dtype == other.dtype \
                and self.channels == other.channels and self.sr == other.sr

    def __hash__(self):
        return hash(type(self)) ^ hash(self.dtype) ^ hash(self.channels) ^ hash(self.sr)

    def make_node(self, path):
        bcast = (False,) *self.channels
        otype = theano.tensor.TensorType(broadcastable=bcast, dtype=self.dtype)
        return theano.Apply(self, [path], [otype(),])

    def perform(self, node, (path,), (data_storage, )):
        if path.upper().endswith('.MP3'):
            cmd = ['madplay']
            cmd.extend(['--sample-rate', str(self.sr)])
            cmd.extend(['-o', 'raw:/dev/stdout'])
            cmd.extend(['-d',])
            if self.channels==1:
                cmd.extend(['--mono'])
            elif self.channels==2:
                cmd.extend(['--stereo'])
            else:
                raise NotImplementedError("weird number of channels", self.channels)
            cmd.append(path)

            proc = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
            proc_stdout, proc_stderr = proc.communicate()
            assert proc.returncode is not None # process should be finished
            if proc.returncode:
                print >> sys.stderr, proc_stderr
                raise Exception('cmd %s returned code %i'%(' '.join(cmd),proc.returncode))

            int16samples= numpy.frombuffer(proc_stdout, dtype='int16')
            if self.dtype == 'float32':
                typedsamples = numpy.asarray(int16samples, dtype='float32') / numpy.float32(2**15)
            elif self.dtype == 'float64':
                typedsamples = int16samples * (1.0/2**15)
            elif self.dtype == 'int16':
                typedsamples = int16samples
            else:
                raise NotImplementedError()

            if self.channels==2:
                typedsamples = typedsamples.reshape((len(typedsamples)/2,2))
        else: 
            #TODO: if extension is .wav use the 'wave' module in the stdlib
            #      see test_audioread below for usage
            raise NotImplementedError()

        assert typedsamples.dtype == self.dtype
        assert len(typedsamples.shape) == self.channels, (typedsamples.shape, self.channels)
        data_storage[0] = typedsamples

    def grad(self, inputs, g_output):
        return [None for i in inputs]


def test_audioread():
    #
    # Not really a unit test because it depends on files that are probably not around anymore.
    # Still, the basic idea is to decode externally, and compare with wavread.
    #

    mp3path = "/home/bergstra/data/majorminer/mp3/Mono/Formica Blues/03 Slimcea Girl_003.20_003.30.mp3"

    dstorage = [None]
    AudioRead(channels=1, dtype='float32', sr=44100).perform(None, (mp3path,), (dstorage, ))
    mp3samples = dstorage[0]

    wavpath = "/home/bergstra/tmp/blah2.wav"
    import wave, numpy
    wavfile = wave.open(wavpath)
    assert wavfile.getsampwidth()==2 # bytes
    wavsamples = numpy.frombuffer(
            wavfile.readframes(wavfile.getnframes()),
            dtype='int16')
    wavsamples = wavsamples.reshape((wavfile.getnframes(), wavfile.getnchannels()))
    wavsamples_as_float = numpy.asarray(wavsamples, dtype='float32') / 2**15

    print 'wavsamples 1000:1020:', wavsamples[1000:1020].mean(axis=1)
    print 'mp3samples 1000:1020:', mp3samples[1000:1020]*2**15
    print 'wavsample range', wavsamples.min(), wavsamples.max()
    print 'mp3sample range', mp3samples.min(), mp3samples.max()

    print mp3samples.shape, mp3samples.dtype
    print wavsamples.shape, wavsamples.dtype

    #assert mp3samples.shape == wavsamples.shape
    #assert mp3samples.dtype == wavsamples_as_float.dtype

    #print wavsamples_as_float[:5]
    #print mp3samples[:5]



if 0: ### OLD CODE USING PYGMY
    import pygmy.audio

    class AudioRead(theano.Op):
        #TODO: add the samplerate as an output
        """Read a wave file or mp3

        input - filename
        output - the contents of the audiofile in pcm format, and the samplerate
        
        """

        #arguments to pygmy.audio.audioread
        _audioread_attrs=('mono', 'tlast', 'fs_target', 'stripzeros', 'stats_only', 'decoder')

        mono = False
        tlast=-1
        fs_target=-1
        stripzeros='none'
        stats_only=False
        decoder = 'madplay'

        def __init__(self, **kwargs):
            for kw in kwargs:
                if not kw in self._audioread_attrs:
                    raise TypeError('unrecognized keyword argument', kw)
                setattr(self, kw, kwargs[kw])
        def __eq__(self, other):
            return (type(self) == type(other)) and \
                    all(getattr(self, a) == getattr(other,a) for a in self._audioread_attrs)
        def __hash__(self):
            return reduce(
                    lambda a,b: a^b,
                    [getattr(self, a) for a in self._audioread_attrs],
                    initial=hash(type(self)))
        def make_node(self, path):
            out_type = theano.tensor.dvector if self.mono else theano.tensor.dmatrix
            return theano.Apply(self, [path], [out_type(), theano.tensor.dscalar()])

        def perform(self, node, (path,), (data_storage, sr_storage)):
            data, sr, dz = pygmy.audio.audioread(path, 
                    mono=self.mono, 
                    tlast=self.tlast, 
                    fs_target=self.fs_target,
                    stripzeros=self.stripzeros,
                    stats_only=self.stats_only,
                    decoder=self.decoder)

            assert isinstance(data, numpy.ndarray)
            assert data.ndim == (1 if self.mono else 2)
            assert data.dtype == numpy.float64
            data_storage[0] = data

            sr_storage[0] = numpy.asarray(sr,dtype='float64')
            assert sr_storage[0].ndim==0

        def grad(self, inputs, g_output):
            return [None for i in inputs]

    audioread = AudioRead()
    audioread_mono = AudioRead(mono=True)