In this part of the lab we will take the functions that we wrote in the previous part and put them in a streaming architecture such that we can collect finite buffers and proces them in real-time.
You will need the file ax25.py
%pylab
# Import functions and libraries
import numpy as np
import matplotlib.pyplot as plt
import pyaudio
import Queue
import threading,time
import sys
from numpy import pi
from numpy import sin
from numpy import zeros
from numpy import r_
from numpy import ones
from scipy import signal
from scipy import integrate
import threading,time
import multiprocessing
from numpy import mean
from numpy import power
from numpy.fft import fft
from numpy.fft import fftshift
from numpy.fft import ifft
from numpy.fft import ifftshift
import bitarray
from scipy.io.wavfile import read as wavread
import serial
import ax25
from fractions import gcd
%matplotlib inline
# function to compute least common multipler
def lcm(numbers):
return reduce(lambda x, y: (x*y)/gcd(x,y), numbers, 1)
def play_audio( Q,ctrlQ ,p, fs , dev, ser="", keydelay=0.1):
# play_audio plays audio with sampling rate = fs
# Q - A queue object from which to play
# ctrlQ - A queue object for ending the thread
# p - pyAudio object
# fs - sampling rate
# dev - device number
# ser - pyserial device to key the radio
# keydelay - delay after keying the radio
#
#
# There are two ways to end the thread:
# 1 - send "EOT" through the control queue. This is used to terminate the thread on demand
# 2 - send "EOT" through the data queue. This is used to terminate the thread when data is done.
#
# You can also key the radio either through the data queu and the control queue
# open output stream
ostream = p.open(format=pyaudio.paFloat32, channels=1, rate=int(fs),output=True,output_device_index=dev)
# play audio
while (1):
if not ctrlQ.empty():
# control queue
ctrlmd = ctrlQ.get()
if ctrlmd is "EOT" :
ostream.stop_stream()
ostream.close()
print("Closed play thread")
return;
elif (ctrlmd is "KEYOFF" and ser!=""):
ser.setDTR(0)
#print("keyoff\n")
elif (ctrlmd is "KEYON" and ser!=""):
ser.setDTR(1) # key PTT
#print("keyon\n")
time.sleep(keydelay) # wait 200ms (default) to let the power amp to ramp up
data = Q.get()
if (data is "EOT") :
ostream.stop_stream()
ostream.close()
print("Closed play thread")
return;
elif (data is "KEYOFF" and ser!=""):
ser.setDTR(0)
#print("keyoff\n")
elif (data is "KEYON" and ser!=""):
ser.setDTR(1) # key PTT
#print("keyon\n")
time.sleep(keydelay) # wait 200ms (default) to let the power amp to ramp up
else:
try:
ostream.write( data.astype(np.float32).tostring() )
except:
print("Exception")
break
def record_audio( queue,ctrlQ, p, fs ,dev,chunk=1024):
# record_audio records audio with sampling rate = fs
# queue - output data queue
# p - pyAudio object
# fs - sampling rate
# dev - device number
# chunk - chunks of samples at a time default 1024
#
# Example:
# fs = 44100
# Q = Queue.queue()
# p = pyaudio.PyAudio() #instantiate PyAudio
# record_audio( Q, p, fs, 1) #
# p.terminate() # terminate pyAudio
istream = p.open(format=pyaudio.paFloat32, channels=1, rate=int(fs),input=True,input_device_index=dev,frames_per_buffer=chunk)
# record audio in chunks and append to frames
frames = [];
while (1):
if not ctrlQ.empty():
ctrlmd = ctrlQ.get()
if ctrlmd is "EOT" :
istream.stop_stream()
istream.close()
print("Closed record thread")
return;
try: # when the pyaudio object is distroyed stops
data_str = istream.read(chunk) # read a chunk of data
except:
break
data_flt = np.fromstring( data_str, 'float32' ) # convert string to float
queue.put( data_flt ) # append to list
def printDevNumbers(p):
N = p.get_device_count()
for n in range(0,N):
name = p.get_device_info_by_index(n).get('name')
print n, name
p = pyaudio.PyAudio()
printDevNumbers(p)
p.terminate()
# CHANGE!!!!
dusb_in = 3
dusb_out = 3
din = 0
dout = 1
Initialize serial port
if sys.platform == 'darwin': # Mac
s = serial.Serial(port='/dev/tty.SLAB_USBtoUART')
else: #windows
s = serial.Serial(port='COM1') # CHANGE !!!!!!
The architecture we chose for this lab is to create a class that implements the functions of the modem, while keeping joint state variables such that it would be possible to process buffers in real-time while maintaining continuity and integrity of the processing as if we are processing a large buffer.
We created the infrastructure for you, implemented in the TNCaprs
class.
It implelents an overlapp and save approach. The constructor for the class takes in the sampling frequency: fs
the size of pyAudio buffer: Abuffer
and the number of audio buffers to collect before processing:Nchunks
processBuffer(self, buff_in)
¶The method processBuffer(buff_in)
takes buffers in to be processed, collects them into a large buffer made of Nchunks
. Once the large buffer has been collected, the method calls the demodulation method to create an NRZI, it calls the PLL to find the sampling timings, it samples the NRZI and converts it to NRZ bits. Finally it calls the function that looks for packets in the bitstream.
The method implements an overlapp and save approach. So, it calls the demodulation function with a buffer that is overlaping and expects a smaller buffer, that contains only valid linear convolution samples.
demod(self, buff)
-- same as nc_afsk1200Demod
¶Because there are 3 filters one after the other in the demodulator, each one of length N, the method processBuffer(buff_in)
sends to demod
a buffer sized Nchunks*Abuffer + 3*(N-1)
and expects Nchunks*Abuffer
samples in return. If you have different size filters, you need to modify processBuffer(buff_in)
to account for that. The filters for demod
are generated in the __init__
function of the TNCaprs
class.
PLL(self, NRZa)
¶This is the same PLL implementation as before. The only different is that the PLL counter: pll
, its previous value: ppll
, the stepsize: dpll
and the agressivness or scaling factor apll
are all class variables. This way, when the PLL finishes processing a buffer, the valuse are saved and used in the beginning of the next buffer
findPackets(self,bits)
¶This function is the same as before. The only differences are that the variables, state
, pktcounter
, and packet
are class variable and their value is kept from one call to another.
Another difference is that we also added another variable: bitpointer
. As you recall, the function looks for flags in the bitstream up to 7 bits before the end of the buffer. bitpointer
is needed for the case where a flag is detected at the end of the buffer and extend to these 7 bits. That means that when we process the next buffer, we need to start after those bits, and bitpointer
points to the bit we need to start with.
modulate(self, bits)
-- same as afsk1200
¶Function takes bits and afsk 1200 modulates them. The sampling rate is initialized in TNCaprs.__init__
modulatPacket(self, callsign, digi, dest, info, preflags=80, postflags=80 )
¶Given callsign, digipath, dest, info, number of pre-flags and post-flags the function contructs an appropriate aprs packet, then converts them to NRZI and calls modulate
to afsk 1200 modulate the packet.
nc_afsk1200Demod
and afsk1200
to TNCaprs.demod
and TNCaprs.modulate
. Make it into class form in which class vriables are in the form of self.variable
which are initialized in TNCaprs.__init__
. class TNCaprs:
def __init__(self, fs = 48000.0, Abuffer = 1024, Nchunks=43):
# Implementation of an afsk1200 TNC.
#
# The TNC processes a `Abuffer` long buffers, till `Nchunks` number of buffers are collected into a large one.
# This is because python is able to more efficiently process larger buffers than smaller ones.
# Then, the resulting large buffer is demodulated, sampled and packets extracted.
#
# Inputs:
# fs - sampling rate
# TBW - TBW of the demodulator filters
# Abuffer - Input audio buffers from Pyaudio
# Nchunks - Number of audio buffers to collect before processing
# plla - agressivness parameter of the PLL
## compute sizes based on inputs
self.TBW = 2.0 # TBW for the demod filters
self.N = (int(fs/1200*self.TBW)//2)*2+1 # length of the filters for demod
self.fs = fs # sampling rate
self.BW = self.TBW/(1.0*self.N/fs) # BW of filter based on TBW
self.Abuffer = Abuffer # size of audio buffer
self.Nchunks = Nchunks # number of audio buffers to collect
self.Nbuffer = Abuffer*Nchunks+self.N*3-3 # length of the large buffer for processing
self.Ns = 1.0*fs/1200 # samples per symbol
## state variables for the modulator
self.prev_ph = 0 # previous phase to maintain continuous phase when recalling the function
## Generate Filters for the demodulator
self.h_lp =
self.h_space =
self.h_mark =
self.h_lpp =
self.h_bp =
## PLL state variables -- so conntinuity between buffers is preserved
self.dpll = np.round(2.0**32 / self.Ns).astype(int32) # PLL step
self.pll = 0 # PLL counter
self.ppll = -self.dpll # PLL counter previous value -- to detect overflow
self.plla = 0.74 # PLL agressivness (small more agressive)
## state variable to NRZI2NRZ
self.NRZIprevBit = True
## State variables for findPackets
self.state='search' # state variable: 'search' or 'pkt'
self.pktcounter = 0 # counts the length of a packet
self.packet = bitarray.bitarray([0,1,1,1,1,1,1,0]) # current packet being collected
self.bitpointer = 0 # poiter to advance the search beyond what was already searched in the previous buffer
## State variables for processBuffer
self.buff = zeros(self.Nbuffer) # large overlapp-save buffer
self.chunk_count = 0 # chunk counter
self.oldbits = bitarray.bitarray([0,0,0,0,0,0,0]) # bits from end of prev buffer to be copied to beginning of new
self.Npackets = 0 # packet counter
def NRZ2NRZI(self,NRZ, prevBit = True):
NRZI = NRZ.copy()
for n in range(0,len(NRZ)):
if NRZ[n] :
NRZI[n] = prevBit
else:
NRZI[n] = not(prevBit)
prevBit = NRZI[n]
return NRZI
def NRZI2NRZ(self, NRZI):
NRZ = NRZI.copy()
for n in range(0,len(NRZI)):
NRZ[n] = NRZI[n] == self.NRZIprevBit
self.NRZIprevBit = NRZI[n]
return NRZ
def modulate(self,bits):
# the function will take a bitarray of bits and will output an AFSK1200 modulated signal of them, sampled at 44100Hz
# Inputs:
# bits - bitarray of bits
# fs - sampling rate
# Outputs:
# sig - returns afsk1200 modulated signal
# for you to complete
return sig
def modulatPacket(self, callsign, digi, dest, info, preflags=80, postflags=80 ):
# given callsign, digipath, dest, info, number of pre-flags and post-flags the function contructs
# an appropriate aprs packet, then converts them to NRZI and calls `modulate` to afsk 1200 modulate the packet.
packet = ax25.UI(destination=dest,source=callsign, info=info, digipeaters=digi.split(b','),)
prefix = bitarray.bitarray(np.tile([0,1,1,1,1,1,1,0],(preflags,)).tolist())
suffix = bitarray.bitarray(np.tile([0,1,1,1,1,1,1,0],(postflags,)).tolist())
sig = self.modulate(self.NRZ2NRZI(prefix + packet.unparse()+suffix))
return sig
def demod(self, buff):
# Similar to afsk1200_demod, for you to complete
return NRZ
def PLL(self, NRZa):
idx = zeros(len(NRZa)//int(self.Ns)*2) # allocate space to save indexes
c = 0
for n in range(1,len(NRZa)):
if (self.pll < 0) and (self.ppll >0):
idx[c] = n
c = c+1
if (NRZa[n] >= 0) != (NRZa[n-1] >=0):
self.pll = int32(self.pll*self.plla)
self.ppll = self.pll
self.pll = int32(self.pll+ self.dpll)
return idx[:c].astype(int32)
def findPackets(self,bits):
# function take a bitarray and looks for AX.25 packets in it.
# It implements a 2-state machine of searching for flag or collecting packets
flg = bitarray.bitarray([0,1,1,1,1,1,1,0])
packets = []
n = self.bitpointer
# Loop over bits
while (n < len(bits)-7) :
# default state is searching for packets
if self.state is 'search':
# look for 1111110, because can't be sure if the first zero is decoded
# well if the packet is not padded.
if bits[n:n+7] == flg[1:]:
# flag detected, so switch state to collecting bits in a packet
# start by copying the flag to the packet
# start counter to count the number of bits in the packet
self.state = 'pkt'
self.packet=flg.copy()
self.pktcounter = 8
# Advance to the end of the flag
n = n + 7
else:
# flag was not found, advance by 1
n = n + 1
# state is to collect packet data.
elif self.state is 'pkt':
# Check if we reached a flag by comparing with 0111111
# 6 times ones is not allowed in a packet, hence it must be a flag (if there's no error)
if bits[n:n+7] == flg[:7]:
# Flag detected, check if packet is longer than some minimum
if self.pktcounter > 200:
# End of packet reached! append packet to list and switch to searching state
# We don't advance pointer since this our packet might have been
# flase detection and this flag could be the beginning of a real packet
self.state = 'search'
self.packet.extend(flg)
packets.append(self.packet.copy())
else:
# packet is too short! false alarm. Keep searching
# We don't advance pointer since this this flag could be the beginning of a real packet
self.state = 'search'
# No flag, so collect the bit and add to the packet
else:
# check if packet is too long... if so, must be false alarm
if self.pktcounter < 2680:
# Not a false alarm, collect the bit and advance pointer
self.packet.append(bits[n])
self.pktcounter = self.pktcounter + 1
n = n + 1
else: #runaway packet
#runaway packet, switch state to searching, and advance pointer
self.state = 'search'
n = n + 1
self.bitpointer = n-(len(bits)-7)
return packets
# function to generate a checksum for validating packets
def genfcs(self,bits):
# Generates a checksum from packet bits
fcs = ax25.FCS()
for bit in bits:
fcs.update_bit(bit)
digest = bitarray.bitarray(endian="little")
digest.frombytes(fcs.digest())
return digest
# function to parse packet bits to information
def decodeAX25(self,bits, deepsearch=False):
ax = ax25.AX25()
ax.info = "bad packet"
bitsu = ax25.bit_unstuff(bits[8:-8])
foundPacket = False
if (self.genfcs(bitsu[:-16]).tobytes() == bitsu[-16:].tobytes()):
foundPacket = True
elif deepsearch:
tbits = bits[8:-8]
for n in range(0,len(tbits)):
tbits[n] = not tbits[n]
if (self.genfcs(bitsu[:-16]).tobytes() == bitsu[-16:].tobytes()):
foundPacket = True
print("Success deep search")
break
tbits[n] = not tbits[n]
if foundPacket == False:
return ax
bytes = bitsu.tobytes()
ax.destination = ax.callsign_decode(bitsu[:56])
source = ax.callsign_decode(bitsu[56:112])
if source[-1].isdigit() and source[-1]!="0":
ax.source = b"".join((source[:-1],'-',source[-1]))
else:
ax.source = source[:-1]
digilen=0
if bytes[14]=='\x03' and bytes[15]=='\xf0':
digilen = 0
else:
for n in range(14,len(bytes)-1):
if ord(bytes[n]) & 1:
digilen = (n-14)+1
break
# if digilen > 56:
# return ax
ax.digipeaters = ax.callsign_decode(bitsu[112:112+digilen*8])
ax.info = bitsu[112+digilen*8+16:-16].tobytes()
return ax
def processBuffer(self, buff_in):
# function processes an audio buffer. It collect several small into a large one
# Then it demodulates and finds packets.
#
# The function operates as overlapp and save
# The function returns packets when they become available. Otherwise, returns empty list
N = self.N
NN = N*3-3
Nchunks = self.Nchunks
Abuffer = self.Abuffer
fs = self.fs
Ns = self.Ns
validPackets=[]
packets=[]
NRZI=[]
idx = []
bits = []
# Fill in buffer at the right plave
self.buff[NN+self.chunk_count*Abuffer:NN+(self.chunk_count+1)*Abuffer] = buff_in.copy()
self.chunk_count = self.chunk_count + 1
# number of chunk reached -- process large buffer
if self.chunk_count == Nchunks:
# Demodulate to get NRZI
NRZI = self.demod(self.buff)
# compute sampling points, using PLL
idx = self.PLL(NRZI)
# Sample and make a decision based on threshold
bits = bitarray.bitarray((NRZI[idx]>0).tolist())
# In case that buffer is too small raise an error -- must have at least 7 bits worth
if len(bits) < 7:
raise ValueError('number of bits too small for buffer')
# concatenate end of previous buffer to current one
bits = self.oldbits + self.NRZI2NRZ(bits)
# store end of bit buffer to next buffer
self.oldbits = bits[-7:].copy()
# look for packets
packets = self.findPackets(bits)
# Copy end of sample buffer to the beginning of the next (overlapp and save)
self.buff[:NN] = self.buff[-NN:].copy()
# reset chunk counter
self.chunk_count = 0
# checksum test for all detected packets
for n in range(0,len(packets)):
if len(packets[n]) > 200:
ax = self.decodeAX25(packets[n])
if ax.info != 'bad packet':
validPackets.append(ax)
return validPackets
Now, we are ready to test our modem. Let's first load the ISS recording and see if out modem can detect the 24 packets we detected earlier. The difference is that we will load data in small buffers of 1024 samples and process them over a larger buffer made of 20-40 small buffers (corresponding to ~ 0.5 to 1 seconds)
modem = TNCaprs(fs = fs,Abuffer = 1024,Nchunks = 20)
packets = modem.processBuffer(sig[n:n+1024])
modem.processBuffer
will return a non-empy object whenever it detects packets. Iterate on the returned objects and display the packets.modem = TNCaprs(fs = fs,Abuffer = 1024,Nchunks = 1)
. You should still get 24 packetsimport urllib, ssl
testfile = urllib.URLopener()
testfile.context = ssl._create_unverified_context()
testfile.retrieve("https://inst.eecs.berkeley.edu/~ee123/sp16/lab/lab5/ISS.wav", 'ISS.wav')
fs, sig = wavread("ISS.wav")
modem = TNCaprs(fs = fs,Abuffer = 1024,Nchunks = 1)
npack = 0
for n in range(0,len(sig),1024):
packets = modem.processBuffer(sig[n:n+1024])
for ax in packets:
npack = npack + 1
print(str(npack)+")",str(ax) )
We have reated a gui application for you.
Write code that does the following: reads a file, breaks it into packets, modulates them and plays them on USB audio connected in loopback mode. At the same time, records the played packets, demodulates them and stores the info data into a new file. Basically, a full file transfer in loopback mode.
calBlue.tiff
file as a binary file: f = open("calBlue.tiff,"rb")
f_in = open("rec_calBlue.tiff,"wb")
callsign = "KK6MRI"
fname = "calBlue.tiff"
f = open(fname,"rb")
fs = 11025
modem = TNCaprs(fs = fs ,Abuffer = 1024,Nchunks = 12)
Qin = Queue.Queue()
Qout = Queue.Queue()
# create a control fifo to kill threads when done
cQin = Queue.Queue()
cQout = Queue.Queue()
# create a pyaudio object
p = pyaudio.PyAudio()
# initialize a recording thread.
t_rec = threading.Thread(target = record_audio, args = (Qin, cQin,p, fs, dusb_in))
t_play = threading.Thread(target = play_audio, args = (Qout, cQout,p, fs, dusb_out))
print("Putting packets in Queue")
npp = 0
tmp = modem.modulatPacket(callsign, "", "BEGIN", fname , preflags=80, postflags=2 )
Qout.put(tmp)
while(1):
bytes = f.read(256)
tmp = modem.modulatPacket(callsign, "", str(npp), bytes, preflags=4, postflags=2 )
Qout.put(tmp)
npp = npp+1
if len(bytes) < 256:
break
tmp = modem.modulatPacket(callsign, "", "END", "This is the end of transmission", preflags=2, postflags=80 )
Qout.put(tmp)
Qout.put("EOT")
print("Done generating packets")
# start the recording and playing threads
t_rec.start()
time.sleep(2)
t_play.start()
starttime = time.time()
npack = 0
state = 0
while(1):
tmp = Qin.get()
Qout.put(tmp)
packets = modem.processBuffer(tmp)
for ax in packets:
npack = npack + 1
print((str(npack)+")",str(ax)))
if state == 0 and ax.destination[:5]=="BEGIN":
f1 = open("rec_"+ax.info,"wb")
state = 1
elif state == 1 and ax.destination[:3] == "END":
state = 2
break
elif state == 1:
f1.write(ax.info)
print("write")
if state == 2 :
break
print(time.time() - starttime)
cQout.put("EOT")
cQin.put("EOT")
f1.close()
f.close()