Source code for Track.beacon

"""
    **Author :** *Jay Krishna*
    
    This module detects the presence of signal sent by beacon. Dynamic thresholding is applied based upon leading & lagging cells.

    Approach
    ----------------------------
     
    * signal is broken down in chunks of one second each.
    * At each point of smaller signal threshold is calculated based upon average of lagging & leading reference cells after removing guard cells from both sides.
    * Point where APRS signal is suppossed to start is found & checked against peak induced by random noise.

    Reference
    ------------------------------
    
    `Barkat, Mourad & Varshney, P.K (1987). On adaptive cell-averaging CFAR (Constant False-Alarm Rate) radar signal detection. Final Technical Report, Jun. 1984 - Dec. 1986 Syracuse Univ., NY. Dept. of Electrical and Computer Engineering <https://goo.gl/15t64a>`_

"""

import numpy as np
from scipy import signal
import math

from Modules import SignalData


[docs]def merge(peak, signal_chunk): """ Points detected very close to each other are merged and classified as same peak. Parameters ----------------------- peak : list Points of presence of beacon. signal_chunk : ndarray Numpy array of signal. Returns ------------------------- final_peak : list Points of presence of beacon after merging nearby peaks. """ final_peak = [] start = 0 for end in range(1, peak.shape[0]): if(peak[end] - peak[end - 1] < 200): pass else: point = peak[start] + np.argmax(signal_chunk[peak[start]:peak[end - 1]]) final_peak.append(point) start = end if(start == 0): point = peak[start] + np.argmax(signal_chunk[peak[start]:peak[end - 1]]) final_peak.append(point) if(peak[start] > final_peak[-1]): point = peak[start] + np.argmax(signal_chunk[peak[start]:peak[end - 1]]) final_peak.append(point) return final_peak
[docs]def radar_detect(radar, signal_chunk): """ Based upon the values calculated start of APRS signal is detected while taking care of false results due to spikes induced by noise. Parameters ----------------------- radar : ndarray Threshold values calculated at each point. signal_chunk : ndarray Numpy array of signal. Returns ------------------------- peak : list Points of presence of beacon. """ peak = [] for i in range(signal_chunk.shape[0]): if(signal_chunk[i] > radar[i]): peak.append(i) peak = np.array(peak, dtype=int) if(peak.shape[0] > 1): peak = merge(peak, signal_chunk) return peak
[docs]def cfar(signal_chunk, refLength=1000, guardLength=100, p=0.01): """ Calculates threshold value for each point using leading & lagging cells while discarding leading & lagging guard cells. Parameters ----------------------- signal_chunk : ndarray Numpy complex array of signal. refLength : int Length of reference cells guardLength : int Length of guard cells p : float Accepted probability of false alarm. Returns ----------------------- point : int Starting point of aprs signal if present. """ N = 2 * refLength alpha = (math.pow(p, -1 / N) - 1) * N cfarWin = np.ones(((refLength + guardLength) * 2 + 1, 1)) cfarWin[refLength + 1:refLength + 1 + 2 * guardLength] = 0 cfarWin = cfarWin / sum(cfarWin) signal_chunk = np.reshape(signal_chunk, (signal_chunk.shape[0], 1)) noiseLevel = signal.fftconvolve(signal_chunk, cfarWin, 'same') cfarThreshold = noiseLevel * alpha cfarThreshold[:refLength] = cfarThreshold[:refLength] * 3 cfarThreshold[cfarThreshold.shape[0] - refLength:] = cfarThreshold[cfarThreshold.shape[0] - refLength:] * 3 peak = radar_detect(cfarThreshold, signal_chunk) del noiseLevel, cfarWin, signal_chunk return peak
[docs]def check(SignaIInfo, signal_chunk): """ Breaks the signal into smaller chunks and send them sequentially for APRS signal detection. Parameters ----------------------- SignalInfo : object Instance of class SignalData having meta-data of file and signal. signal_chunk : ndarray Numpy complex array of signal. Returns ------------------------------ points : list List of starting index of aprs signal. """ value = SignaIInfo.getvalues() fs = value[3] chunksize = int(fs) chunknumber = int((signal_chunk.shape[0]) // chunksize) points = [] for i in range(chunknumber): start = i * chunksize end = start + chunksize hay = np.absolute(signal_chunk[start:end]) point = cfar(hay) for peak in point: final_point = start + peak points.append(final_point) return points