Source code and documentation
The contents of this notebook are © Johannes Ballé. It is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License. If you build on it, please cite our paper at ICIP 2014:
Johannes Ballé and Eero P. Simoncelli:
Learning sparse filterbank transforms with convolutional ICA
2014 21st IEEE International Conference on Image Processing (ICIP)
To be published.
To run this notebook, you will need:
The first two will be included in a typical installation of the Anaconda IPython environment (http://continuum.io). We tested the code on Anaconda 2.0.1, which includes Python 2.7.8, IPython 2.1.0, and numba 0.13.2.
The nbhelpers module is available on my blog (http://balle.io) and is also included in the zip file along with the notebook. It should be placed in the Python path or in the IPython working directory. To optionally generate the videos, a recent version of ffmpeg (https://www.ffmpeg.org) needs to be installed and accessible from the command line; to display the videos, your browser should support HTML 5.
Additionally, you need a data source, such as a set of images, to run the algorithm on. In the paper and below, we have used catalog “CD02A” of the UPenn Natural Image Database (http://tofu.psych.upenn.edu/~upennidb/). You may need to adjust the path to the files in the code below.
This section loads the required Python modules and defines two helper functions, as well as the main class used for all the experiments.
%pylab inline
from nbhelpers import *
import numba
# nonlinearity
def nl_tanh( x ):
'''return hyperbolic tangent and its derivative'''
t = numpy.tanh( x )
return t, 1 - t * t
# helper to fetch data faster
@numba.jit( numba.void( numba.float32[ :, :, :, : ],
numba.float32[ :, :, : ],
numba.int64[ : ],
numba.int64[ : ] ) )
def _get_patches( X, data, idx_y, idx_x ):
for k in range( X.shape[ 0 ] ):
for l in range( X.shape[ 1 ] ):
for y in range( X.shape[ 2 ] ):
for x in range( X.shape[ 3 ] ):
X[ k, l, y, x ] = data[ l, idx_y[ k ] + y, idx_x[ k ] + x ]
def visualize( Ws, spatial = True, spectral = False, normalize = 'row',
repeats = 1, fill = 255, aspect = 1.6, max_x = 1200 ):
'''helper function to visualize the filter arrays'''
from numpy import empty, abs, uint8
from numpy.fft import fft2, fftshift
def prepare( M, **kwargs ):
if normalize == 'elm':
N = empty( M.shape, dtype = uint8 )
for i in range( M.shape[ 0 ] ):
for j in range( M.shape[ 1 ] ):
N[ i, j ] = rerange( M[ i, j ], **kwargs )
elif normalize == 'row':
N = empty( M.shape, dtype = uint8 )
for i in range( M.shape[ 0 ] ):
N[ i, : ] = rerange( M[ i, : ], **kwargs )
elif normalize == 'col':
N = empty( M.shape, dtype = uint8 )
for j in range( M.shape[ 1 ] ):
N[ :, j ] = rerange( M[ :, j ], **kwargs )
elif normalize == 'all':
N = rerange( M, **kwargs )
else:
N = M
N = zoom( N, repeats = repeats )
return N
collected = []
for W in Ws:
if spatial:
collected.append( prepare( W, vsym = True ) )
if spectral:
W_ = fftshift( abs( fft2( W ) ), axes = ( -2, -1 ) )
collected.append( prepare( W_, vmin = 0 ) )
space = 4 if len( collected ) > 1 else 1
# arrange different visualizations horizontally
collected = padded_concatenate( collected, axis = -1, padding = 1, fill = fill )
if max_x is not None:
max_x = ( max_x + space ) // ( collected.shape[ -1 ] + space )
collected = dimwrap( collected, axis = 0, aspect = aspect, fill = fill, max_x = max_x )
collected = padded_concatenate( collected, axis = -2, padding = space, fill = fill )
collected = padded_concatenate( collected, axis = -1, padding = space, fill = fill )
return collected
# main class for all experiments
class CICA( object ):
'''Implementation of convolutional and block ICA'''
def __init__( self, **parameters ):
'''Initialize a new experiment.
Parameters:
shape: 3-tuple of integers
defines the number of data channels,
and the height and width of the filter support
circulant: bool
if false, runs block ICA
if true, runs convolutional ICA
g: callable
implementation of pointwise nonlinearity,
should return the nonlinearly transformed values
as well as their derivative at that point
sampling: 2-tuple of integers
subsampling factor in each dimension
K: integer
number of unique filters
data: array-like
contains the images as a four-dimensional data structure
first dimension: number of image
second dimension: input channels (such as for RGB data)
third and fourth dimension: spatial coordinates
nsamples: integer
number of image patches to use for each iteration
seed: integer
random seed for initialization
'''
self.__dict__.update( parameters )
self.mu = self.estimate_mean()
self.C = self.estimate_covariance( self.nsamples )
self.sqrtC, self.Q, self.sqrtQ, _, _, _, _ = self.invert( self.C )
self.W = [ self.initialize_filters( self.K ) ]
def __call__( self, iterations, nboutput = False ):
'''Run the algorithm.
Parameters:
iterations: integer
the number of iterations to perform
nboutput: bool
whether or not to display progress in the notebook
'''
from IPython.display import clear_output
try:
for it in range( iterations ):
if nboutput:
clear_output()
print "after {} iterations:".format( len( self.W ) - 1 )
nbimage( visualize( [ self.ufilters() ], spectral = True ) )
self.W.append( self.fastica_step( self.W[ -1 ], self.nsamples, nboutput = nboutput ) )
except KeyboardInterrupt:
pass
finally:
clear_output()
def initialize_filters( self, K ):
'''initialize filters to windowed random noise'''
from scipy.signal import kaiser
from numpy.random import standard_normal
from numpy.fft import ifft2
from numpy.random import seed
seed( self.seed )
window = numpy.outer( kaiser( self.shape[ -2 ], beta = 6 ), kaiser( self.shape[ -1 ], beta = 6 ) )
U = window * standard_normal( size = ( K, ) + self.shape )
if self.circulant:
U = self.circulant_renorm( U, shift = False )
else:
U = self.renorm( U )
W = numpy.dot( U.reshape( ( K, -1 ) ), self.sqrtQ ).reshape( U.shape )
return W
def get_patches( self, pshape = None, bunches = 1, bunchsize = 1000 ):
'''fetch a number of windowed samples from the data and return them in a vector'''
from numpy.random import randint
if pshape is None:
pshape = self.shape
X = numpy.empty( ( bunches, bunchsize ) + pshape, dtype = numpy.float32 )
idx_d = randint( 0, len( self.data ), bunches )
for d, x in zip( idx_d, X ):
data = self.data[ d ]
assert data.shape[ -3 ] == x.shape[ -3 ]
idx_y = randint( 0, data.shape[ -2 ] - x.shape[ -2 ] + 1, bunchsize )
idx_x = randint( 0, data.shape[ -1 ] - x.shape[ -1 ] + 1, bunchsize )
_get_patches( x, data, idx_y, idx_x )
return X.reshape( ( bunches * bunchsize, ) + pshape )
def estimate_mean( self ):
mu = [ d.sum( axis = ( -2, -1 ), keepdims = True ) for d in self.data ]
return numpy.sum( mu, axis = 0 ) / numpy.sum( [ d[ 0 ].size for d in self.data ] )
def estimate_covariance( self, nsamples ):
C = numpy.zeros( ( prod( self.shape ), prod( self.shape ) ) )
count = 0
while count < nsamples:
X = self.get_patches()
count += X.shape[ 0 ]
X -= self.mu
X = X.reshape( ( X.shape[ 0 ], -1 ) )
C += numpy.dot( X.T, X )
C /= count
return C
def invert( self, C, dimred = numpy.inf, soft = False ):
'''compute whitening and unwhitening matrices,
potentially performing a soft dimensionality reduction'''
from numpy.linalg import eigh
E, U = eigh( C )
order = numpy.argsort( E )[ ::-1 ]
E = E[ order ]
U = U[ :, order ]
assert numpy.allclose( C, numpy.dot( U * E, U.T ) )
E[ E < 0 ] = 0
if soft:
zE = ( E ** 2 ) / ( E + E[ 0 ] / dimred )
invE = 1 / ( E + E[ 0 ] / dimred )
else:
zE = E.copy()
zE[ E < E[ 0 ] / dimred ] = 0
invE = E ** -1
invE[ E < E[ 0 ] / dimred ] = 0
Q = numpy.dot( U * invE, U.T )
D = numpy.dot( U * numpy.sqrt( invE * zE ), U.T )
sqrtQ = numpy.dot( U * numpy.sqrt( invE ), U.T )
sqrtC = numpy.dot( U * numpy.sqrt( zE ), U.T )
return sqrtC, Q, sqrtQ, D, E, invE, zE
def renorm( self, U ):
'''enforce ICA constraint'''
from numpy.linalg import svd
R, _, S = svd( U.reshape( ( U.shape[ 0 ], -1 ) ), full_matrices = False )
U = numpy.dot( R, S ).reshape( U.shape )
return U
def circulant_renorm( self, U, shift = True ):
'''enforce CICA constraint and center filters'''
from numpy.fft import fft2, ifft2, ifftshift, fftfreq
from numpy.linalg import svd
sh = U.shape
sa = self.sampling
assert sh[ -2 ] % sa[ -2 ] == 0
assert sh[ -1 ] % sa[ -1 ] == 0
assert ( sa[ -2 ] % 2 == 0 and sa[ -1 ] % 2 == 0 ) or ( sa[ -2 ] == sa[ -1 ] == 1 )
U = fft2( U )
# enforce isometry
for fy in range( sh[ -2 ] // sa[ -2 ] ):
fys = slice( fy, fy + sh[ -2 ], sh[ -2 ] // sa[ -2 ] )
for fx in range( sh[ -1 ] // sa[ -1 ] ):
fxs = slice( fx, fx + sh[ -1 ], sh[ -1 ] // sa[ -1 ] )
B = U[ :, :, fys, fxs ]
R, _, S = svd( B.reshape( ( U.shape[ 0 ], -1 ) ), full_matrices = False )
U[ :, :, fys, fxs ] = numpy.dot( R, S ).reshape( B.shape )
U *= numpy.sqrt( sa[ -2 ] * sa[ -1 ] )
# discretely optimize for basis with minimum group delay
if shift:
fy = fftfreq( sh[ -2 ] )[ :, None ]
fx = fftfreq( sh[ -1 ] )[ None, : ]
fy2 = fftfreq( 2 * sh[ -2 ] )[ :, None ]
fx2 = fftfreq( 2 * sh[ -1 ] )[ None, : ]
U2 = fft2( ifftshift( ifft2( U ), axes = ( -2, -1 ) ), s = ( 2 * sh[ -2 ], 2 * sh[ -1 ] ) )
for k, u in enumerate( U2 ):
min_delay = inf
E = numpy.abs( u )
for dy in [ -sa[ -2 ], 0, sa[ -2 ] ]:
for dx in [ -sa[ -1 ], 0, sa[ -1 ] ]:
phi = numpy.angle( u * numpy.exp( ( 2j * numpy.pi ) * ( fy2 * dy + fx2 * dx ) ) )
grad_y = ( phi[ :, numpy.arange( 1, u.shape[ -2 ] + 1 ) % u.shape[ -2 ], : ]
- phi[ :, numpy.arange( -1, u.shape[ -2 ] - 1 ), : ] )
grad_x = ( phi[ :, :, numpy.arange( 1, u.shape[ -1 ] + 1 ) % u.shape[ -1 ] ]
- phi[ :, :, numpy.arange( -1, u.shape[ -1 ] - 1 ) ] )
delay = ( E * numpy.sqrt( grad_y ** 2 + grad_x ** 2 ) ).sum()
if delay < min_delay:
min_delay = delay
y = dy
x = dx
U[ k ] *= numpy.exp( ( 2j * numpy.pi ) * ( fy * y + fx * x ) )
U = ifft2( U ).real
return U
def fastica_step( self, W, nsamples, get_patches = None, nboutput = False ):
'''fixed-point iteration'''
if get_patches is None:
get_patches = self.get_patches
W = W.copy()
# vectorized view of W
W_ = W.view()
W_.shape = ( W.shape[ 0 ], numpy.prod( W.shape[ 1: ] ) )
U = numpy.empty_like( W )
# vectorized view of U
U_ = U.view()
U_.shape = W_.shape
# make sure power of y = Wx is one, to satisfy negentropy approximation
for w in W_:
w /= numpy.sqrt( numpy.dot( w, numpy.dot( self.C, w ) ) )
# first, accumulate the expectations by iterating over bunches of vectors
Exgwtx = numpy.zeros( W.shape )
Exgwtx_ = Exgwtx.view()
Exgwtx_.shape = W_.shape
Eg_wtx = numpy.zeros( W.shape[ :1 ] )
if nboutput:
print "collecting {:.1e} samples for fastICA iteration:".format( nsamples ),
prog = progressbar()
count = 0
while count < nsamples:
X = get_patches()
count += X.shape[ 0 ]
X -= self.mu
X.shape = X.shape[ 0 ], W_.shape[ 1 ]
Y = numpy.dot( W_, X.T )
gwtx, g_wtx = self.g( Y )
Exgwtx_ += numpy.dot( gwtx, X )
Eg_wtx += g_wtx.sum( axis = 1 )
if nboutput:
prog.update( count * 100. / nsamples, 1 )
# then do the actual FastICA step
W_ *= Eg_wtx[ :, None ]
W_ -= numpy.dot( Exgwtx_, self.Q )
# renorm filters
U_[ : ] = numpy.dot( W_, self.sqrtC )
for u in U_:
u /= numpy.sqrt( numpy.dot( u, u ) )
if self.circulant:
U[ : ] = self.circulant_renorm( U )
else:
U[ : ] = self.renorm( U )
W_[ : ] = dot( U_, self.sqrtQ )
return W
def filters( self, i = -1 ):
'''return the solution after i iterations of the algorithm (default last).
This returns the filters including the whitening.'''
return self.W[ i ]
def ufilters( self, i = -1 ):
'''return the solution after i iterations of the algorithm (default last).
This returns the filters without the whitening.'''
W = self.W[ i ]
return numpy.dot( W.reshape( ( W.shape[ 0 ], -1 ) ), self.sqrtC ).reshape( W.shape )
def bases( self, i = -1 ):
'''return the solution after i iterations of the algorithm (default last).
This returns the inverse filters (i.e. bases) including unwhitening.'''
W = self.W[ i ]
return numpy.dot( W.reshape( ( W.shape[ 0 ], -1 ) ), self.C ).reshape( W.shape )
This section instantiates a number of experiments (as objects of the class "CICA"). First, we load the images: the luminance version of the dataset is stored in the variable lum_data, and the RGB version in the variable rgb_data.
Here, we assume that the data is stored in the path
pictures/upenn/cd02Ain the current working directory. You might need to adjust this to your needs.
path = 'pictures/upenn/cd02A'
# load images from the UPenn Natural Image Database
# luminance version
from os import listdir
from scipy.io import loadmat
# get list of files
files = [ path + '/' + f for f in listdir( path ) if f.endswith( '_LUM.mat' ) ]
# load files and compress their dynamic range, as these are raw luminances
images = [ numpy.log10( 100 + loadmat( f )[ 'LUM_Image' ] ) for f in files ]
print "{} images, size {}".format( len( images ), images[ 0 ].shape )
lum_data = [ image.reshape( ( -1, ) + image.shape[ -2: ] ).astype( numpy.float32 ) for image in images ]
# show a "contact sheet" of the loaded images
nbimage( concatenate( concatenate(
dimwrap( rerange( [ d[ :, ::16, ::16 ] for d in lum_data ] ), aspect = 1.6, fill = 255 ),
axis = -2 ), axis = -1 ) )
# save some memory
del images
# load images from the UPenn Natural Image Database
# RGB version
from os import listdir
from scipy.io import loadmat
# get list of files
files = [ path + '/' + f for f in listdir( path ) if f.endswith( '_RGB.mat' ) ]
# load files and compress their dynamic range, as these are raw luminances
images = [ numpy.log10( 100 + loadmat( f )[ 'RGB_Image' ].transpose( ( 2, 0, 1 ) ) ) for f in files ]
print "{} images, size {}".format( len( images ), images[ 0 ].shape )
rgb_data = [ image.reshape( ( -1, ) + image.shape[ -2: ] ).astype( numpy.float32 ) for image in images ]
# show a "contact sheet" of the loaded images
nbimage( concatenate( concatenate(
dimwrap( rerange( [ d[ :, ::16, ::16 ] for d in rgb_data ] ), aspect = 1.6, fill = 255 ),
axis = -2 ), axis = -1 ) )
# save some memory
del images
We start with a basic example. Here, an object is created which stores all the variables necessary to perform the regular fastICA algorithm on the luminance dataset. This step may take a few seconds, because the object precomputes the mean and covariance of the data on initialization.
upenn_bloc_16 = CICA(
shape = ( 1, 16, 16 ), # 1 input channel (just luminances), filter support of 16x16
circulant = False, # block ICA
g = nl_tanh, # hyperbolic tangent nonlinearity
sampling = ( 1, 1 ), # no subsampling
K = 256, # 16 x 16 = 256 filters, hence a complete transform
data = lum_data, # the data set
nsamples = 1e6, # the number of samples used for each iteration
seed = 1234, # seed for the random number generator
)
Next, we run the algorithm. We do 120 iterations here, and use some IPython notebook features to show the current progress. In each iteration, upenn_bloc_16.W is extended by the current solution. Thus, at the end of the call, it contains the time course of the solutions as the algorithm proceeded.
upenn_bloc_16( 120, True )
Here, we show a visualization of the solution, similar to how it is shown in the paper. Specifically, what we show are the filters without the whitening (which we called U in the paper).
The visualize function takes the filter arrays as input and outputs a rescaled and rearranged version of it for display. It also computes the magnitude spectra of the filters and displays them side by side with the filters. This output is passed to the nbimage function, which simply displays it as an inline image in the notebook.
The output is generated as follows. Each filter array is rescaled, such that mid-gray corresponds to zero in the filter array, and such that its range is contained within the dynamic range of the display. This is used for every other column of the image you see here. The other columns are generated by computing the discrete Fourier transform of the filter array, taking its magnitude, and rescaling it such that black corresponds to zero in the magnitude, and such that its range is contained within the dynamic range of the display.
Each filter and its corresponding Fourier magnitude are separated by a one pixel wide line, whereas the different filters are separated by a four pixel wide line.
nbimage( visualize( [ upenn_bloc_16.ufilters() ], repeats = 2, spectral = True ) )
Now, we run our version of the algorithm by setting circulant to True.
We also reduce the number of filters to K = 16, since each filter is now convolutional. Hence, the resulting transform will be overcomplete by a factor of 16. Note that the algorithm runs much faster due to the decreased number of parameters.
upenn_circ_16 = CICA(
shape = ( 1, 16, 16 ),
circulant = True,
g = nl_tanh,
sampling = ( 1, 1 ),
K = 16,
data = lum_data,
nsamples = 1e6,
seed = 1234,
)
upenn_circ_16( 120, True )
The visualization method here is the same as above. In CICA, the filter is constrained such that, if applied in a shift-invariant way across the data, they together represent the data. Thus, the structure is now a filter bank.
nbimage( visualize( [ upenn_circ_16.ufilters() ], repeats = 2, spectral = True ) )
The statement below will generate an in-browser video which shows how the solution proceeds from initialization to convergence. Note that, for this to work, a recent version of ffmpeg needs to be available on the command line.
nbvideo( [
visualize( [ upenn_circ_16.ufilters( i ) ], repeats = 2, spectral = True )
for i in range( len( upenn_circ_16.W ) ) ], fps = 10, loop = True, counter = 'lr' )
Now, we repeat the experiment, but setting a subsampling factor of 4x4. Since each subband will only retain one sample out of 16, and there are a total of 16 filters, it will be a complete transform overall.
upenn_circ_16_4 = CICA(
shape = ( 1, 16, 16 ),
circulant = True,
g = nl_tanh,
sampling = ( 4, 4 ),
K = 16,
data = lum_data,
nsamples = 1e6,
seed = 1234,
)
upenn_circ_16_4( 120, True )
nbimage( visualize( [ upenn_circ_16_4.ufilters() ], repeats = 2, spectral = True ) )
Here, we repeat the experiment with RGB data. Now, there are three input channels. We still apply a subsampling of 4x4 on every subband, thus we would need 3 x 16 = 48 filters to make the transform complete. To show an intermediate case, we give it 96 filters, and hence the transform will be overcomplete by a factor of 2.
upennrgb_circ_16_4 = CICA(
shape = ( 3, 16, 16 ), # now we have 3 input channels
circulant = True,
g = nl_tanh,
sampling = ( 4, 4 ), # subsampling of 4x4 and 96 filters
K = 96, # the overall transform will be overcomplete by a factor of 2.
data = rgb_data,
nsamples = 1e6,
seed = 1234,
)
upennrgb_circ_16_4( 180, True )
nbimage( visualize( [ upennrgb_circ_16_4.ufilters() ], repeats = 2, spectral = True ) )