In this lab, we will interact with physical time-domain signals. The first part will involve generating and recording sounds on your computer. We will use the chirp signal to characterize the response of the speaker-microphone system and look at detecting signals using cross-correlation. In the second part, we will build on part one and use the computer sound system to develop a simple sonar.
# Import functions and libraries
from __future__ import division
import numpy as np, matplotlib.pyplot as plt, bokeh.plotting as bk
import threading,time, Queue, pyaudio
from matplotlib.pyplot import *
import matplotlib.cm as cm
from scipy import signal
from numpy import *
from bokeh.models import GlyphRenderer
from threading import Lock
bk.output_notebook()
%matplotlib inline
For this assignment you will need IPython and a laptop equipped with a speaker and a microphone. When playing and recording sound on the computer, the signal passes through several systems: the response of the speaker, the room we are in, and the response of the microphone recording system.
A chirp is a signal in which the frequency increases linearly with time. In this assignment we will generate a chirp signal and use it to measure the magnitude of the frequency response of our speaker-room-microphone system.
The instantaneous frequency is defined as the derivative of the phase of a signal, $f(t) = \frac{1}{2\pi} \frac{d\phi (t)}{ dt} $. For example, the instantaneous frequency of $\cos(\phi(t))=\cos(2\pi f_0 t)$ is
$$f(t) = \frac{1}{2\pi}\frac{d\phi (t)}{dt} = f_0$$
For a linear chirp, the instantaneous frequency changes linearly over time:
$$f(t) = f_0 + kt$$
So, the phase is
$$\phi(t) = 2\pi \int_0^t f(\tau)\,d\tau = 2\pi \left( f_0 t + \frac{k}{2}t^2 \right)$$
Generate a 15 second long chirp signal, sampled at 44,100[Hz] with a frequency range of 20[Hz] to 20,000[Hz]. Set the magnitude of the chirp to 0.5; this will help prevent non-linearities when we play the sound later.

* Generate a time index `t` from t=0 to t=15 with a sampling rate of 44100 Hz
* Generate `f_of_t` ( $f(t)$ ) that changes linearly from 20 Hz to 20 kHz over 15 seconds
* Generate `phi_of_t` ( $\phi(t) = 2\pi \int_0^t f(t)dt$ ) by numerically integrating `f_of_t`. You will find the function `np.cumsum` useful.
* Generate the chirp signal `s_chirp` with an amplitude of 0.5

fs = 44100
# your code here
# generate time index
# generate f_of_t
# generate phi_of_t
# generate chirp signal
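As a reference, one possible sketch of these steps (using the variable names suggested above; the linear sweep and the `np.cumsum` phase integral are the key ideas) is:

```python
import numpy as np

fs = 44100.0                                    # sampling rate [Hz]
T = 15.0                                        # chirp duration [s]
t = np.arange(0, int(T * fs)) / fs              # time index from 0 to 15 s
f_of_t = 20.0 + (20000.0 - 20.0) * t / T        # frequency sweeps linearly 20 Hz -> 20 kHz
phi_of_t = 2 * np.pi * np.cumsum(f_of_t) / fs   # phase by numerical integration of f(t)
s_chirp = 0.5 * np.sin(phi_of_t)                # chirp with amplitude 0.5
```

Dividing the cumulative sum by `fs` approximates the integral $\int_0^t f(\tau)d\tau$ as a Riemann sum.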
If you plot a short segment of the chirp signal `s_chirp`, you will notice that the carrier frequency increases and that the chirp has a constant envelope. To get a nice figure, make sure the aspect ratio of the figure is height/width = 0.2. Label the axis and figure appropriately.

# Set the aspect ratio such that the image is wide
width, height = figaspect(0.2)
fig = figure(figsize=(width,height))
#Your code below:
Compute and plot the magnitude frequency response of the chirp using `signal.freqz`. Note that the digital frequency range represents a physical frequency range of 0[Hz] to 22050[Hz]. To get a nice figure, make sure the aspect ratio of the figure is height/width = 0.2. Label the axis and figure appropriately.

# generate frequency response of chirp
# generate frequency index
# plot
width, height = plt.figaspect(0.2)
fig = plt.figure(figsize=(width,height))
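A minimal sketch of the `signal.freqz` call (demonstrated on a short chirp so it runs quickly; the same call applies to the full 15 s chirp):

```python
import numpy as np
from scipy import signal

fs = 44100.0
t = np.arange(0, 4410) / fs                      # a short 0.1 s chirp for illustration
f_of_t = 20.0 + (20000.0 - 20.0) * t / t[-1]
phi_of_t = 2 * np.pi * np.cumsum(f_of_t) / fs
s_chirp = 0.5 * np.sin(phi_of_t)

# treat the chirp as an FIR filter and evaluate its frequency response
w, h = signal.freqz(s_chirp, 1, worN=512)
f_phys = w / np.pi * fs / 2                      # map digital frequency [0, pi) to 0..22050 Hz
```

Plotting `abs(h)` against `f_phys` then gives the magnitude response on a physical frequency axis.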
Explain why the chirp is an appropriate signal to measure the magnitude frequency response of a system.
Now, we will play the sound of the chirp on our computer speaker and simultaneously record using the microphone.
On Apple computers it is recommended that you turn off ambient noise reduction: open System Preferences, select Sound, choose the Input tab, and make sure that the "Use ambient noise reduction" box is unchecked. Some Windows systems have ambient noise reduction as well; make sure it is also turned off.
Your laptop most likely has two speakers. It is best if we work only with one. Go to the operating system's sound settings and change the stereo settings such that the speaker that is closest to the microphone is active. Your result will be much better that way.
Make sure your output volume is at 70-80% and that the laptop's microphone is on, again to avoid non-linear distortions.
We will record a couple of seconds longer than the chirp, just to make sure we capture the entire sequence.
The code below defines some functions to use with pyaudio -- a multi-platform audio python interface.
def play_audio( Q, p, fs, dev=None):
    # play_audio plays audio with sampling rate = fs
    # Q - A queue object from which to play
    # p - pyAudio object
    # fs - sampling rate
    # dev - device number
    # Example:
    # fs = 44100
    # p = pyaudio.PyAudio() #instantiate PyAudio
    # Q = Queue.Queue()
    # Q.put(data)
    # Q.put("EOT") # when function gets EOT it will quit
    # play_audio( Q, p, fs, 1 ) # play audio
    # p.terminate() # terminate pyAudio

    # open output stream
    ostream = p.open(format=pyaudio.paFloat32, channels=1, rate=int(fs), output=True, output_device_index=dev)
    # play audio
    while (1):
        data = Q.get()
        if isinstance(data, str) and data == "EOT":  # compare by value, not identity
            break
        try:
            ostream.write( data.astype(np.float32).tostring() )
        except:
            break
def record_audio( queue, p, fs, dev=None, chunk=1024, lock=None):
    # record_audio records audio with sampling rate = fs
    # queue - output data queue
    # p - pyAudio object
    # fs - sampling rate
    # dev - device number
    # chunk - chunks of samples at a time default 1024
    #
    # Example:
    # fs = 44100
    # Q = Queue.Queue()
    # p = pyaudio.PyAudio() #instantiate PyAudio
    # record_audio( Q, p, fs, 1) #
    # p.terminate() # terminate pyAudio

    istream = p.open(format=pyaudio.paFloat32, channels=1, rate=int(fs), input=True, input_device_index=dev, frames_per_buffer=chunk)

    # record audio in chunks and push them onto the queue
    while (1):
        try:  # when the pyaudio object is destroyed, stop
            if lock is not None:
                with lock:
                    data_str = istream.read(chunk)  # read a chunk of data
            else:
                data_str = istream.read(chunk)
        except:
            break
        data_flt = np.fromstring( data_str, 'float32' )  # convert string to float
        queue.put( data_flt )  # push onto the queue
def xciever(sig, fs):
    # function takes a signal and a sampling frequency
    # it then plays and records at the same time. The function returns
    # the recorded sound.

    rcv = []

    # create input and output FIFO queues
    Qin = Queue.Queue()
    Qout = Queue.Queue()

    # lock for controlling access to shared resources
    lock = Lock()

    # create a pyaudio object
    p = pyaudio.PyAudio()

    # initialize the recording and playing threads
    t_rec = threading.Thread(target = record_audio, args = (Qin, p, fs), kwargs={'lock': lock})
    t_play_audio = threading.Thread(target = play_audio, args = (Qout, p, fs))

    # start the recording and playing threads
    t_rec.start()
    t_play_audio.start()

    Qout.put( sig )
    Qout.put( "EOT" )

    # pause for RECORD_SECS seconds
    RECORD_SECS = len(sig)/fs + 2.0
    time.sleep( RECORD_SECS )

    # terminate pyAudio
    with lock:
        p.terminate()

    # collect the recorded chunks into the output array
    while ( not Qin.empty() ):
        data = Qin.get()
        rcv = np.append( rcv, data )

    return rcv
Playing and recording audio: the resulting received sequence will be stored in the variable `rcv_chirp`.
## Play and record chirp at the same time
fs = 44100 # sampling rate = 44100 Hz
rcv_chirp = xciever( s_chirp, fs)
Plot the received chirp `rcv_chirp`. Label the figures and use an aspect ratio of height/width = 0.2.
## Plot chirp response
# Your code below:
The absolute value of the result "sort of" displays the envelope; however, it is still modulated by the (now rectified) frequency-sweep carrier. If we write down the response, it can be expressed approximately as
$$y[n] = |H[n]| \sin(2\pi (f_0 + k\,nT)nT + \angle H[n])$$
where $|H[n]|$ is the frequency response for the instantaneous frequency at the $n$th sample and $\angle H[n]$ is its phase response.
The reason it is only an approximation is the inherent assumption that we are not looking at transient effects, only at the steady-state response for each frequency. This is a good approximation because our chirp sweeps very slowly compared to the propagation of sound in the room.
One way to get the envelope $|H[n]|$ is to convert it to its analytic signal. The analytic signal $x_a(t)$ of signal $x(t)$ is:
$$x_a = F^{-1}(F(x)\cdot 2U) = x + j y$$where $F$ is the Fourier transform, $U$ the unit step function, and $y$ the Hilbert transform of $x$. In other words, the negative half of the frequency spectrum is zeroed out, turning the real-valued signal into a complex signal. This is similar to the question in HW2!
The analytic signal of the received chirp will then be:
$$ y_a[n] = |H[n]|e^{j(2\pi (f_0 + k\,nT)nT + \angle H[n])} $$
The envelope can be detected by taking the magnitude.
Compute the analytic signal using the function `signal.hilbert` and plot its absolute value. Note that the discrete Hilbert transform is not perfect, since it uses FIR filtering; this will show up as ripple in the envelope.
Label the figures and use an aspect ratio of height/width = 0.2.
## Your lovely code here:
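For intuition, here is a tiny sketch of envelope recovery with `signal.hilbert`, applied to a synthetic constant-envelope tone rather than the recorded chirp:

```python
import numpy as np
from scipy import signal

n = np.arange(1000)
x = np.cos(2 * np.pi * 0.1 * n)   # pure tone: its true envelope is constant, equal to 1
x_a = signal.hilbert(x)            # analytic signal: x + j * Hilbert(x)
env = np.abs(x_a)                  # magnitude recovers the envelope
```

On the recorded chirp, the same two lines (`signal.hilbert` followed by `np.abs`) recover $|H[n]|$, with some ripple near the edges.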
In part II of the lab, we will be sending and receiving chirp pulses to estimate delays between the transmitted pulses and their echoes. This is done by cross-correlating / matched-filtering the received signal with the known chirp pulse to detect the echoes. In this task, we will investigate the correlation properties of the chirp.
A cross correlation is defined as:
$$ R_{xy}[n] = \sum_{m=-\infty}^\infty x[m]y^*[m-n] = (x[m]*y^*[-m])[n]$$
which is like a convolution, but without flipping one of the signals. It can be implemented using a convolution, as shown above. In general, the more correlated the two signals are at position $n$, the higher the value will be; that is why it is useful in a sonar system.
Because we will be doing cross-correlations between a chirp pulse and its echoes, it is useful to look at the auto-correlation, which is basically a cross correlation of the signal with itself. A discrete autocorrelation of a signal is defined as:
$$ R_{xx}[n] = \sum_{m=-\infty}^\infty x[m]x^*[m-n] = (x[m]*x^*[-m])[n]$$
The chirp has the very nice property that its auto-correlation is very narrow. Since the spread of the resulting correlation determines how finely you can localize a pulse in time, the width of the auto-correlation is important. This property is called pulse compression and is widely used in radar design. Random noise and some other pseudo-random sequences also possess this property.
fs = 44100
t = r_[0.0:512]/fs
f0 = 17000.0
f1 = 18000.0
# your code here
# generate chirp signal
# generate frequency response of chirp
# plot
Compute the autocorrelation of the chirp using `signal.convolve` or `signal.fftconvolve`. Remember that you have to flip the signal, since convolution does that already. You can flip a signal `x` by doing `x[::-1]`. Use `mode='full'` for the convolution.

## Your fantastic code here:
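A sketch of the correlation-via-convolution trick (flip, conjugate, then convolve), shown on a toy sequence where the answer is easy to check by hand:

```python
import numpy as np
from scipy import signal

x = np.ones(4)
# autocorrelation as convolution with the flipped, conjugated signal
acorr = signal.fftconvolve(x, np.conj(x[::-1]), mode='full')
# acorr is [1, 2, 3, 4, 3, 2, 1]: the peak sits at zero lag, index len(x)-1
```

The same call, with `x` replaced by the 512-sample chirp, produces the narrow pulse-compressed autocorrelation described above.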
In a similar way as before, it is possible to recover the envelope of the autocorrelation by performing a cross-correlation with the analytic signal and then taking the absolute value. In this case, we know exactly what the analytic signal is!
Generate `s_chirp_a`, the analytic signal of the chirp, by computing `s_chirp_a = exp(1j*phi_of_t)`. Perform a cross-correlation between `s_chirp_a` and `s_chirp` and show the envelope. This is also called a matched filter. Use milliseconds as the x-axis.
# your nice script to produce beautiful chirps, xcorralations and figures here:
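One possible sketch of the matched filter, using the 512-sample 17-18 kHz chirp from the cell above (variable names follow the task; the lag axis in ms is computed from the sample index):

```python
import numpy as np
from scipy import signal

fs = 44100.0
Npulse = 512
t = np.arange(Npulse) / fs
f_of_t = 17000.0 + (18000.0 - 17000.0) * t / t[-1]
phi_of_t = 2 * np.pi * np.cumsum(f_of_t) / fs
s_chirp = 0.5 * np.sin(phi_of_t)              # real chirp
s_chirp_a = 0.5 * np.exp(1j * phi_of_t)       # its analytic version

# matched filter: correlate the real chirp with the analytic chirp, take magnitude
env = np.abs(signal.fftconvolve(s_chirp, np.conj(s_chirp_a[::-1]), mode='full'))
t_ms = (np.arange(len(env)) - (Npulse - 1)) / fs * 1000.0   # lag axis in milliseconds
```

The envelope peaks at zero lag (the center of the `'full'` output) and falls off quickly, which is the pulse compression we are after.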
Now we will look at why the chirp pulse is better for cross-correlation detection than a pure tone.
To easily zoom in to the main lobe, you can use bokeh to plot the autocorrelation:
fig = bk.figure( title = "autocorrelation",
y_axis_label = "Amplitude", x_axis_label = "t[ms]",
plot_height = 300, plot_width = 800)
fig.line( t, abs(acorr)/max(abs(acorr)), legend = 'autocorrelation', color = 'blue')
bk.show(fig)
where `acorr` is the auto-correlation result and `t` is the index.
# your solution here
Now, repeat task III for:
Compare the size of the main lobe (full width at half max) to the previous case of 15500Hz - 19500Hz, 512 in length.
# your solution here
As you can see, the chirp provides good pulse compression of the main lobe. However, there are very strong sidelobes. This is because the chirp is effectively multiplied by a rect function, which has abrupt edges. Instead, we will window the chirp with one of the smooth window functions to taper off the sidelobes.
Repeat the previous task, this time windowing the chirp with a smooth window. You will find the function `np.hanning` useful. What happened to the side-lobes? What happened to the main lobe? What's the tradeoff?

# your solution here
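A sketch of the windowing step and its matched-filter envelope (same 17-18 kHz, 512-sample chirp as before; only the multiplication by `np.hanning` is new):

```python
import numpy as np
from scipy import signal

fs = 44100.0
Npulse = 512
t = np.arange(Npulse) / fs
f_of_t = 17000.0 + 1000.0 * t / t[-1]
phi_of_t = 2 * np.pi * np.cumsum(f_of_t) / fs

win = np.hanning(Npulse)
s_win = np.sin(phi_of_t) * win                  # chirp tapered by a Hanning window
s_win_a = np.exp(1j * phi_of_t) * win           # windowed analytic chirp

# matched-filter envelope of the windowed chirp
env_win = np.abs(signal.fftconvolve(s_win, np.conj(s_win_a[::-1]), mode='full'))
```

Comparing `env_win` with the unwindowed envelope shows the sidelobes dropping at the cost of a wider main lobe.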
In this part of the lab we will write a simple application that implements a sonar using the laptop internal speaker and microphone.
The basic idea is very simple and is the basis of sonar and ultrasound imaging -- objects reflect sound waves. If we send a pulse of sound, we will get reflected echoes of that pulse. Detecting the echoes and their time-of-flight will reveal their distance from the source, based on the speed of sound in air.
The way we are going to implement the sonar is to generate a series of rapid pulses, and use matched filtering to detect the source and the returning echoes. There are many parameters in this lab that can be tweaked to get different results. We encourage you to experiment. We very much enjoyed making this lab and played with it quite a bit! We hope you enjoy it too.
Unfortunately, the quality of the sonar system is going to be highly dependent on the quality of your laptop and the position of the speakers and microphone. It is recommended that you adjust the sound settings on your system such that only the speaker that is closer to the microphone is active. For example, MacBook Airs have the microphone on the side of the computer -- so you should adjust the audio settings to use the left speaker only. Also, it is recommended that the speaker volume be set to half of its maximum to avoid non-linear distortions.
If you are getting poor results, please consult with us.
This lab was inspired by an iPhone app called active-radar.
First, we will look at short chirp pulses with a pure frequency carrier. These are very common in radar and sonar systems. There are several considerations to make. To detect short distances, and to localize well in time, our pulses need to be very short. At the same time, shorter pulses carry less energy and have a wider bandwidth (why?). These reduce the signal-to-noise ratio and our ability to detect targets. In the end, we will have to compromise to get satisfactory results in terms of detectability and resolution.
We are going to design a pulsed sonar system in which we repeatedly send pulses and then listen to the returning echoes. The arrival time of the echoes will correspond to double the distance from a target.
Write a function:

pulse = genChirpPulse(Npulse, f0, f1, fs)

The function accepts: `Npulse` = number of samples, `f0, f1` = starting and ending frequencies, and `fs` = sampling frequency. It returns the analytic signal of the chirp, $\exp (j 2\pi \int_0^t f(\tau)d\tau )$, with amplitude 1.
def genChirpPulse(Npulse, f0, f1, fs):
    # Function generates an analytic function of a chirp pulse
    # Inputs:
    #     Npulse - pulse length in samples
    #     f0 - starting frequency of chirp
    #     f1 - end frequency of chirp
    #     fs - sampling frequency
    # Output:
    #     pulse - chirp pulse
pulse = genChirpPulse(200, 1000, 8000, 44100)
# your code here:
# plot
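One possible implementation sketch of `genChirpPulse` (using `np.cumsum` for the phase integral, as in part I):

```python
import numpy as np

def genChirpPulse(Npulse, f0, f1, fs):
    # analytic chirp pulse exp(j*2*pi*integral(f(t)dt)) with amplitude 1
    f_of_t = f0 + (f1 - f0) * np.arange(Npulse) / float(Npulse)  # linear sweep f0 -> f1
    phi_of_t = 2 * np.pi * np.cumsum(f_of_t) / fs                # numerical phase integral
    return np.exp(1j * phi_of_t)

pulse = genChirpPulse(200, 1000, 8000, 44100)
```

Because the pulse is complex with unit magnitude, plotting its real and imaginary parts shows the sweep, while `abs(pulse)` is identically 1.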
Generate Pulse Trains
Next, we will use the pulse generated by genChirpPulse
and generate a pulse train.
ptrain = genPulseTrain(pulse, Nrep, Nseg)

The function accepts: `pulse` = a pulse generated by `genChirpPulse`, `Nrep` = number of pulse repetitions, and `Nseg` = length of each pulse train segment (which is >= the length of `pulse`). The function returns `ptrain`, a vector of length `Nrep` x `Nseg` (Hint: use `np.tile`).
def genPulseTrain(pulse, Nrep, Nseg):
    # Function generates a pulse train from a pulse.
    # Inputs:
    #     pulse = the pulse generated by genChirpPulse
    #     Nrep = number of pulse repetitions
    #     Nseg = length of each pulse segment, >= length(pulse)
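A sketch of one way to implement this with `np.tile`: zero-pad the pulse to one segment, then repeat the segment.

```python
import numpy as np

def genPulseTrain(pulse, Nrep, Nseg):
    # zero-pad the pulse to one segment of length Nseg, then repeat it Nrep times
    seg = np.zeros(Nseg, dtype=complex)
    seg[:len(pulse)] = pulse
    return np.tile(seg, Nrep)
```

The silent tail of each segment is the listening window in which echoes arrive.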
We now have components to generate pulses and pulse trains, and to play and record them. Let's see what we get! We will start with very short pulses at a single carrier frequency. Rectangular pulses are difficult for the speaker to produce, as they exhibit discontinuities at the beginning and end of the pulse. Therefore we will multiply the pulses by a smooth window; here, we will use a Hanning window.
fs = 44100
f0 = 15000
f1 = 15000
Npulse = 60
# your code here:
# your code here:
rcv = xciever(ptrain / 2, fs)
fig = bk.figure( title = "Tranceived pulse train",
y_axis_label = "Amplitude", x_axis_label = "Index",
plot_height = 300, plot_width = 800)
fig.line( r_[:len(ptrain):10], ptrain[::10], legend = 'transmitted', color = 'orange')
fig.line( r_[:len(rcv):10], rcv[::10], legend = 'received', color = 'blue')
bk.show(fig)
Extract a single pulse from the received pulse train. You can find the pulse index either automatically, using a threshold test followed by `np.nonzero`, or manually, using the bokeh plot cell above. Extract at least 2*Npulse samples before the pulse and 20*Npulse samples after, using `rcv_pulse = rcv[idx-2*Npulse:idx+Npulse*20]`.
Plot the received pulse. Can you see any echoes?
# your code here:
# find index of start pulse
# plot
The strong pulses we see are a result of direct feed-through from the transmitter to the receiver, which does not scatter off targets. The echoes we see are the result of reflecting surfaces. The problem in our setup is that we don't know the exact delay between the transmit and receive hardware (in PyAudio). Instead, we will assume that the travel time of sound between the speaker and the microphone is negligible, much smaller than that of the scattering targets. We can then detect when the pulses start based on the direct feedthrough signal.
We will detect both the feedthrough and echoes using matched filtering.
Write a function:

Xrcv = crossCorr( rcv, pulse_a )

to calculate the cross-correlation (matched filter) of the received signal with the analytic signal of the pulse. You can use `signal.fftconvolve`. Take the absolute value of `Xrcv` to recover its envelope; call the result `Xrcv_a`.

def crossCorr( rcv, pulse_a ):
    # Function generates cross-correlation between rcv and pulse_a
    # Inputs:
    #     rcv - received signal
    #     pulse_a - analytic pulse
    # Output:
    #     Xrcv - cross-correlation between rcv and pulse_a
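A minimal sketch of `crossCorr` using the same flip-and-conjugate convolution trick as in part I, with a toy usage example (the embedded tone and its position are made up for illustration):

```python
import numpy as np
from scipy import signal

def crossCorr(rcv, pulse_a):
    # cross-correlation as convolution with the flipped, conjugated pulse
    return signal.fftconvolve(rcv, np.conj(pulse_a[::-1]), mode='full')

# toy usage: bury a 16-sample pulse at index 30 of a silent recording
p = np.exp(1j * 2 * np.pi * 0.2 * np.arange(16))
rcv = np.zeros(100, dtype=complex)
rcv[30:46] = p
Xrcv_a = np.abs(crossCorr(rcv, p))   # envelope peaks where the pulse aligns
```

With `mode='full'`, the peak lands at `start_index + len(pulse) - 1`, which is worth remembering when converting peak positions back to arrival times.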
Xrcv_a = abs( crossCorr(rcv, pulse_a) )
fig = bk.figure( title = "Matched filtered signal",
y_axis_label = "Amplitude", x_axis_label = "Index",
plot_height = 300, plot_width = 800)
fig.line( r_[:len(Xrcv_a):10], Xrcv_a[::10], legend = 'received', color = 'blue')
bk.show(fig)
# find index of start pulse
# plot
In order to automate the system and visualize the results we need a few more components. To extract the pulses we need to know the position of the first feedthrough pulse.

Write a function:

idx = findDelay(Xrcv_a, Nseg)

that takes the result of the matched filtering and finds the index of the first feedthrough pulse. Try testing it on the actual signal to check that it is correct. There are multiple ways of doing this; using `Nseg` is not required.

def findDelay(Xrcv, Nseg):
    # finds the first pulse
    # Inputs:
    #     Xrcv - the received matched-filtered signal
    #     Nseg - length of a segment
    # Output:
    #     idx - index of the beginning of the first pulse
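One simple sketch (among the multiple ways mentioned above): since the feedthrough is the strongest return, take the largest peak within the first segment.

```python
import numpy as np

def findDelay(Xrcv, Nseg=None):
    # assume the strongest peak in the first segment is the direct feedthrough
    seg = np.abs(Xrcv[:Nseg]) if Nseg is not None else np.abs(Xrcv)
    return int(np.argmax(seg))
```

Restricting the search to the first `Nseg` samples avoids locking onto a strong echo in a later segment.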
idx = findDelay(Xrcv_a,Nseg)
We now can correct for delays and detect echoes. The only thing left now is to convert the time between echoes into actual distance.
If we send a pulse of sound, we will get reflected echoes of that pulse. Detecting the echoes and their time-of-flight will reveal their distance from the source, based on the speed of sound in air. The speed of sound in air is given by the following equation:
$$ v_s = 331.5\sqrt{1+T/273.15}~\mathrm{m/s}~,$$
where $T$ is the temperature in degrees Celsius.
Write a function:

t = dist2time( dist, temperature )

that takes the distance to the target in cm and converts it into the time in seconds between the transmitted pulse and its echo. Remember that the arrival time includes the travel to the target and back, so the time should be doubled.

For example, for temperature = 20 Celsius and dist = 400 cm, the time is about 0.023 secs.

def dist2time( dist, temperature ):
    # Converts distance in cm to time in secs
    # Inputs:
    #     dist - distance to object in cm
    #     temperature - in Celsius
    # Output:
    #     t - time in seconds between transmitted pulse and echo
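A sketch of `dist2time` following the speed-of-sound formula above (note the cm-to-m conversion and the round-trip factor of 2):

```python
import numpy as np

def dist2time(dist, temperature):
    # speed of sound in air [m/s] at the given temperature [deg C]
    v_s = 331.5 * np.sqrt(1 + temperature / 273.15)
    # round trip: distance (cm -> m) to the target and back
    return 2.0 * (dist / 100.0) / v_s
```

Checking against the example in the text: at 20 Celsius and 400 cm, this gives roughly 0.023 seconds.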
You now have a working sonar! It would be much easier though to play with different parameters if we automate things, so we created some wrappers for real-time plotting in a separate notebook (lab1-RealTime-Sonar).