Lab 5 Part IV: Stream processing and full APRS transceiver

In this part of the lab we will take the functions that we wrote in the previous part and put them in a streaming architecture, so that we can collect finite buffers and process them in real time.

You will need the file ax25.py

In [ ]:
%pylab
# Import functions and libraries
import numpy as np
import matplotlib.pyplot as plt
import pyaudio
import Queue
import threading,time
import sys

from numpy import pi
from numpy import sin
from numpy import zeros
from numpy import r_
from numpy import ones
from scipy import signal
from scipy import integrate

import threading,time
import multiprocessing

from numpy import mean
from numpy import power
from numpy.fft import fft
from numpy.fft import fftshift
from numpy.fft import ifft
from numpy.fft import ifftshift
import bitarray
from  scipy.io.wavfile import read as wavread
import serial
import ax25
from fractions import gcd



%matplotlib inline
In [ ]:
# function to compute the least common multiple
def lcm(numbers):
    """Return the least common multiple of an iterable of integers.

    Parameters:
        numbers - iterable of integers

    Returns:
        The least common multiple as an integer; 1 for an empty iterable and
        0 if any of the numbers is 0.
    """
    # Imported locally so the function works on both Python 2 and Python 3
    # without depending on the builtin `reduce` or on `fractions.gcd`.
    from functools import reduce
    try:
        from math import gcd
    except ImportError:  # Python 2
        from fractions import gcd

    def _lcm2(x, y):
        if x == 0 or y == 0:
            return 0  # avoids dividing by gcd(0, 0) == 0
        # floor division keeps the result an exact integer on Python 3
        return abs(x * y) // gcd(x, y)

    return reduce(_lcm2, numbers, 1)


def play_audio( Q,ctrlQ ,p, fs , dev, ser="", keydelay=0.1):
    """Play audio buffers from a queue until told to stop.

    Parameters:
        Q        - data queue; items are numpy sample buffers or command strings
        ctrlQ    - control queue; items are command strings
        p        - pyAudio object
        fs       - sampling rate
        dev      - output device number
        ser      - pyserial device used to key the radio ("" or None: no keying)
        keydelay - seconds to wait after keying the radio (default 0.1)

    Commands (accepted on either queue):
        "EOT"    - close the stream and end the thread
        "KEYON"  - assert DTR (key PTT), then wait `keydelay` seconds
        "KEYOFF" - release DTR

    Sending "EOT" through the control queue ends the thread on demand; sending
    it through the data queue ends the thread once the queued data is played.
    """
    string_types = (str, type(u""))
    has_ser = ser is not None and ser != ""

    def _run_command(cmd):
        # Execute one command string; return True when the thread must end.
        # Commands are compared by value (==): identity (`is`) on string
        # literals only works by accident of interning.
        if cmd == "EOT":
            return True
        if cmd == "KEYOFF":
            if has_ser:
                ser.setDTR(0)
        elif cmd == "KEYON":
            if has_ser:
                ser.setDTR(1)  # key PTT
                time.sleep(keydelay)  # let the power amp ramp up
        return False

    # open output stream
    ostream = p.open(format=pyaudio.paFloat32, channels=1, rate=int(fs),output=True,output_device_index=dev)
    try:
        while True:
            # control queue is checked first on every pass
            if not ctrlQ.empty():
                ctrlmd = ctrlQ.get()
                if isinstance(ctrlmd, string_types) and _run_command(ctrlmd):
                    return

            # Poll instead of blocking on Q.get(), so that an "EOT" on the
            # control queue is honoured even when no data is pending.
            if Q.empty():
                time.sleep(0.01)
                continue

            data = Q.get()
            if isinstance(data, string_types):
                # Command strings never reach the audio path, even when no
                # serial device is configured.
                if _run_command(data):
                    return
                continue

            try:
                ostream.write( data.astype(np.float32).tobytes() )
            except Exception:
                print("Exception")
                return
    finally:
        # Always release the audio stream, whatever ended the loop.
        try:
            ostream.stop_stream()
            ostream.close()
        except Exception:
            pass  # best effort: the stream may already have been torn down
        print("Closed  play thread")
            
def record_audio( queue,ctrlQ, p, fs ,dev,chunk=1024):
    """Record audio in chunks and push each chunk into a queue.

    Parameters:
        queue - output data queue; receives float32 numpy arrays of `chunk` samples
        ctrlQ - control queue; sending "EOT" through it ends the thread
        p     - pyAudio object
        fs    - sampling rate
        dev   - input device number
        chunk - number of samples read at a time (default 1024)

    Example:
        fs = 44100
        Q = Queue.Queue()
        cQ = Queue.Queue()
        p = pyaudio.PyAudio()                   # instantiate PyAudio
        record_audio( Q, cQ, p, fs, 1)          # blocks until "EOT" is put in cQ
        p.terminate()                           # terminate pyAudio
    """
    string_types = (str, type(u""))

    istream = p.open(format=pyaudio.paFloat32, channels=1, rate=int(fs),input=True,input_device_index=dev,frames_per_buffer=chunk)

    try:
        while True:
            if not ctrlQ.empty():
                ctrlmd = ctrlQ.get()
                # compare by value: `is` on a string literal tests identity
                if isinstance(ctrlmd, string_types) and ctrlmd == "EOT":
                    return
            try:
                data_str = istream.read(chunk)  # read a chunk of data
            except Exception:
                # reading fails once the pyaudio object is destroyed: stop
                return
            # frombuffer returns a read-only view of the byte string; copy so
            # that consumers get a writable array, as np.fromstring provided.
            data_flt = np.frombuffer(data_str, dtype=np.float32).copy()
            queue.put( data_flt )
    finally:
        # Always release the audio stream, whatever ended the loop.
        try:
            istream.stop_stream()
            istream.close()
        except Exception:
            pass  # best effort: the stream may already have been torn down
        print("Closed  record thread")

    
            

def printDevNumbers(p):
    """Print the index and name of every audio device known to PyAudio.

    Parameters:
        p - pyAudio object

    Each device is printed on its own line as "<index> <name>".
    """
    for n in range(p.get_device_count()):
        name = p.get_device_info_by_index(n).get('name')
        # A single formatted string prints identically under the Python 2
        # print statement and the Python 3 print function.
        print("%d %s" % (n, name))
                
In [ ]:
        
# List the available audio devices so the device indices below can be chosen.
p = pyaudio.PyAudio()
try:
    printDevNumbers(p)
finally:
    # release PortAudio even if the listing fails
    p.terminate()
In [ ]:
# CHANGE!!!! -- audio device indices for this machine; pick them from the
# device list printed by printDevNumbers.
# dusb_in / dusb_out are the devices used by the record / play threads in the
# loopback file-transfer cell.
dusb_in = dusb_out = 3
# din / dout: presumably the computer's own audio input / output -- TODO confirm
din, dout = 0, 1

Initialize serial port

In [ ]:
# Open the serial port (presumably the one used to key the radio -- confirm).
# On a Mac the SLAB_USBtoUART device node is used; every other platform falls
# back to COM1, which must be edited to match the local machine.  CHANGE !!!!!!
s = serial.Serial(
    port='/dev/tty.SLAB_USBtoUART' if sys.platform == 'darwin' else 'COM1'
)

Creating a TNC (Terminal Node Controller, e.g., modem) class

The architecture we chose for this lab is to create a class that implements the functions of the modem, while keeping joint state variables such that it would be possible to process buffers in real-time while maintaining continuity and integrity of the processing as if we are processing a large buffer.

We created the infrastructure for you, implemented in the TNCaprs class. It implements an overlap-and-save approach. The constructor for the class takes in the sampling frequency fs, the size of the pyAudio buffer Abuffer, and the number of audio buffers to collect before processing, Nchunks.

processBuffer(self, buff_in)

The method processBuffer(buff_in) takes in buffers to be processed and collects them into a large buffer made of Nchunks small buffers. Once the large buffer has been collected, the method calls the demodulation method to create an NRZI signal, calls the PLL to find the sampling timings, samples the NRZI signal and converts it to NRZ bits. Finally, it calls the function that looks for packets in the bitstream. The method implements an overlap-and-save approach: it calls the demodulation function with a buffer that overlaps the previous one and expects a smaller buffer in return, containing only valid linear-convolution samples.

demod(self, buff) -- same as nc_afsk1200Demod

Because there are 3 filters one after the other in the demodulator, each one of length N, the method processBuffer(buff_in) sends to demod a buffer sized Nchunks*Abuffer + 3*(N-1) and expects Nchunks*Abuffer samples in return. If you have different size filters, you need to modify processBuffer(buff_in) to account for that. The filters for demod are generated in the __init__ function of the TNCaprs class.

PLL(self, NRZa)

This is the same PLL implementation as before. The only difference is that the PLL counter pll, its previous value ppll, the step size dpll and the aggressiveness (scaling factor) plla are all class variables. This way, when the PLL finishes processing a buffer, the values are saved and used at the beginning of the next buffer.

findPackets(self,bits)

This function is the same as before. The only differences are that the variables, state, pktcounter, and packet are class variable and their value is kept from one call to another.

Another difference is that we added another variable: bitpointer. As you recall, the function looks for flags in the bitstream up to 7 bits before the end of the buffer. bitpointer is needed for the case where a flag is detected at the end of the buffer and extends into these 7 bits. In that case, when we process the next buffer we need to start after those bits, and bitpointer points to the bit we need to start with.

modulate(self, bits) -- same as afsk1200

Function takes bits and afsk 1200 modulates them. The sampling rate is initialized in TNCaprs.__init__

modulatPacket(self, callsign, digi, dest, info, preflags=80, postflags=80 )

Given callsign, digipath, dest, info, and the number of pre-flags and post-flags, the function constructs an appropriate APRS packet, converts it to NRZI, and calls modulate to AFSK1200-modulate the packet.

Task:

  • Convert your functions nc_afsk1200Demod and afsk1200 to TNCaprs.demod and TNCaprs.modulate. Put them in class form, in which class variables are written as self.variable and are initialized in TNCaprs.__init__.
In [ ]:
class TNCaprs(object):
    """Streaming AFSK1200 / APRS modem (TNC).

    Small audio buffers are collected into one large buffer, which is then
    demodulated, sampled with a PLL, converted from NRZI to NRZ and searched
    for AX.25 packets.  All state needed to continue across buffers (PLL
    counters, NRZI previous bit, packet-search state, overlap samples) is kept
    on the instance.

    `modulate`, `demod` and the demodulator filters in `__init__` are left for
    the student to complete.
    """

    # AX.25 flag sequence that delimits packets
    _FLAG_BITS = [0, 1, 1, 1, 1, 1, 1, 0]

    def __init__(self, fs = 48000.0, Abuffer = 1024, Nchunks=43):
        """Set up buffer sizes and all streaming state.

        The TNC processes `Abuffer`-long buffers until `Nchunks` of them have
        been collected into a large one, because python processes large
        buffers more efficiently than small ones.  The large buffer is then
        demodulated, sampled, and packets are extracted.

        Inputs:
            fs      - sampling rate
            Abuffer - size of the input audio buffers from Pyaudio
            Nchunks - number of audio buffers to collect before processing

        The TBW of the demodulator filters (2.0) and the PLL aggressiveness
        `plla` (0.74) are fixed here and are not constructor arguments.
        """

        ## compute sizes based on inputs
        self.TBW = 2.0   # TBW for the demod filters
        self.N = (int(fs/1200*self.TBW)//2)*2+1   # length of the filters for demod (always odd)
        self.fs = fs     # sampling rate
        self.BW = self.TBW/(1.0*self.N/fs)      # BW of filter based on TBW
        self.Abuffer = Abuffer             # size of audio buffer
        self.Nchunks = Nchunks             # number of audio buffers to collect
        self.Nbuffer = Abuffer*Nchunks+self.N*3-3         # length of the large buffer for processing
        self.Ns = 1.0*fs/1200 # samples per symbol

        ## state variables for the modulator
        self.prev_ph = 0  # previous phase to maintain continuous phase when recalling the function

        ##  Generate Filters for the demodulator
        # TODO(student): design the demodulator filters (each of length
        # self.N).  They are None placeholders so that the class can be
        # defined and its other methods used before `demod` is implemented.
        self.h_lp = None
        self.h_space = None
        self.h_mark = None

        self.h_lpp = None
        self.h_bp = None

        ## PLL state variables  -- so continuity between buffers is preserved
        # np.int32 is used explicitly: a bare `int32` only exists under %pylab
        self.dpll = np.int32(np.round(2.0**32 / self.Ns))    # PLL step
        self.pll =  0                # PLL counter
        self.ppll = -self.dpll       # PLL counter previous value -- to detect overflow
        self.plla = 0.74             # PLL aggressiveness (smaller is more aggressive)

        ## state variable to NRZI2NRZ
        self.NRZIprevBit = True

        ## State variables for findPackets
        self.state='search'   # state variable:  'search' or 'pkt'
        self.pktcounter = 0   # counts the length of a packet
        self.packet = bitarray.bitarray(self._FLAG_BITS)   # current packet being collected
        self.bitpointer = 0   # pointer to advance the search beyond what was already searched in the previous buffer

        ## State variables for processBuffer
        self.buff = np.zeros(self.Nbuffer)   # large overlap-save buffer
        self.chunk_count = 0              # chunk counter
        self.oldbits = bitarray.bitarray([0,0,0,0,0,0,0])    # bits from end of prev buffer to be copied to beginning of new
        self.Npackets = 0                 # packet counter

    @staticmethod
    def _wrap_int32(value):
        # Wrap a Python integer into the signed 32-bit range.
        return ((value + 2**31) % 2**32) - 2**31

    def NRZ2NRZI(self,NRZ, prevBit = True):
        """Convert NRZ bits to NRZI: a 0 toggles the level, a 1 keeps it.

        Inputs:
            NRZ     - sequence of bits supporting .copy() and item assignment
            prevBit - level preceding the first bit

        Returns a copy of NRZ holding the NRZI levels.
        """
        NRZI = NRZ.copy()
        for n, bit in enumerate(NRZ):
            if not bit:
                prevBit = not prevBit
            NRZI[n] = prevBit
        return NRZI

    def NRZI2NRZ(self, NRZI):
        """Convert NRZI levels to NRZ bits: 1 when the level did not change.

        The last level seen is kept in self.NRZIprevBit, so consecutive
        buffers decode as one continuous stream.
        """
        NRZ = NRZI.copy()
        prev = self.NRZIprevBit
        for n, level in enumerate(NRZI):
            NRZ[n] = level == prev
            prev = level
        self.NRZIprevBit = prev
        return NRZ

    def modulate(self,bits):
        """AFSK1200-modulate a bitarray of bits (same as afsk1200).

        Inputs:
            bits  - bitarray of bits

        Outputs:
            sig   - afsk1200 modulated signal, sampled at self.fs

        TODO(student): for you to complete.  Until then the method raises
        NotImplementedError instead of returning an undefined name.
        """
        raise NotImplementedError("TNCaprs.modulate is left for you to complete")

    def modulatPacket(self, callsign, digi, dest, info, preflags=80, postflags=80 ):
        """Build an APRS packet and return its AFSK1200 modulated signal.

        Given callsign, digipath (comma separated), dest, info, and the number
        of pre-flags and post-flags, the function constructs the packet,
        surrounds it with flags, converts the bits to NRZI and calls
        `modulate`.
        """
        packet = ax25.UI(destination=dest, source=callsign, info=info, digipeaters=digi.split(b','))
        prefix = bitarray.bitarray(self._FLAG_BITS * preflags)
        suffix = bitarray.bitarray(self._FLAG_BITS * postflags)
        return self.modulate(self.NRZ2NRZI(prefix + packet.unparse() + suffix))

    def demod(self, buff):
        """Demodulate an audio buffer (similar to nc_afsk1200Demod).

        `processBuffer` passes a buffer of Nchunks*Abuffer + 3*(N-1) samples
        and then indexes the result with the PLL sample positions.

        TODO(student): for you to complete.  Until then the method raises
        NotImplementedError instead of returning an undefined name.
        """
        raise NotImplementedError("TNCaprs.demod is left for you to complete")

    def PLL(self, NRZa):
        """Find the bit sampling instants of a demodulated signal.

        A 32-bit counter advances by self.dpll every sample and wraps around
        once per symbol; a sample index is recorded whenever the counter has
        wrapped from positive to negative.  At every zero crossing of NRZa
        the counter is scaled by self.plla, pulling it towards zero.  The
        counter state is kept on the instance between calls.

        Inputs:
            NRZa - demodulated signal samples

        Returns an int32 array of sample indices.
        """
        # Work with Python integers and wrap explicitly, rather than relying
        # on numpy scalar overflow (which emits RuntimeWarnings).
        step = int(self.dpll)
        gain = self.plla
        pll = int(self.pll)
        ppll = int(self.ppll)
        idx = []   # a list cannot overrun, unlike a preallocated array

        for n in range(1, len(NRZa)):
            if pll < 0 and ppll > 0:
                idx.append(n)

            if (NRZa[n] >= 0) != (NRZa[n-1] >= 0):
                pll = int(pll*gain)   # truncates towards zero, like int32()

            ppll = pll
            pll = self._wrap_int32(pll + step)

        self.pll = np.int32(pll)
        self.ppll = np.int32(ppll)
        return np.array(idx, dtype=np.int32)

    def findPackets(self,bits):
        """Look for AX.25 packets in a bitarray.

        Implements a 2-state machine: 'search' looks for a flag, 'pkt'
        collects packet bits until the next flag.  The state, the packet
        collected so far and its length are kept on the instance, so a packet
        may span several calls.  The last 7 bits of `bits` are not examined;
        self.bitpointer records how far into them the search has advanced.

        Returns the list of complete packets (bitarrays including both flags).
        """
        flg = bitarray.bitarray(self._FLAG_BITS)
        flag_tail = flg[1:]    # 1111110
        flag_head = flg[:7]    # 0111111
        packets = []
        n = self.bitpointer
        last = len(bits)-7

        while n < last:
            window = bits[n:n+7]
            # States are compared by value: `is` on strings tests identity.
            if self.state == 'search':
                # look for 1111110, because can't be sure if the first zero
                # is decoded well if the packet is not padded.
                if window == flag_tail:
                    # flag detected: start a packet with the flag itself and
                    # advance to the end of the flag
                    self.state = 'pkt'
                    self.packet = flg.copy()
                    self.pktcounter = 8
                    n = n + 7
                else:
                    n = n + 1

            elif self.state == 'pkt':
                # Six ones in a row are not allowed inside a packet, hence
                # 0111111 must be a flag (if there's no error).
                if window == flag_head:
                    if self.pktcounter > 200:
                        # end of packet reached: store it
                        self.packet.extend(flg)
                        packets.append(self.packet.copy())
                    # The pointer is not advanced: whether the packet was
                    # stored or was too short, this flag could be the
                    # beginning of a real packet.
                    self.state = 'search'
                elif self.pktcounter < 2680:
                    # collect the bit and advance pointer
                    self.packet.append(bits[n])
                    self.pktcounter = self.pktcounter + 1
                    n = n + 1
                else:
                    # runaway packet: must be a false alarm
                    self.state = 'search'
                    n = n + 1

            else:
                # an unknown state would otherwise loop forever
                raise ValueError('unknown findPackets state: %r' % (self.state,))

        self.bitpointer = n-last
        return packets

    # function to generate a checksum for validating packets
    def genfcs(self,bits):
        """Generate the checksum (little-endian bitarray) of packet bits."""
        fcs = ax25.FCS()
        for bit in bits:
            fcs.update_bit(bit)

        digest = bitarray.bitarray(endian="little")
        digest.frombytes(fcs.digest())

        return digest

    def _fcs_ok(self, bitsu):
        # True when the unstuffed bits are long enough to hold the fields
        # decodeAX25 reads (16 bytes plus the 2-byte FCS) and the trailing 16
        # bits match the checksum of what precedes them.
        if len(bitsu) < 18*8:
            return False
        return self.genfcs(bitsu[:-16]).tobytes() == bitsu[-16:].tobytes()

    # function to parse packet bits to information
    def decodeAX25(self,bits, deepsearch=False):
        """Parse packet bits (including both flags) into an ax25.AX25 object.

        Inputs:
            bits       - bitarray holding one packet, flags included
            deepsearch - when the checksum fails, try flipping each bit in
                         turn to recover from a single bit error

        Returns the decoded object; its `info` is "bad packet" when the
        checksum cannot be validated.
        """
        ax = ax25.AX25()
        ax.info = "bad packet"

        payload = bits[8:-8]   # slice is a copy: the caller's bits are not modified
        bitsu = ax25.bit_unstuff(payload.copy())
        foundPacket = self._fcs_ok(bitsu)

        if not foundPacket and deepsearch:
            for n in range(len(payload)):
                payload[n] = not payload[n]
                # The flipped bits must be unstuffed again before checking;
                # re-testing the original `bitsu` could never succeed.
                candidate = ax25.bit_unstuff(payload.copy())
                if self._fcs_ok(candidate):
                    bitsu = candidate
                    foundPacket = True
                    print("Success deep search")
                    break
                payload[n] = not payload[n]

        if not foundPacket:
            return ax

        # bytearray indexing yields integers on both Python 2 and Python 3
        raw = bytearray(bitsu.tobytes())
        ax.destination = ax.callsign_decode(bitsu[:56])
        source = ax.callsign_decode(bitsu[56:112])
        if source[-1].isdigit() and source[-1]!="0":
            ax.source = b"".join((source[:-1],'-',source[-1]))
        else:
            ax.source = source[:-1]

        digilen = 0
        if not (raw[14] == 0x03 and raw[15] == 0xf0):
            # digipeater addresses follow: the last address byte has its
            # lowest bit set
            for n in range(14, len(raw)-1):
                if raw[n] & 1:
                    digilen = (n-14)+1
                    break

        ax.digipeaters =  ax.callsign_decode(bitsu[112:112+digilen*8])
        ax.info = bitsu[112+digilen*8+16:-16].tobytes()

        return ax

    def processBuffer(self, buff_in):
        """Process one audio buffer; return the valid packets found, if any.

        The function collects several small buffers into a large one, then
        demodulates it and finds packets.  It operates as overlap and save.
        Packets are returned when they become available; otherwise an empty
        list is returned.

        Inputs:
            buff_in - up to self.Abuffer audio samples; a shorter buffer
                      (e.g. the end of a recording) is zero-padded

        Raises ValueError if buff_in is longer than self.Abuffer, or if the
        large buffer yields fewer than 7 bits.
        """
        NN = self.N*3-3
        Abuffer = self.Abuffer

        chunk = np.asarray(buff_in)
        if len(chunk) > Abuffer:
            raise ValueError('input buffer longer than Abuffer (%d > %d)' % (len(chunk), Abuffer))

        # Fill in buffer at the right place, zero-padding a short chunk
        start = NN + self.chunk_count*Abuffer
        self.buff[start:start+len(chunk)] = chunk
        self.buff[start+len(chunk):start+Abuffer] = 0
        self.chunk_count = self.chunk_count + 1

        # keep collecting until the number of chunks is reached
        if self.chunk_count < self.Nchunks:
            return []

        # Reset the chunk counter first, so that an exception below does not
        # leave the counter pointing past the end of the buffer.
        self.chunk_count = 0

        # Demodulate to get NRZI
        NRZI = self.demod(self.buff)

        # Copy end of sample buffer to the beginning of the next (overlap and save)
        self.buff[:NN] = self.buff[len(self.buff)-NN:].copy()

        # compute sampling points, using PLL
        idx = self.PLL(NRZI)
        # Sample and make a decision based on threshold
        bits = bitarray.bitarray((NRZI[idx]>0).tolist())
        # In case that buffer is too small raise an error -- must have at least 7 bits worth
        if len(bits) < 7:
            raise ValueError('number of bits too small for buffer')

        # concatenate end of previous buffer to current one
        bits = self.oldbits + self.NRZI2NRZ(bits)

        # store end of bit buffer to next buffer
        self.oldbits = bits[-7:].copy()

        # look for packets, then keep those that pass the checksum test
        validPackets = []
        for pkt in self.findPackets(bits):
            if len(pkt) > 200:
                ax = self.decodeAX25(pkt)
                if ax.info != 'bad packet':
                    validPackets.append(ax)

        return validPackets

Testing the modem reception

Now we are ready to test our modem. Let's first load the ISS recording and see if our modem can detect the 24 packets we detected earlier. The difference is that we will load the data in small buffers of 1024 samples and process them over a larger buffer made of 20-40 small buffers (corresponding to ~0.5 to 1 seconds).

Task:

  • Load ISS.wav
  • Create a TNC object using modem = TNCaprs(fs = fs,Abuffer = 1024,Nchunks = 20)
  • Process the samples with the modem, 1024 samples at a time using packets = modem.processBuffer(sig[n:n+1024])
  • The method modem.processBuffer will return a non-empty list whenever it detects packets. Iterate over the returned objects and display the packets.
  • Repeat for modem = TNCaprs(fs = fs,Abuffer = 1024,Nchunks = 1). You should still get 24 packets
In [ ]:
# Download the ISS recording used by the reception test into ISS.wav.
# urllib.URLopener is the Python 2 location of this class.
import urllib, ssl
testfile = urllib.URLopener()
# NOTE(review): an unverified SSL context turns off certificate checking for
# this download -- confirm this is still required for the course server.
testfile.context = ssl._create_unverified_context()
testfile.retrieve("https://inst.eecs.berkeley.edu/~ee123/sp16/lab/lab5/ISS.wav", 'ISS.wav')
In [ ]:
# Load the recording: wavread returns the sampling rate and the sample array.
fs, sig = wavread("ISS.wav")
In [ ]:
# Feed the recording to the modem 1024 samples at a time and print every
# packet it returns.
modem = TNCaprs(fs = fs,Abuffer = 1024,Nchunks = 1)

npack = 0
for n in range(0,len(sig),1024):
    chunk = sig[n:n+1024]
    if len(chunk) < 1024:
        # The recording rarely ends on a buffer boundary: zero-pad the last
        # chunk, since the modem fills a fixed 1024-sample slot per call.
        chunk = np.concatenate((chunk, np.zeros(1024 - len(chunk), dtype=chunk.dtype)))
    packets  = modem.processBuffer(chunk)
    for ax in packets:
        npack = npack + 1
        # one string, so the output is the same under Python 2 and 3
        print(str(npack) + ") " + str(ax))
    

APRS Gui Application

We have created a GUI application for you.

  • Download aprs_app.py, ax25.py and aprs.py from the class website.
  • Copy and paste your TNCaprs class code into aprs.py
  • Run the app from the commandline (not from ipython notebooks)
  • Enter the appropriate audio devices and enter your callsign.
  • To debug that the application works, you don't need the radio. You can operate it in loopback mode
  • When using the radio, for best results, turn the squelch to 0 (Menu->0, then Menu again and set to 0).
  • Once you get the app to work, you can play with sending EMAIL, SMS (Link to howto) and text messages to other classmates. Enjoy!
  • You can either operate on the APRS frequency, decode packets and send in real time, try to communicate through the ISS, or use one of the digital channels and text your friends!

Sending files in loopback -- practice for the project

Task:

Write code that does the following: reads a file, breaks it into packets, modulates them and plays them on USB audio connected in loopback mode. At the same time, records the played packets, demodulates them and stores the info data into a new file. Basically, a full file transfer in loopback mode.

  • Open the calBlue.tiff file as a binary file: f = open("calBlue.tiff","rb")
  • Create a modem using the TNCaprs with sampling rate of 11025Hz. Abuffer = 1024. Nchunks = 12
  • Read 256 bytes at a time, create APRS packets with the 256 bytes in the info field.
  • Modulate the packets and push into a Queue
  • Connect the interface in loopback mode
  • Create a play thread that will play the packets to the USB audio and a recording thread that will record from loopback USB audio.
  • Open a new file for writing: f_in = open("rec_calBlue.tiff","wb")
  • Write each 256 bytes you decode into the new file
In [ ]:
# Loopback file transfer: modulate a file into APRS packets, play them on the
# USB audio device, record them back, demodulate, and write the received data
# to "rec_<name>".
import os

callsign = "KK6MRI"
fname = "calBlue.tiff"

fs = 11025
modem = TNCaprs(fs = fs ,Abuffer = 1024,Nchunks = 12)

# data queues for the record (Qin) and play (Qout) threads
Qin = Queue.Queue()
Qout = Queue.Queue()

# create a control fifo to kill threads when done
cQin = Queue.Queue()
cQout = Queue.Queue()

# create a pyaudio object
p = pyaudio.PyAudio()

# initialize the recording and playing threads
t_rec = threading.Thread(target = record_audio,   args = (Qin, cQin,p, fs, dusb_in))
t_play = threading.Thread(target = play_audio,   args = (Qout, cQout,p, fs, dusb_out))

print("Putting packets in Queue")
npp = 0
# BEGIN packet carries the file name
Qout.put(modem.modulatPacket(callsign, "", "BEGIN", fname , preflags=80, postflags=2 ))
with open(fname,"rb") as f:
    # 256 bytes per packet; iteration stops at end of file, so no empty packet
    # is sent when the file size is a multiple of 256.
    # NOTE(review): a very short final chunk may fall below the receiver's
    # minimum packet length (pktcounter > 200 in findPackets) -- verify.
    for payload in iter(lambda: f.read(256), b""):
        Qout.put(modem.modulatPacket(callsign, "", str(npp), payload, preflags=4, postflags=2 ))
        npp = npp+1
Qout.put(modem.modulatPacket(callsign, "", "END", "This is the end of transmission", preflags=2, postflags=80 ))
Qout.put("EOT")   # ends the play thread once everything has been played

print("Done generating packets")

# start the recording and playing threads
t_rec.start()
time.sleep(2)
t_play.start()

starttime = time.time()
npack = 0
state = 0     # 0: waiting for BEGIN, 1: receiving data, 2: END received
f1 = None
try:
    while state != 2:
        # Recorded audio is only demodulated.  It is not pushed back into
        # Qout: everything queued after "EOT" is never played.
        tmp = Qin.get()
        for ax in modem.processBuffer(tmp):
            npack = npack + 1
            print(str(npack) + ") " + str(ax))
            if state == 0 and ax.destination[:5]=="BEGIN":
                # The file name arrives over the air: keep only its base name
                # so a received path cannot write outside this directory.
                f1 = open("rec_" + os.path.basename(ax.info),"wb")
                state = 1
            elif state == 1 and ax.destination[:3] == "END":
                state = 2
                break
            elif state == 1:
                f1.write(ax.info)
                print("write")

    print(time.time() - starttime)
finally:
    # stop both threads and release the file and audio resources, even if
    # the transfer failed half-way
    cQout.put("EOT")
    cQin.put("EOT")
    if f1 is not None:
        f1.close()
    t_rec.join(5)
    t_play.join(5)
    p.terminate()