Learning sparse filterbank transforms with convolutional ICA

Source code and documentation

Table of Contents

Citation

The contents of this notebook are © Johannes Ballé. It is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License. If you build on it, please cite our paper at ICIP 2014:

Johannes Ballé and Eero P. Simoncelli:
Learning sparse filterbank transforms with convolutional ICA
2014 21st IEEE International Conference on Image Processing (ICIP)
To be published.

Getting started

To run this notebook, you will need:

The first two will be included in a typical installation of the Anaconda IPython environment (http://continuum.io). We tested the code on Anaconda 2.0.1, which includes Python 2.7.8, IPython 2.1.0, and numba 0.13.2.

The nbhelpers module is available on my blog (http://balle.io) and is also included in the zip file along with the notebook. It should be placed in the Python path or in the IPython working directory. To optionally generate the videos, a recent version of ffmpeg (https://www.ffmpeg.org) needs to be installed and accessible from the command line; to display the videos, your browser should support HTML 5.

Additionally, you need a data source, such as a set of images, to run the algorithm on. In the paper and below, we have used catalog “CD02A” of the UPenn Natural Image Database (http://tofu.psych.upenn.edu/~upennidb/). You may need to adjust the path to the files in the code below.

Source code & setup

This section loads the required Python modules and defines two helper functions, as well as the main class used for all the experiments.

In [1]:
%pylab inline
from nbhelpers import *
import numba
Populating the interactive namespace from numpy and matplotlib

In [2]:
# nonlinearity
def nl_tanh( x ):
    '''return hyperbolic tangent and its derivative'''
    t = numpy.tanh( x )
    return t, 1 - t * t

# helper to fetch data faster
@numba.jit( numba.void( numba.float32[ :, :, :, : ],
                        numba.float32[ :, :, : ],
                        numba.int64[ : ],
                        numba.int64[ : ] ) )
def _get_patches( X, data, idx_y, idx_x ):
    for k in range( X.shape[ 0 ] ):
        for l in range( X.shape[ 1 ] ):
            for y in range( X.shape[ 2 ] ):
                for x in range( X.shape[ 3 ] ):
                    X[ k, l, y, x ] = data[ l, idx_y[ k ] + y, idx_x[ k ] + x ]

def visualize( Ws, spatial = True, spectral = False, normalize = 'row',
               repeats = 1, fill = 255, aspect = 1.6, max_x = 1200 ):
    '''helper function to visualize the filter arrays'''
    
    from numpy import empty, abs, uint8
    from numpy.fft import fft2, fftshift

    def prepare( M, **kwargs ):
        if normalize == 'elm':
            N = empty( M.shape, dtype = uint8 )
            for i in range( M.shape[ 0 ] ):
                for j in range( M.shape[ 1 ] ):
                    N[ i, j ] = rerange( M[ i, j ], **kwargs )
        elif normalize == 'row':
            N = empty( M.shape, dtype = uint8 )
            for i in range( M.shape[ 0 ] ):
                N[ i, : ] = rerange( M[ i, : ], **kwargs )
        elif normalize == 'col':
            N = empty( M.shape, dtype = uint8 )
            for j in range( M.shape[ 1 ] ):
                N[ :, j ] = rerange( M[ :, j ], **kwargs )
        elif normalize == 'all':
            N = rerange( M, **kwargs )
        else:
            N = M
        N = zoom( N, repeats = repeats )
        return N

    collected = []
    for W in Ws:
        if spatial:
            collected.append( prepare( W, vsym = True ) )
        if spectral:
            W_ = fftshift( abs( fft2( W ) ), axes = ( -2, -1 ) )
            collected.append( prepare( W_, vmin = 0 ) )

    space = 4 if len( collected ) > 1 else 1

    # arrange different visualizations horizontally
    collected = padded_concatenate( collected, axis = -1, padding = 1, fill = fill )

    if max_x is not None:
        max_x = ( max_x + space ) // ( collected.shape[ -1 ] + space )

    collected = dimwrap( collected, axis = 0, aspect = aspect, fill = fill, max_x = max_x )
    collected = padded_concatenate( collected, axis = -2, padding = space, fill = fill )
    collected = padded_concatenate( collected, axis = -1, padding = space, fill = fill )

    return collected
In [3]:
# main class for all experiments

class CICA( object ):
    '''Implementation of convolutional and block ICA'''
    
    def __init__( self, **parameters ):
        '''Initialize a new experiment.
        
        Parameters:
        
        shape: 3-tuple of integers
            defines the number of data channels,
            and the height and width of the filter support
        circulant: bool
            if false, runs block ICA
            if true, runs convolutional ICA
        g: callable
            implementation of pointwise nonlinearity,
            should return the nonlinearly transformed values
            as well as their derivative at that point
        sampling: 2-tuple of integers
            subsampling factor in each dimension
        K: integer
            number of unique filters
        data: array-like
            contains the images as a four-dimensional data structure
            first dimension: number of image
            second dimension: input channels (such as for RGB data)
            third and fourth dimension: spatial coordinates
        nsamples: integer
            number of image patches to use for each iteration
        seed: integer
            random seed for initialization
        '''
        
        self.__dict__.update( parameters )
        
        self.mu = self.estimate_mean()
        self.C = self.estimate_covariance( self.nsamples )
        self.sqrtC, self.Q, self.sqrtQ, _, _, _, _ = self.invert( self.C )
        self.W = [ self.initialize_filters( self.K ) ]
        
    def __call__( self, iterations, nboutput = False ):
        '''Run the algorithm.
        
        Parameters:
        
        iterations: integer
            the number of iterations to perform
        nboutput: bool
            whether or not to display progress in the notebook
            
        '''
        from IPython.display import clear_output
        
        try:
            for it in range( iterations ):
                if nboutput:
                    clear_output()
                    print "after {} iterations:".format( len( self.W ) - 1 )
                    nbimage( visualize( [ self.ufilters() ], spectral = True ) )
                self.W.append( self.fastica_step( self.W[ -1 ], self.nsamples, nboutput = nboutput ) )
        except KeyboardInterrupt:
            pass
        finally:
            clear_output()
        
    def initialize_filters( self, K ):
        '''initialize filters to windowed random noise'''
        
        from scipy.signal import kaiser
        from numpy.random import standard_normal
        from numpy.fft import ifft2
        from numpy.random import seed
        
        seed( self.seed )
        
        window = numpy.outer( kaiser( self.shape[ -2 ], beta = 6 ), kaiser( self.shape[ -1 ], beta = 6 ) )
        U = window * standard_normal( size = ( K, ) + self.shape )
        if self.circulant:
            U = self.circulant_renorm( U, shift = False )
        else:
            U = self.renorm( U )
        W = numpy.dot( U.reshape( ( K, -1 ) ), self.sqrtQ ).reshape( U.shape )
        
        return W
    
    def get_patches( self, pshape = None, bunches = 1, bunchsize = 1000 ):
        '''fetch a number of windowed samples from the data and return them in a vector'''
        from numpy.random import randint
        if pshape is None:
            pshape = self.shape
        X = numpy.empty( ( bunches, bunchsize ) + pshape, dtype = numpy.float32 )
        idx_d = randint( 0, len( self.data ), bunches )
        for d, x in zip( idx_d, X ):
            data = self.data[ d ]
            assert data.shape[ -3 ] == x.shape[ -3 ]
            idx_y = randint( 0, data.shape[ -2 ] - x.shape[ -2 ] + 1, bunchsize )
            idx_x = randint( 0, data.shape[ -1 ] - x.shape[ -1 ] + 1, bunchsize )
            _get_patches( x, data, idx_y, idx_x )
        return X.reshape( ( bunches * bunchsize, ) + pshape )
    
    def estimate_mean( self ):
        mu = [ d.sum( axis = ( -2, -1 ), keepdims = True ) for d in self.data ]
        return numpy.sum( mu, axis = 0 ) / numpy.sum( [ d[ 0 ].size for d in self.data ] )
    
    def estimate_covariance( self, nsamples ):
        C = numpy.zeros( ( prod( self.shape ), prod( self.shape ) ) )
        count = 0
        while count < nsamples:
            X = self.get_patches()
            count += X.shape[ 0 ]
            X -= self.mu
            X = X.reshape( ( X.shape[ 0 ], -1 ) )
            C += numpy.dot( X.T, X )
        C /= count
        return C
    
    def invert( self, C, dimred = numpy.inf, soft = False ):
        '''compute whitening and unwhitening matrices,
           potentially performing a soft dimensionality reduction'''
        
        from numpy.linalg import eigh
        
        E, U = eigh( C )
        order = numpy.argsort( E )[ ::-1 ]
        E = E[ order ]
        U = U[ :, order ]
        assert numpy.allclose( C, numpy.dot( U * E, U.T ) )
        
        E[ E < 0 ] = 0
        
        if soft:
            zE = ( E ** 2 ) / ( E + E[ 0 ] / dimred )
            invE = 1 / ( E + E[ 0 ] / dimred )
        else:
            zE = E.copy()
            zE[ E < E[ 0 ] / dimred ] = 0
            invE = E ** -1
            invE[ E < E[ 0 ] / dimred ] = 0
        
        Q = numpy.dot( U * invE, U.T )
        D = numpy.dot( U * numpy.sqrt( invE * zE ), U.T )
        sqrtQ = numpy.dot( U * numpy.sqrt( invE ), U.T )
        sqrtC = numpy.dot( U * numpy.sqrt( zE ), U.T )
        
        return sqrtC, Q, sqrtQ, D, E, invE, zE

    def renorm( self, U ):
        '''enforce ICA constraint'''
        
        from numpy.linalg import svd
        
        R, _, S = svd( U.reshape( ( U.shape[ 0 ], -1 ) ), full_matrices = False )
        U = numpy.dot( R, S ).reshape( U.shape )
        
        return U
    
    def circulant_renorm( self, U, shift = True ):
        '''enforce CICA constraint and center filters'''
        
        from numpy.fft import fft2, ifft2, ifftshift, fftfreq
        from numpy.linalg import svd

        sh = U.shape
        sa = self.sampling
        
        assert sh[ -2 ] % sa[ -2 ] == 0
        assert sh[ -1 ] % sa[ -1 ] == 0
        assert ( sa[ -2 ] % 2 == 0 and sa[ -1 ] % 2 == 0 ) or ( sa[ -2 ] == sa[ -1 ] == 1 )
        
        U = fft2( U )
        
        # enforce isometry
        for fy in range( sh[ -2 ] // sa[ -2 ] ):
            fys = slice( fy, fy + sh[ -2 ], sh[ -2 ] // sa[ -2 ] )
            for fx in range( sh[ -1 ] // sa[ -1 ] ):
                fxs = slice( fx, fx + sh[ -1 ], sh[ -1 ] // sa[ -1 ] )
                B = U[ :, :, fys, fxs ]
                R, _, S = svd( B.reshape( ( U.shape[ 0 ], -1 ) ), full_matrices = False )
                U[ :, :, fys, fxs ] = numpy.dot( R, S ).reshape( B.shape )
        U *= numpy.sqrt( sa[ -2 ] * sa[ -1 ] )
        
        # discretely optimize for basis with minimum group delay
        if shift:
            fy = fftfreq( sh[ -2 ] )[ :, None ]
            fx = fftfreq( sh[ -1 ] )[ None, : ]
            fy2 = fftfreq( 2 * sh[ -2 ] )[ :, None ]
            fx2 = fftfreq( 2 * sh[ -1 ] )[ None, : ]
            U2 = fft2( ifftshift( ifft2( U ), axes = ( -2, -1 ) ), s = ( 2 * sh[ -2 ], 2 * sh[ -1 ] ) )
            for k, u in enumerate( U2 ):
                min_delay = inf
                E = numpy.abs( u )
                for dy in [ -sa[ -2 ], 0, sa[ -2 ] ]:
                    for dx in [ -sa[ -1 ], 0, sa[ -1 ] ]:
                        phi = numpy.angle( u * numpy.exp( ( 2j * numpy.pi ) * ( fy2 * dy + fx2 * dx ) ) )
                        grad_y = ( phi[ :, numpy.arange( 1, u.shape[ -2 ] + 1 ) % u.shape[ -2 ], : ]
                                 - phi[ :, numpy.arange( -1, u.shape[ -2 ] - 1 ), : ] )
                        grad_x = ( phi[ :, :, numpy.arange( 1, u.shape[ -1 ] + 1 ) % u.shape[ -1 ] ]
                                 - phi[ :, :, numpy.arange( -1, u.shape[ -1 ] - 1 ) ] )
                        delay = ( E * numpy.sqrt( grad_y ** 2 + grad_x ** 2 ) ).sum()
                        if delay < min_delay:
                            min_delay = delay
                            y = dy
                            x = dx
                U[ k ] *= numpy.exp( ( 2j * numpy.pi ) * ( fy * y + fx * x ) )
        
        U = ifft2( U ).real
        return U
    
    def fastica_step( self, W, nsamples, get_patches = None, nboutput = False ):
        '''fixed-point iteration'''
        
        if get_patches is None:
            get_patches = self.get_patches
            
        W = W.copy()
        # vectorized view of W
        W_ = W.view()
        W_.shape = ( W.shape[ 0 ], numpy.prod( W.shape[ 1: ] ) )

        U = numpy.empty_like( W )
        # vectorized view of U
        U_ = U.view()
        U_.shape = W_.shape
        
        # make sure power of y = Wx is one, to satisfy negentropy approximation
        for w in W_:
            w /= numpy.sqrt( numpy.dot( w, numpy.dot( self.C, w ) ) )
    
        # first, accumulate the expectations by iterating over bunches of vectors
        Exgwtx = numpy.zeros( W.shape )
        Exgwtx_ = Exgwtx.view()
        Exgwtx_.shape = W_.shape
        
        Eg_wtx = numpy.zeros( W.shape[ :1 ] )
        
        if nboutput:
            print "collecting {:.1e} samples for fastICA iteration:".format( nsamples ),
            prog = progressbar()
        
        count = 0
        while count < nsamples:
            X = get_patches()
            count += X.shape[ 0 ]
            X -= self.mu
            X.shape = X.shape[ 0 ], W_.shape[ 1 ]
            Y = numpy.dot( W_, X.T )
            gwtx, g_wtx = self.g( Y )
            Exgwtx_ += numpy.dot( gwtx, X )
            Eg_wtx += g_wtx.sum( axis = 1 )
            if nboutput:
                prog.update( count * 100. / nsamples, 1 )
    
        # then do the actual FastICA step
        W_ *= Eg_wtx[ :, None ]
        W_ -= numpy.dot( Exgwtx_, self.Q )
        
        # renorm filters
        U_[ : ] = numpy.dot( W_, self.sqrtC )
        for u in U_:
            u /= numpy.sqrt( numpy.dot( u, u ) )
        if self.circulant:
            U[ : ] = self.circulant_renorm( U )
        else:
            U[ : ] = self.renorm( U )
        W_[ : ] = dot( U_, self.sqrtQ )
        
        return W
    
    def filters( self, i = -1 ):
        '''return the solution after i iterations of the algorithm (default last).
        
        This returns the filters including the whitening.'''
        return self.W[ i ]
    
    def ufilters( self, i = -1 ):
        '''return the solution after i iterations of the algorithm (default last).
        
        This returns the filters without the whitening.'''
        W = self.W[ i ]
        return numpy.dot( W.reshape( ( W.shape[ 0 ], -1 ) ), self.sqrtC ).reshape( W.shape )
    
    def bases( self, i = -1 ):
        '''return the solution after i iterations of the algorithm (default last).
        
        This returns the inverse filters (i.e. bases) including unwhitening.'''
        W = self.W[ i ]
        return numpy.dot( W.reshape( ( W.shape[ 0 ], -1 ) ), self.C ).reshape( W.shape )

Natural image experiments

This section instantiates a number of experiments (as objects of the class "CICA"). First, we load the images: the luminance version of the dataset is stored in the variable lum_data, and the RGB version in the variable rgb_data.

Here, we assume that the data is stored in the path

pictures/upenn/cd02A
in the current working directory. You might need to adjust this to your needs.

In [4]:
path = 'pictures/upenn/cd02A'
In [5]:
# load images from the UPenn Natural Image Database
# luminance version

from os import listdir
from scipy.io import loadmat

# get list of files
files = [ path + '/' + f for f in listdir( path ) if f.endswith( '_LUM.mat' ) ]

# load files and compress their dynamic range, as these are raw luminances
images = [ numpy.log10( 100 + loadmat( f )[ 'LUM_Image' ] ) for f in files ]
print "{} images, size {}".format( len( images ), images[ 0 ].shape )

lum_data = [ image.reshape( ( -1, ) + image.shape[ -2: ] ).astype( numpy.float32 ) for image in images ]

# show a "contact sheet" of the loaded images
nbimage( concatenate( concatenate(
    dimwrap( rerange( [ d[ :, ::16, ::16 ] for d in lum_data ] ), aspect = 1.6, fill = 255 ),
    axis = -2 ), axis = -1 ) )

# save some memory
del images
69 images, size (1007, 1519)

In [6]:
# load images from the UPenn Natural Image Database
# RGB version

from os import listdir
from scipy.io import loadmat

# get list of files
files = [ path + '/' + f for f in listdir( path ) if f.endswith( '_RGB.mat' ) ]

# load files and compress their dynamic range, as these are raw luminances
images = [ numpy.log10( 100 + loadmat( f )[ 'RGB_Image' ].transpose( ( 2, 0, 1 ) ) ) for f in files ]
print "{} images, size {}".format( len( images ), images[ 0 ].shape )

rgb_data = [ image.reshape( ( -1, ) + image.shape[ -2: ] ).astype( numpy.float32 ) for image in images ]

# show a "contact sheet" of the loaded images
nbimage( concatenate( concatenate(
    dimwrap( rerange( [ d[ :, ::16, ::16 ] for d in rgb_data ] ), aspect = 1.6, fill = 255 ),
    axis = -2 ), axis = -1 ) )

# save some memory
del images
69 images, size (3, 1007, 1519)

Block ICA on luminance images

We start with a basic example. Here, an object is created which stores all the variables necessary to perform the regular fastICA algorithm on the luminance dataset. This step may take a few seconds, because the object precomputes the mean and covariance of the data on initialization.

In [7]:
upenn_bloc_16 = CICA(
    shape = ( 1, 16, 16 ),       # 1 input channel (just luminances), filter support of 16x16
    circulant = False,           # block ICA
    g = nl_tanh,                 # hyperbolic tangent nonlinearity
    sampling = ( 1, 1 ),         # no subsampling
    K = 256,                     # 16 x 16 = 256 filters, hence a complete transform
    data = lum_data,             # the data set
    nsamples = 1e6,              # the number of samples used for each iteration
    seed = 1234,                 # seed for the random number generator
)

Next, we run the algorithm. We do 120 iterations here, and use some IPython notebook features to show the current progress. In each iteration, upenn_bloc_16.W is extended by the current solution. Thus, at the end of the call, it contains the time course of the solutions as the algorithm proceeded.

In [8]:
upenn_bloc_16( 120, True )


Here, we show a visualization of the solution, similar to how it is shown in the paper. Specifically, what we show are the filters without the whitening (which we called U in the paper).

The visualize function takes the filter arrays as input and outputs a rescaled and rearranged version of it for display. It also computes the magnitude spectra of the filters and displays them side by side with the filters. This output is passed to the nbimage function, which simply displays it as an inline image in the notebook.

The output is generated as follows. Each filter array is rescaled, such that mid-gray corresponds to zero in the filter array, and such that its range is contained within the dynamic range of the display. This is used for every other column of the image you see here. The other columns are generated by computing the discrete Fourier transform of the filter array, taking its magnitude, and rescaling it such that black corresponds to zero in the magnitude, and such that its range is contained within the dynamic range of the display.

Each filter and its corresponding Fourier magnitude are separated by a one pixel wide line, whereas the different filters are separated by a four pixel wide line.

In [9]:
nbimage( visualize( [ upenn_bloc_16.ufilters() ], repeats = 2, spectral = True ) )

Convolutional ICA on luminance images

Overcomplete CICA, K = 16

Now, we run our version of the algorithm by setting circulant to True.

We also reduce the number of filters to K = 16, since each filter is now convolutional. Hence, the resulting transform will be overcomplete by a factor of 16. Note that the algorithm runs much faster due to the decreased number of parameters.

In [10]:
upenn_circ_16 = CICA(
    shape = ( 1, 16, 16 ),
    circulant = True,
    g = nl_tanh,
    sampling = ( 1, 1 ),
    K = 16,
    data = lum_data,
    nsamples = 1e6,
    seed = 1234,
)
In [11]:
upenn_circ_16( 120, True )


The visualization method here is the same as above. In CICA, the filter is constrained such that, if applied in a shift-invariant way across the data, they together represent the data. Thus, the structure is now a filter bank.

In [12]:
nbimage( visualize( [ upenn_circ_16.ufilters() ], repeats = 2, spectral = True ) )

The statement below will generate an in-browser video which shows how the solution proceeds from initialization to convergence. Note that, for this to work, a recent version of ffmpeg needs to be available on the command line.

In [13]:
nbvideo( [
    visualize( [ upenn_circ_16.ufilters( i ) ], repeats = 2, spectral = True )
for i in range( len( upenn_circ_16.W ) ) ], fps = 10, loop = True, counter = 'lr' )

Complete CICA, K = 16, s = 4

Now, we repeat the experiment, but setting a subsampling factor of 4x4. Since each subband will only retain one sample out of 16, and there are a total of 16 filters, it will be a complete transform overall.

In [14]:
upenn_circ_16_4 = CICA(
    shape = ( 1, 16, 16 ),
    circulant = True,
    g = nl_tanh,
    sampling = ( 4, 4 ),
    K = 16,
    data = lum_data,
    nsamples = 1e6,
    seed = 1234,
)
In [15]:
upenn_circ_16_4( 120, True )


In [16]:
nbimage( visualize( [ upenn_circ_16_4.ufilters() ], repeats = 2, spectral = True ) )

Convolutional ICA on RGB images

Here, we repeat the experiment with RGB data. Now, there are three input channels. We still apply a subsampling of 4x4 on every subband, thus we would need 3 x 16 = 48 filters to make the transform complete. To show an intermediate case, we give it 96 filters, and hence the transform will be overcomplete by a factor of 2.

In [17]:
upennrgb_circ_16_4 = CICA(
    shape = ( 3, 16, 16 ),       # now we have 3 input channels
    circulant = True,
    g = nl_tanh,
    sampling = ( 4, 4 ),         # subsampling of 4x4 and 96 filters
    K = 96,                      # the overall transform will be overcomplete by a factor of 2.
    data = rgb_data,
    nsamples = 1e6,
    seed = 1234,
)
In [18]:
upennrgb_circ_16_4( 180, True )


In [19]:
nbimage( visualize( [ upennrgb_circ_16_4.ufilters() ], repeats = 2, spectral = True ) )