From 4b0c109e9ab06aaf7d8d3e2d3ca58733a890ac41 Mon Sep 17 00:00:00 2001 From: Christoph Mayer Date: Sat, 14 Sep 2019 17:03:12 +0200 Subject: [PATCH] HFDL ARINC635 physical layer decoder added --- python/msg_proxy.py | 16 +- python/physical_layer/CMakeLists.txt | 1 + python/physical_layer/HFDL_ARINC635.py | 279 ++++++++++++++++++++++ python/physical_layer/MIL_STD_188_110A.py | 35 ++- python/physical_layer/MIL_STD_188_110C.py | 2 +- python/physical_layer_driver.py | 2 +- 6 files changed, 316 insertions(+), 19 deletions(-) create mode 100644 python/physical_layer/HFDL_ARINC635.py diff --git a/python/msg_proxy.py b/python/msg_proxy.py index b3f2d55..267eee1 100644 --- a/python/msg_proxy.py +++ b/python/msg_proxy.py @@ -62,13 +62,15 @@ class msg_proxy(gr.basic_block): symbols = pmt.to_python(pmt.dict_ref(msg_in, pmt.intern('symbols'), pmt.PMT_NIL)) soft_dec = pmt.to_python(pmt.dict_ref(msg_in, pmt.intern('soft_dec'), pmt.PMT_NIL)) symb,constellation_idx,do_continue,save_soft_dec = self._obj.get_next_frame(symbols) - if do_continue and len(soft_dec) != 0: - bits,self._quality = self._obj.decode_soft_dec(soft_dec) - bits = np.array(bits, dtype=np.uint8) - msg_out = pmt.make_dict() - msg_out = pmt.dict_add(msg_out, pmt.intern('packet_len'), pmt.to_pmt(len(bits))) - msg = pmt.cons(msg_out, pmt.to_pmt(bits)) - self.message_port_pub(self._port_bits, msg) + if len(soft_dec) != 0: + bits,q = self._obj.decode_soft_dec(soft_dec) + if len(bits) > 0: + self._quality = q + bits = np.array(bits, dtype=np.uint8) + msg_out = pmt.make_dict() + msg_out = pmt.dict_add(msg_out, pmt.intern('packet_len'), pmt.to_pmt(len(bits))) + msg = pmt.cons(msg_out, pmt.to_pmt(bits)) + self.message_port_pub(self._port_bits, msg) msg_out = pmt.to_pmt({'symb': symb['symb'], 'scramble': symb['scramble'], diff --git a/python/physical_layer/CMakeLists.txt b/python/physical_layer/CMakeLists.txt index 100b0cb..d637ce1 100644 --- a/python/physical_layer/CMakeLists.txt +++ b/python/physical_layer/CMakeLists.txt @@ -38,6 +38,7 @@ GR_PYTHON_INSTALL( MIL_STD_188_110C.py MIL_STD_188_110D.py STANAG_5511.py + HFDL_ARINC635.py DESTINATION ${GR_PYTHON_DIR}/digitalhf/physical_layer ) diff --git a/python/physical_layer/HFDL_ARINC635.py b/python/physical_layer/HFDL_ARINC635.py new file mode 100644 index 0000000..d514a1c --- /dev/null +++ b/python/physical_layer/HFDL_ARINC635.py @@ -0,0 +1,279 @@ +## -*- python -*- + +from __future__ import print_function +import numpy as np +import common +from digitalhf.digitalhf_swig import viterbi27 + +## ---- constellatios ----------------------------------------------------------- +BPSK=np.array(zip(np.exp(2j*np.pi*np.arange(2)/2), [0,1]), common.CONST_DTYPE) +QPSK=np.array(zip(np.exp(2j*np.pi*np.arange(4)/4), [0,1,3,2]), common.CONST_DTYPE) +PSK8=np.array(zip(np.exp(2j*np.pi*np.arange(8)/8), [0,1,3,2,6,7,5,4]), common.CONST_DTYPE) + +## ---- constellation indices --------------------------------------------------- +MODE_BPSK=0 +MODE_QPSK=1 +MODE_8PSK=2 + +class LFSR(object): + def __init__(self, init, taps): + self._init = np.array(init, dtype=np.bool) + self._state = np.array(init, dtype=np.bool) + self._taps = np.array(taps, dtype=np.bool) + + def reset(self): + self._state = self._init + + def next(self): + self._state = np.concatenate([[np.sum(self._state&self._taps)&1], self._state[0:-1]]) + return self._state + +LFSR_PREAMBLE = LFSR([1,1,1,1,1,1,1], + [1,0,0,1,0,1,1]) +LFSR_M1 = LFSR([1,1,1,1,1,1,1], + [1,1,1,0,0,0,1]) +LFSR_SCRAMBLE = LFSR([1,1,0,1,0,0,1,0,1,0,1,1,0,0,1], + [1,0,0,0,0,0,0,0,0,0,0,0,0,0,1]) +PREAMBLE = common.n_psk(2, np.array([LFSR_PREAMBLE.next()[0] for _ in range(127)])) +M1 = common.n_psk(2, np.array([LFSR_M1.next()[0] for _ in range(127)])) +SCRAMBLE = common.n_psk(2, np.array([LFSR_SCRAMBLE.next()[0] for _ in range(120)])) +PROBE = common.n_psk(2, np.array([0,0,0,1,0,0,1,1,0,1,0,1,1,1,1])) +T = np.tile(PROBE, 9) + +SHIFTS = [72,82,113,123,61,103,93,9] + +MODES = [ + {'bps': 300, 'intl': {'type': 'S', 'cols': 54}, 'shift': 72, 'mode': MODE_BPSK, 'repeat': 2}, + {'bps': 600, 'intl': {'type': 'S', 'cols': 54}, 'shift': 82, 'mode': MODE_BPSK, 'repeat': 1}, + {'bps': 1200, 'intl': {'type': 'S', 'cols': 108}, 'shift': 113, 'mode': MODE_QPSK, 'repeat': 1}, + {'bps': 1800, 'intl': {'type': 'S', 'cols': 162}, 'shift': 123, 'mode': MODE_8PSK, 'repeat': 1}, + + {'bps': 300, 'intl': {'type': 'L', 'cols': 126}, 'shift': 61, 'mode': MODE_BPSK, 'repeat': 2}, + {'bps': 600, 'intl': {'type': 'L', 'cols': 126}, 'shift': 103, 'mode': MODE_BPSK, 'repeat': 1}, + {'bps': 1200, 'intl': {'type': 'L', 'cols': 252}, 'shift': 93, 'mode': MODE_QPSK, 'repeat': 1}, + {'bps': 1800, 'intl': {'type': 'L', 'cols': 378}, 'shift': 9, 'mode': MODE_8PSK, 'repeat': 1} +] + +## interleaver -> number of 45-symbol frames +NUM_FRAMES = { 'S': 72, + 'L': 168 } + +## interleave -> interleaver increment +INTL_INCR = { 'S': -17, + 'L': -23 } + +class DeInterleaver(object): + def __init__(self, ncols, di): + self._matrix = np.zeros((40, ncols), dtype=np.float32) + self._ncols = ncols + self._dj = di + self._i = 0 + self._j = 0 + self._counter = 0 + + def insert(self, v): + for val in v: + self._counter += 1 + self._matrix[self._i][self._j] = val + self._i += 1 + self._j = np.mod(self._j + self._dj, self._ncols) + if self._i == 40: + self._i = 0 + self._j += 1 + print('insert: ', self._i, self._j, self._counter, 40*self._ncols) + return self._counter == 40*self._ncols + + def fetch(self): + r = np.zeros(40*self._ncols) + idx = np.mod(9*np.arange(40, dtype=np.int), 40) + for j in range(self._ncols): + r[j*40:(j+1)*40] = self._matrix[idx,j] + return r + +## ---- physcal layer class ----------------------------------------------------- +class PhysicalLayer(object): + """Physical layer description for HFDL ARINC 635""" + + def __init__(self, sps): + """intialization""" + self._sps = sps + self._frame_counter = -1 + self._constellations = [BPSK, QPSK, PSK8] + self._preamble = self.get_preamble() + self._pre_counter = -1 + self._mode = {} + self._viterbi_dec = viterbi27(0x6d, 0x4f) + self._repeat = 1 + self._mode_descr = 'UNKNOWN' + + def get_constellations(self): + return self._constellations + + def get_next_frame(self, symbols): + """returns a tuple describing the frame: + [0] ... known+unknown symbols and scrambling + [1] ... modulation type after descrambling + [2] ... a boolean indicating if the processing should continue + [3] ... a boolean indicating if the soft decision for the unknown + symbols are saved""" + print('-------------------- get_frame --------------------', + self._pre_counter, self._frame_counter) + success = True + if len(symbols) == 0: + self._frame_counter = -1 + s = self.get_preamble() + s.resize(15+len(s)) + s['scramble'][-15:] = 1 + return [s,MODE_BPSK,success,False] + + if self._frame_counter == -1: ## preamble mode + success,idx = self.decode_preamble(symbols) + print('IDX= ', idx) + if idx == 0: ## 2nd preamble frame + s = self._preamble + s = np.roll(s, -15) + s['symb'][-15:] = 0 + s['scramble'][-15:] = 1 + return [s,MODE_BPSK,True,False] + else: + self._frame_counter = 0 + mode = MODES[idx-1] + print('MODE=', mode) + self._mode = mode['mode'] + self._mode_descr = "HFDL bps=%d intl=%s " % (mode['bps'], mode['intl']['type']) + self._num_frames = NUM_FRAMES[mode['intl']['type']] + self._deintl = DeInterleaver(mode['intl']['cols'], INTL_INCR[mode['intl']['type']]) + self._repeat = mode['repeat'] + self._a = self.make_data_frame(mode) + s = np.concatenate([np.roll(M1, -(15+SHIFTS[idx-1])), T]) + a = common.make_scr(s,s) + return [a,MODE_BPSK,True,False] + + if self._frame_counter >= 0: ## data + print('====', self._frame_counter, self._num_frames) + do_continue = use_soft_dec = self.get_data_frame_quality(symbols) + if self._frame_counter == self._num_frames: + self._frame_counter = 0 + do_continue = False + else: + self._frame_counter += len(self._a)/45; + if not do_continue: + self._frame_counter = -2 + print("SUCCESS ", do_continue, use_soft_dec) + return [self._a, self._mode, do_continue, use_soft_dec] + + def make_data_frame(self, mode): + s = np.zeros(180, dtype=common.SYMB_SCRAMBLE_DTYPE) + s['scramble'][:] = 1 + for i in range(0,180,45): + s['scramble'][i :i+30] = SCRAMBLE[i*30/45:i*30/45+30] + s['scramble'][i+30:i+45] = PROBE + s['symb' ][i+30:i+45] = PROBE + return s + + def get_data_frame_quality(self, symbols): + s = symbols[-15:] + mean_s = np.mean(s) + tests = [np.abs(mean_s) > 0.4, + np.real(mean_s) > np.imag(mean_s)] + print('FRAME_QUALITY: ', s, mean_s, tests) + success = all(tests) + return success + + def get_doppler(self, iq_samples): + """quality check and doppler estimation for preamble""" + r = {'success': False, ## -- quality flag + 'doppler': 0} ## -- doppler estimate (rad/symb) + if len(iq_samples) != 0: + sps = self._sps + _,zp = self.get_preamble_z() ## length is sps*128 + cc = np.correlate(iq_samples, zp[0:32*sps]) + imax = np.argmax(np.abs(cc[0:32*sps])) + print('imax=', imax, len(iq_samples), len(cc)) + pks = np.zeros(4, dtype=np.complex64) + for i in range(4): + idx = 32*sps*i+np.arange(32*sps) + idx = idx[idx 5*np.mean(tpks) and apks[0]/apks[1] > 0.5 and apks[0]/apks[1] < 2.0) + if r['success']: idx = np.arange(32*sps) pks = [np.correlate(iq_samples[imax+i*32*sps+idx], zp[ i*32*sps+idx])[0] for i in range(9)] - doppler = common.freq_est(pks)/(32*sps) - print('success=', success, 'doppler=', doppler, + r['doppler'] = common.freq_est(pks)/(32*sps) + print('success=', r['success'], 'doppler=', r['doppler'], np.abs(np.array(pks)), np.angle(np.array(pks))) - return success,doppler + return r def decode_preamble(self, symbols): data = [FROM_WALSH8[np.packbits @@ -251,6 +258,7 @@ class PhysicalLayer(object): print('data=',data) self._pre_counter = sum([(x&3)*(1<<2*y) for (x,y) in zip(data[11:14][::-1], range(3))]) self._d1d2 = data[9:11] + print('MODE:', data[9:11]) self._mode = mode = MODE[data[9]][data[10]] self._block_len = 11520 if mode['interleaver'][0] == 'L' else 1440 self._frame_len = mode['known'] + mode['unknown'] @@ -261,12 +269,18 @@ class PhysicalLayer(object): self._deinterleaver = Deinterleaver(mode['interleaver'][1], mode['interleaver'][2]) self._depuncturer = common.Depuncturer(repeat=mode['repeat']) self._viterbi_decoder = viterbi27(0x6d, 0x4f) - print(self._d1d2, mode, self._frame_len) + self._mode_description = 'MIL_STD_188-110A: %dbps intl=%s [U=%d,K=%d]' % (mode['bit_rate'], + mode['interleaver'][0], + mode['unknown'], mode['known']) + print(self._d1d2, mode, self._frame_len, self._mode_description) return True def set_mode(self, _): pass + def get_mode(self): + return self._mode_description + def decode_soft_dec(self, soft_dec): print('decode_soft_dec', len(soft_dec), soft_dec.dtype) if self._mode['known'] == 0: ## orthogonal WALSH modulation @@ -285,14 +299,15 @@ class PhysicalLayer(object): r = self._deinterleaver.load(soft_dec) print('decode_soft_dec r=', r.shape) if r.shape[0] == 0: - return [] + return [],0.0 ##print('deinterleaved bits: ', [x for x in 1*(r>0)]) rd = self._depuncturer.process(r) self._viterbi_decoder.reset() decoded_bits = self._viterbi_decoder.udpate(rd) ##print('bits=', decoded_bits) - print('quality={}%'.format(100.0*self._viterbi_decoder.quality()/(2*len(decoded_bits)))) - return decoded_bits + quality = 100.0*self._viterbi_decoder.quality()/(2*len(decoded_bits)) + print('quality={}%'.format(quality)) + return decoded_bits,quality @staticmethod def get_preamble(): diff --git a/python/physical_layer/MIL_STD_188_110C.py b/python/physical_layer/MIL_STD_188_110C.py index 1502c2e..48773fb 100644 --- a/python/physical_layer/MIL_STD_188_110C.py +++ b/python/physical_layer/MIL_STD_188_110C.py @@ -344,7 +344,7 @@ class PhysicalLayer(object): print(r) return r - def set_mode(self, mode): + def set_mode(self, _): pass def get_mode(self): diff --git a/python/physical_layer_driver.py b/python/physical_layer_driver.py index 64f305c..2a73df1 100644 --- a/python/physical_layer_driver.py +++ b/python/physical_layer_driver.py @@ -60,7 +60,7 @@ class physical_layer_driver(gr.hier_block2): self._corr_est = digital.corr_est_cc(symbols = (preamble_samples.tolist()), sps = sps, mark_delay = preamble_offset, - threshold = 0.3, + threshold = 0.1, threshold_method = 1) self._doppler_correction = digitalhf.doppler_correction_cc(preamble_length, len(preamble_samples)) self._adaptive_filter = digitalhf.adaptive_dfe(sps, nB, nF, nW, mu, alpha)