## -*- python -*- from __future__ import print_function import numpy as np import common from digitalhf.digitalhf_swig import viterbi27 ## ---- constellatios ----------------------------------------------------------- BPSK=np.array(zip(np.exp(2j*np.pi*np.arange(2)/2), [0,1]), common.CONST_DTYPE) QPSK=np.array(zip(np.exp(2j*np.pi*np.arange(4)/4), [0,1,3,2]), common.CONST_DTYPE) PSK8=np.array(zip(np.exp(2j*np.pi*np.arange(8)/8), [0,1,3,2,6,7,5,4]), common.CONST_DTYPE) ## ---- constellation indices --------------------------------------------------- MODE_BPSK=0 MODE_QPSK=1 MODE_8PSK=2 class LFSR(object): def __init__(self, init, taps): self._init = np.array(init, dtype=np.bool) self._state = np.array(init, dtype=np.bool) self._taps = np.array(taps, dtype=np.bool) def reset(self): self._state = self._init def next(self): self._state = np.concatenate([[np.sum(self._state&self._taps)&1], self._state[0:-1]]) return self._state LFSR_PREAMBLE = LFSR([1,1,1,1,1,1,1], [1,0,0,1,0,1,1]) LFSR_M1 = LFSR([1,1,1,1,1,1,1], [1,1,1,0,0,0,1]) LFSR_SCRAMBLE = LFSR([1,1,0,1,0,0,1,0,1,0,1,1,0,0,1], [1,0,0,0,0,0,0,0,0,0,0,0,0,0,1]) PREAMBLE = common.n_psk(2, np.array([LFSR_PREAMBLE.next()[0] for _ in range(127)])) M1 = common.n_psk(2, np.array([LFSR_M1.next()[0] for _ in range(127)])) SCRAMBLE = common.n_psk(2, np.array([LFSR_SCRAMBLE.next()[0] for _ in range(120)])) PROBE = common.n_psk(2, np.array([0,0,0,1,0,0,1,1,0,1,0,1,1,1,1])) T = np.tile(PROBE, 9) SHIFTS = [72,82,113,123,61,103,93,9] MODES = [ {'bps': 300, 'intl': {'type': 'S', 'cols': 54}, 'shift': 72, 'mode': MODE_BPSK, 'repeat': 2}, {'bps': 600, 'intl': {'type': 'S', 'cols': 54}, 'shift': 82, 'mode': MODE_BPSK, 'repeat': 1}, {'bps': 1200, 'intl': {'type': 'S', 'cols': 108}, 'shift': 113, 'mode': MODE_QPSK, 'repeat': 1}, {'bps': 1800, 'intl': {'type': 'S', 'cols': 162}, 'shift': 123, 'mode': MODE_8PSK, 'repeat': 1}, {'bps': 300, 'intl': {'type': 'L', 'cols': 126}, 'shift': 61, 'mode': MODE_BPSK, 'repeat': 2}, {'bps': 600, 'intl': {'type': 'L', 'cols': 126}, 'shift': 103, 'mode': MODE_BPSK, 'repeat': 1}, {'bps': 1200, 'intl': {'type': 'L', 'cols': 252}, 'shift': 93, 'mode': MODE_QPSK, 'repeat': 1}, {'bps': 1800, 'intl': {'type': 'L', 'cols': 378}, 'shift': 9, 'mode': MODE_8PSK, 'repeat': 1} ] ## interleaver -> number of 45-symbol frames NUM_FRAMES = { 'S': 72, 'L': 168 } ## interleave -> interleaver increment INTL_INCR = { 'S': -17, 'L': -23 } class DeInterleaver(object): def __init__(self, ncols, di): self._matrix = np.zeros((40, ncols), dtype=np.float32) self._ncols = ncols self._dj = di self._i = 0 self._j = 0 self._counter = 0 def insert(self, v): for val in v: self._counter += 1 self._matrix[self._i][self._j] = val self._i += 1 self._j = np.mod(self._j + self._dj, self._ncols) if self._i == 40: self._i = 0 self._j += 1 print('insert: ', self._i, self._j, self._counter, 40*self._ncols) return self._counter == 40*self._ncols def fetch(self): r = np.zeros(40*self._ncols) idx = np.mod(9*np.arange(40, dtype=np.int), 40) for j in range(self._ncols): r[j*40:(j+1)*40] = self._matrix[idx,j] return r ## ---- physcal layer class ----------------------------------------------------- class PhysicalLayer(object): """Physical layer description for HFDL ARINC 635""" def __init__(self, sps): """intialization""" self._sps = sps self._frame_counter = -1 self._constellations = [BPSK, QPSK, PSK8] self._preamble = self.get_preamble() self._pre_counter = -1 self._mode = {} self._viterbi_dec = viterbi27(0x6d, 0x4f) self._repeat = 1 self._mode_descr = 'UNKNOWN' def get_constellations(self): return self._constellations def get_next_frame(self, symbols): """returns a tuple describing the frame: [0] ... known+unknown symbols and scrambling [1] ... modulation type after descrambling [2] ... a boolean indicating if the processing should continue [3] ... a boolean indicating if the soft decision for the unknown symbols are saved""" print('-------------------- get_frame --------------------', self._pre_counter, self._frame_counter) success = True if len(symbols) == 0: self._frame_counter = -1 s = self.get_preamble() s.resize(15+len(s)) s['scramble'][-15:] = 1 return [s,MODE_BPSK,success,False] if self._frame_counter == -1: ## preamble mode success,idx = self.decode_preamble(symbols) print('IDX= ', idx) if idx == 0: ## 2nd preamble frame s = self._preamble s = np.roll(s, -15) s['symb'][-15:] = 0 s['scramble'][-15:] = 1 return [s,MODE_BPSK,True,False] else: self._frame_counter = 0 mode = MODES[idx-1] print('MODE=', mode) self._mode = mode['mode'] self._mode_descr = "HFDL bps=%d intl=%s " % (mode['bps'], mode['intl']['type']) self._num_frames = NUM_FRAMES[mode['intl']['type']] self._deintl = DeInterleaver(mode['intl']['cols'], INTL_INCR[mode['intl']['type']]) self._repeat = mode['repeat'] self._a = self.make_data_frame(mode) s = np.concatenate([np.roll(M1, -(15+SHIFTS[idx-1])), T]) a = common.make_scr(s,s) return [a,MODE_BPSK,True,False] if self._frame_counter >= 0: ## data print('====', self._frame_counter, self._num_frames) do_continue = use_soft_dec = self.get_data_frame_quality(symbols) if self._frame_counter == self._num_frames: self._frame_counter = 0 do_continue = False else: self._frame_counter += len(self._a)/45; if not do_continue: self._frame_counter = -2 print("SUCCESS ", do_continue, use_soft_dec) return [self._a, self._mode, do_continue, use_soft_dec] def make_data_frame(self, mode): s = np.zeros(180, dtype=common.SYMB_SCRAMBLE_DTYPE) s['scramble'][:] = 1 for i in range(0,180,45): s['scramble'][i :i+30] = SCRAMBLE[i*30/45:i*30/45+30] s['scramble'][i+30:i+45] = PROBE s['symb' ][i+30:i+45] = PROBE return s def get_data_frame_quality(self, symbols): s = symbols[-15:] mean_s = np.mean(s) tests = [np.abs(mean_s) > 0.4, np.real(mean_s) > np.imag(mean_s)] print('FRAME_QUALITY: ', s, mean_s, tests) success = all(tests) return success def get_doppler(self, iq_samples): """quality check and doppler estimation for preamble""" r = {'success': False, ## -- quality flag 'doppler': 0} ## -- doppler estimate (rad/symb) if len(iq_samples) != 0: sps = self._sps _,zp = self.get_preamble_z() ## length is sps*128 cc = np.correlate(iq_samples, zp[0:32*sps]) imax = np.argmax(np.abs(cc[0:32*sps])) print('imax=', imax, len(iq_samples), len(cc)) pks = np.zeros(4, dtype=np.complex64) for i in range(4): idx = 32*sps*i+np.arange(32*sps) idx = idx[idx