In [1]:
# Compressed sensing demo by Brian Busemeyer.
# Python 3
import matplotlib.pyplot as plt
import numpy as np
import scipy as sci
%matplotlib inline
import multiprocessing as mp
# These two libraries are available on my github https://github.com/bbusemeyer/busempyer
import plot_tools as pt
import mython as my
import compressed_sensing as cs
import PIL as pyim

In [2]:
# Here's a naive attempt to solve the problem.
fig,ax = plt.subplots(2,2,sharex='all',sharey='col')
signal_size = 100
frac_measurements = 0.4
signal_sparsity = 0.1

num_measurements = int(round(frac_measurements * signal_size))

# Generates a random sparse signal.
signal = cs.gen_rand_signal(signal_size,signal_sparsity)
ax[0,0].bar(range(len(signal)),signal)
#ax[0].set_ylim(-1.1,1.1)
ax[0,0].set_ylabel("Signal")

# The measurements are related by some change-of-basis matrix, which I take as random too.
basis_change = np.random.randn(num_measurements,signal_size)/signal_size

# This means the measurements are given by change of basis applied to signal.
measurements = np.dot(basis_change,signal)
ax[0,1].bar(range(len(measurements)),measurements)
ax[0,1].set_ylabel("Measurements")
#ax[0,1].annotate("(different basis)",
#              (0.6,0.7),xycoords='axes fraction')

estimate,res,rank,svs = np.linalg.lstsq(basis_change,measurements)
ax[1,0].bar(range(len(estimate)),estimate)
ax[1,0].set_ylabel("Estimate")

# Properties of the estimate.
print("Sum of residues of fit:",abs(np.dot(basis_change,estimate)-measurements).sum())

plt.delaxes(ax[1,1])

# Check how good we did.
l1err = np.linalg.norm(estimate-signal,1) / np.linalg.norm(signal,1)
l2err = np.linalg.norm(estimate-signal,2) / np.linalg.norm(signal,2)
print("L1 error: %.2f"%l1err)
print("L2 error: %.2f"%l2err)
#ax[1].annotate("MSE: %.2f;  MAE: %.2f"%(mse,mae),
#              (0.1,0.8),xycoords='axes fraction')
fig.set_size_inches(6,3)
fig.tight_layout()
fig.savefig("sample_l2.eps")

Sum of residues of fit: 1.43114686768e-15
L1 error: 1.92
L2 error: 0.83

In [3]:
# Now minimize l1 norm.
fig,ax = plt.subplots(2,2,sharex='all',sharey='col')
#signal_size = 100
#frac_measurements = 0.4
#signal_sparsity = 0.2

num_measurements = int(round(frac_measurements * signal_size))

# Generates a random sparse signal.
#signal = cs.gen_rand_signal(signal_size,signal_sparsity)
ax[0,0].bar(range(len(signal)),signal)
#ax[0].set_ylim(-1.1,1.1)
ax[0,0].set_ylabel("Signal")

# The measurements are related by some change-of-basis matrix, which I take as random too.
#basis_change = np.random.randn(num_measurements,signal_size)/signal_size

# This means the measurements are given by change of basis applied to signal.
measurements = np.dot(basis_change,signal)
ax[0,1].bar(range(len(measurements)),measurements)
ax[0,1].set_ylabel("Measurements")
#ax[0,1].annotate("(different basis)",
#              (0.6,0.7),xycoords='axes fraction')

coefs,ineq_mat,eq_mat,eq_vec = cs.setup_linprog(basis_change,measurements)

result = sci.optimize.linprog(
coefs,
ineq_mat,
np.zeros(2*signal_size),
eq_mat,
eq_vec,
bounds=[(-np.inf,np.inf) for var in eq_vec],
options={'maxiter':10000})
print(result['message'])
if result['message'] != 'Optimization terminated successfully.':
estimate = np.nan
else:
estimate = result['x'][:signal_size]
ax[1,0].bar(range(len(estimate)),estimate)
ax[1,0].set_ylabel("Estimate")

plt.delaxes(ax[1,1])

# Check how good we did.
l1err = np.linalg.norm(estimate-signal,1) / np.linalg.norm(signal,1)
l2err = np.linalg.norm(estimate-signal,2) / np.linalg.norm(signal,2)
print("L1 error: %.2f"%l1err)
print("L2 error: %.2f"%l2err)
#ax[1].annotate("MSE: %.2f;  MAE: %.2f"%(mse,mae),
#              (0.1,0.8),xycoords='axes fraction')
fig.set_size_inches(6,3)
fig.tight_layout()
fig.savefig("sample_l1.eps")

Optimization terminated successfully.
L1 error: 0.00
L2 error: 0.00

In [11]:
# Series of signal processing plots.
signal_size = 100
signal_sparsity = 0.5
frac_measurements = np.arange(0.1,1.0,0.2)
fig,ax = plt.subplots(2,frac_measurements.shape[0],sharex=True,sharey=True)
pmap = dict(zip(frac_measurements,range(frac_measurements.shape[0])))
errs = []
for frac in frac_measurements:
col = pmap[frac]
errs.append(cs.plot_linprog(signal_size,frac,signal_sparsity,ax.T[col]))
ax[0,col].set_title("%.1f"%frac)
ax[0,0].set_ylabel("Signal")
ax[1,0].set_ylabel("Estimate")
fig.set_size_inches(8,4)
fig.tight_layout()
fig.savefig("measure_scan.eps")

In [14]:
# Series of signal processing plots.
signal_size = 100
frac_measurement = 0.5
signal_sparisties = np.arange(0.1,1.0,0.2)
fig,ax = plt.subplots(2,signal_sparisties.shape[0],sharex=True,sharey=True)
pmap = dict(zip(signal_sparisties,range(signal_sparisties.shape[0])))
errs = []
for spar in signal_sparisties:
col = pmap[spar]
errs.append(cs.plot_linprog(signal_size,frac_measurement,spar,ax.T[col]))
ax[0,col].set_title("%s"%spar)
ax[0,0].set_ylabel("Signal")
ax[1,0].set_ylabel("Estimate")
fig.set_size_inches(8,4)
fig.tight_layout()
fig.savefig("sparsity_scan.eps")

In [15]:
# Map out the parameter space.
signal_size = 100
frac_measurements = np.arange(0.1,1.0,0.1,)
signal_sparsities = np.arange(0.1,1.0,0.1)
pspace = my.cross_prod([frac_measurements,signal_sparsities])
fm_grid,sp_grid = np.meshgrid(frac_measurements,signal_sparsities)
def wrapper(args):
return cs.test_leastsquares(signal_size,*args)
with mp.Pool(8) as pool:
results = np.array(pool.map(wrapper, pspace))
#plt.pcolormesh(fm_grid,sp_grid,results.reshape(fm_grid.shape))
fig,ax = plt.subplots(1,1)
im = ax.imshow(results.reshape(fm_grid.shape),
cmap='plasma',
interpolation='nearest',
origin='lower left',
aspect='equal',
vmax=1,vmin=0)
ax.set_xticks(range(frac_measurements.shape[0]))
ax.set_xticklabels(frac_measurements)
ax.set_ylabel("Sparsity")
ax.set_yticks(range(signal_sparsities.shape[0]))
ax.set_yticklabels(signal_sparsities)
ax.set_xlabel("Fraction of measurements")
ax.set_title("Error of least squares")
fig.colorbar(im)
fig.tight_layout()
fig.savefig("leastsq_pmap.eps")

In [16]:
# Map out the parameter space.
signal_size = 100
frac_measurements = np.arange(0.1,1.0,0.1)
signal_sparsities = np.arange(0.1,1.0,0.1)
pspace = my.cross_prod([frac_measurements,signal_sparsities])
fm_grid,sp_grid = np.meshgrid(frac_measurements,signal_sparsities)
def wrapper(args):
return cs.test_linprog(signal_size,*args)
with mp.Pool(8) as pool:
results = np.array(pool.map(wrapper, pspace))
#plt.pcolormesh(fm_grid,sp_grid,results.reshape(fm_grid.shape))
fig,ax = plt.subplots(1,1)
im = ax.imshow(results.reshape(fm_grid.shape),
cmap='plasma',
interpolation='nearest',
origin='lower left',
aspect='equal',
vmax=1,vmin=0)
ax.set_xticks(range(signal_sparsities.shape[0]))
ax.set_xticklabels(signal_sparsities)
ax.set_xlabel("Sparsity")
ax.set_yticks(range(frac_measurements.shape[0]))
ax.set_yticklabels(frac_measurements)
ax.set_ylabel("Fraction of measurements")
ax.set_title("Error of linear program")
fig.colorbar(im)
fig.savefig("linear_pmap.eps")

In [17]:
im = pyim.Image.open("cat.jpg").convert("L")
pmap = np.array(im,dtype=np.uint8)
plt.imshow(pmap,cmap='gray')
plt.savefig("cat_plotted.eps")

In [32]:
fmap = np.fft.rfft(pmap)
fig,ax = plt.subplots(1,2,sharex=True,sharey=True)
ax[0].imshow(fmap.real[1:,1:],vmin=-1000,vmax=1000)
ax[1].imshow(fmap.real[1:,1:],vmin=-1000,vmax=1000)
ax[0].set_title("Real")
ax[1].set_title("Imaginary")
fig.tight_layout()
plt.savefig("cat_fourier.png")