From d33e04b7c96935af60c1ed963885013eabd6920a Mon Sep 17 00:00:00 2001 From: cmayer Date: Sun, 16 Jun 2019 19:04:21 +0200 Subject: [PATCH] experimental support for HFXL mode --- python/physical_layer/MIL_STD_188_110C.py | 173 +++++++++++++++++----- 1 file changed, 132 insertions(+), 41 deletions(-) diff --git a/python/physical_layer/MIL_STD_188_110C.py b/python/physical_layer/MIL_STD_188_110C.py index b168ecb..ebc3f24 100644 --- a/python/physical_layer/MIL_STD_188_110C.py +++ b/python/physical_layer/MIL_STD_188_110C.py @@ -128,19 +128,17 @@ PREAMBLE=common.n_psk(8, np.array( 6,0,1,3,1,4,1,7,7,6,3,0,0,7,2,7,2,0,2,6,1,1,1, 2,7,7,5,3,3,6,0,5,3,3,1,0,7,1,1,0,3,0,4,0,7,3])) +BARKER_13 = [0,4,0,4,0,0,4,4,0,0,0,0,0] +MP_PLUS = [0,0,0,0,0,2,4,6,0,4,0,4,0,6,4,2,0,0,0,0,0,2,4,6,0,4,0,4,0,6,4] ## length 31 +MP_MINUS = [4,4,4,4,4,6,0,2,4,0,4,0,4,2,0,6,4,4,4,4,4,6,0,2,4,0,4,0,4,2,0] ## length 31 + ## 103 = 31 + 1 + 3*13 + 1 + 31 -REINSERTED_PREAMBLE=common.n_psk(8, np.array( - [0,0,0,0,0,2,4,6,0,4,0,4,0,6,4,2,0,0,0,0,0,2,4,6,0,4,0,4,0,6,4, ## MP+ - 2, - 0,4,0,4,0,0,4,4,0,0,0,0,0, # + D0 - 0,4,0,4,0,0,4,4,0,0,0,0,0, # + D1 - 0,4,0,4,0,0,4,4,0,0,0,0,0, # + D2 - 6, - 4,4,4,4,4,6,0,2,4,0,4,0,4,2,0,6,4,4,4,4,4,6,0,2,4,0,4,0,4,2,0])) ## MP- +REINSERTED_PREAMBLE = common.n_psk(8, np.array(MP_PLUS + [2,] + 3 * BARKER_13 + [6,] + MP_MINUS)) +HFXL_PREAMBLE = common.n_psk(8, np.array(7 * BARKER_13 + MP_PLUS)) ## length 31 mini-probes -MINI_PROBE=[common.n_psk(8, np.array([0,0,0,0,0,2,4,6,0,4,0,4,0,6,4,2,0,0,0,0,0,2,4,6,0,4,0,4,0,6,4])), ## sign = + (0) - common.n_psk(8, np.array([4,4,4,4,4,6,0,2,4,0,4,0,4,2,0,6,4,4,4,4,4,6,0,2,4,0,4,0,4,2,0]))] ## sign = - (1) +MINI_PROBE=[common.n_psk(8, np.array(MP_PLUS)), ## sign = + (0) + common.n_psk(8, np.array(MP_MINUS))] ## sign = - (1) ## ---- di-bits ---------------------------------------------------------------- TO_DIBIT=[(0,0),(0,1),(1,1),(1,0)] @@ -153,7 +151,7 @@ TO_RATE={(0,0,0): {'baud': '--------', 'bits_per_symbol': 0}, ## reserved (1,0,0): {'baud': '8000 bps', 'bits_per_symbol': 5, 'ci': MODE_32QAM}, (1,0,1): {'baud': '9600 bps', 'bits_per_symbol': 6, 'ci': MODE_64QAM}, (1,1,0): {'baud':'12800 bps', 'bits_per_symbol': 6, 'ci': MODE_64QAM}, - (1,1,1): {'baud': '--------', 'bits_per_symbol': 0}} ## reserved + (1,1,1): {'baud': 'HFXL', 'bits_per_symbol': 0}} ## reserved - used by THALES HFXL ## ---- interleaver ------------------------------------------------------------ TO_INTERLEAVER={(0,0,0): {'frames': -1, 'id': '--', 'name': 'illegal'}, @@ -190,6 +188,26 @@ INTL_INCR = { ## 1 3 9 18 36 72 '9600 bps': {'US': 229, 'VS': 805, 'S': 2089, 'M': 5137, 'L': 10273, 'VL': 17329} } +## ---- HFXL ---- +TO_HFXL_MOD = { + (0,0,0,0): MODE_BPSK, + (0,0,0,1): MODE_BPSK, + + (1,0,0,0): MODE_QPSK, + (1,0,0,1): MODE_QPSK, + + (0,1,1,0): MODE_QPSK, + (0,1,1,1): MODE_QPSK, + + (0,1,0,0): MODE_8PSK, + (0,1,0,1): MODE_8PSK, + + (0,0,1,0): MODE_32QAM, + + (1,1,0,0): MODE_16QAM, + (1,1,0,1): MODE_16QAM +} + ## ---- deinterleaver+depuncturer class DeIntl_DePunct(object): """deinterleave""" @@ -242,6 +260,7 @@ class PhysicalLayer(object): def __init__(self, sps): """intialization""" self._sps = sps + self._mode_name = '110C' # default is plain 110C, other supported mode names are '12800bpsBurst', 'HFXL' self._frame_counter = -2 self._constellations = [BPSK, QPSK, PSK8, QAM16, QAM32, QAM64, QAM64p] self._preamble = self.get_preamble() @@ -273,13 +292,18 @@ class PhysicalLayer(object): return [self.make_reinserted_preamble(self._preamble_offset,success),MODE_QPSK,success,False] if self._frame_counter >= 0: ## ---- data frames - got_reinserted_preamble = self._frame_counter == 0 + success = False self._frame_counter += 1 - if got_reinserted_preamble: + if self._frame_counter == 1: success = self.decode_reinserted_preamble(symbols) + elif self._frame_counter == 2 and self.is_HFXL(): + success = self.decode_hfxl_preamble(symbols) else: success = self.get_data_frame_quality(symbols) - return [self.make_data_frame(success),self._constellation_index,success,True] + if self.is_plain_110C() or self.is_12800bpsBurst() or self._frame_counter >= 2: + return [self.make_data_frame(success),self._constellation_index,success,success] + if self.is_HFXL() and self._frame_counter == 1: + return [self.make_hfxl_preamble(success),MODE_QPSK,success,False] def get_doppler(self, iq_samples): """quality check and doppler estimation for preamble""" @@ -323,6 +347,14 @@ class PhysicalLayer(object): print('get_data_frame_quality', np.mean(symbols[-31:])) return np.abs(np.mean(symbols[-31:])) > 0.5 + def is_plain_110C(self): + return self._mode_name == '110C' + def is_12800bpsBurst(self): + return self._mode_name == '12800bpsBurst' + def is_HFXL(self): + return self._mode_name == 'HFXL' + + def decode_reinserted_preamble(self, symbols): ## decode D0,D1,D2 success = True @@ -336,7 +368,7 @@ class PhysicalLayer(object): '\nD2', symbols[-71+26:-71+39], '\nTT', symbols[-71+4*13:], z) d0d1d2 = map(np.uint8, np.mod(np.round(np.angle(z)/np.pi*2),4)) - dibits = [TO_DIBIT[idx] for idx in d0d1d2] + self._dibits = dibits = [TO_DIBIT[idx] for idx in d0d1d2] mode = {'rate': tuple([x[0] for x in dibits]), 'interleaver': tuple([x[1] for x in dibits])} if self._mode != {}: @@ -347,15 +379,28 @@ class PhysicalLayer(object): self._rate_info = rate_info = TO_RATE[self._mode['rate']] self._intl_info = intl_info = TO_INTERLEAVER[self._mode['interleaver']] - self._12800burst_mode = mode['rate']==(1,1,0) and mode['interleaver']==(0,0,1) - print('======== rate,interleaver:', rate_info, intl_info, self._12800burst_mode) - if self._12800burst_mode: + self._mode_name = '110C' + if mode['rate']==(1,1,0) and mode['interleaver']==(0,0,1): + self._mode_name = '12800bpsBurst' + if rate_info['baud'] == 'HFXL': + self._mode_name = 'HFXL' + + print('======== rate,interleaver:', rate_info, intl_info, self._mode_name) + self._data_scramble_xor = np.zeros(256, dtype=np.uint8) + self._data_scramble = np.ones (256, dtype=np.complex64) + if self.is_12800bpsBurst(): self._scrp = ScrambleDataP() self._constellation_index = MODE_BPSK# 64QAMp - self._data_scramble = np.ones (256, dtype=np.complex64) - self._data_scramble_xor = np.zeros(256, dtype=np.uint8) ##self._data_scramble = QAM64p['points'][self._scrp.next() for _ in range(256)] - else: + elif self.is_HFXL(): + self._scramble.reset() + num_bits = 3 + iscr = np.array([self._scramble.next(num_bits) for _ in range(256)], + dtype=np.uint8) + self._data_scramble[:] = common.n_psk(8, iscr) + self._constellation_index = MODE_8PSK + pass + elif self.is_plain_110C(): self._interleaver_frames = intl_info['frames'] baud = rate_info['baud'] intl_id = intl_info['id'] @@ -363,7 +408,7 @@ class PhysicalLayer(object): intl_incr = INTL_INCR[baud][intl_id] if self._deintl_depunct == None: self._deintl_depunct = DeIntl_DePunct(size=intl_size, - incr=intl_incr) + incr=intl_incr) self._constellation_index = rate_info['ci'] print('constellation index', self._constellation_index) self._scramble.reset() @@ -371,12 +416,35 @@ class PhysicalLayer(object): iscr = np.array([self._scramble.next(num_bits) for _ in range(256)], dtype=np.uint8) print('iscr=', iscr) - self._data_scramble = np.ones (256, dtype=np.complex64) - self._data_scramble_xor = np.zeros(256, dtype=np.uint8) if rate_info['ci'] > MODE_8PSK: - self._data_scramble_xor = iscr + self._data_scramble_xor[:] = iscr else: - self._data_scramble = common.n_psk(8, iscr) + self._data_scramble[:] = common.n_psk(8, iscr) + else: + ## TODO: generate an error message + success = False + return success + + def decode_hfxl_preamble(self, symbols): + ## decode D0,D1,D2 + success = True + z = np.mean(symbols[0:7*13].reshape(7,13),1) + print('decode_hfxl_preamble: z=', z, np.mean(np.abs(z))) + if np.mean(np.abs(z)) < 0.4: + return False + ds = map(np.uint8, np.mod(np.round(np.angle(z)/np.pi*2),4)) + self._dibits += [TO_DIBIT[idx] for idx in ds] + l = tuple([x[1] for x in self._dibits[0:4]]) + try: + self._constellation_index = TO_HFXL_MOD[l] + except KeyError: + print('decode_hfxl_preamble: dibits new list', l) + self._constellation_index = MODE_8PSK + + if self._constellation_index > MODE_8PSK: + self._data_scramble[:] = 1 + print('decode_hfxl_preamble: ds=', ds, l) + print('decode_hfxl_preamble: dibits=', self._dibits) return success def make_reinserted_preamble(self, offset, success): @@ -389,31 +457,49 @@ class PhysicalLayer(object): self._frame_counter = -2 return a + def make_hfxl_preamble(self, success): + a = common.make_scr(HFXL_PREAMBLE, HFXL_PREAMBLE) + a['symb'][0:7*13] = 0 + if not success: + self._frame_counter = -2 + return a + def make_data_frame(self, success): self._preamble_offset = -72 ## all following reinserted preambles start at index -72 a = np.zeros(256+31, common.SYMB_SCRAMBLE_DTYPE) - if self._12800burst_mode: + if self.is_12800bpsBurst(): a['scramble'][:256] = QAM64p['points'][[self._scrp.next() for _ in range(256)]] - else: + elif self.is_HFXL(): a['scramble'][:256] = self._data_scramble + elif self.is_plain_110C(): + a['scramble'][:256] = self._data_scramble + else: + ## TODO: generate an error message + pass a['scramble_xor'][:256] = self._data_scramble_xor - n = (self._frame_counter-1)%72 - if self._frame_counter == 72: - self._frame_counter = -1 - m = n%18 - if m == 0: - cnt = n//18 - self._mp = (1,1,1,1,1,1,1,0)+self._mode['rate']+self._mode['interleaver']+MP_COUNTER[cnt]+(0,) - print('new mini-probe signs n=',n,'m=',m, 'cnt=',cnt, self._mp) - print('make_data_frame', m, self._mp[m]) - a['symb'][256:] = MINI_PROBE[self._mp[m]] - a['scramble'][256:] = MINI_PROBE[self._mp[m]] + if self.is_plain_110C() or self.is_12800bpsBurst(): + n = (self._frame_counter-1)%72 + if self._frame_counter == 72: + self._frame_counter = -1 ## trigger reinserted preamble + m = n%18 + if m == 0: + cnt = n//18 + self._mp = (1,1,1,1,1,1,1,0)+self._mode['rate']+self._mode['interleaver']+MP_COUNTER[cnt]+(0,) + print('new mini-probe signs n=',n,'m=',m, 'cnt=',cnt, self._mp) + print('make_data_frame', m, self._mp[m]) + a['symb'][256:] = MINI_PROBE[self._mp[m]] + a['scramble'][256:] = MINI_PROBE[self._mp[m]] + elif self.is_HFXL(): ## only plus sign mini-probes are used + a['symb'][256:] = MINI_PROBE[0] + a['scramble'][256:] = MINI_PROBE[0] + else: + pass # TODO if not success: self._frame_counter = -2 return a def decode_soft_dec(self, soft_dec): - if self._12800burst_mode: + if self.is_12800bpsBurst(): print('decode_soft_dec', len(soft_dec)) n = len(soft_dec) // 32 soft_bits = np.zeros(2*n, dtype=np.float32) @@ -426,7 +512,10 @@ class PhysicalLayer(object): soft_bits[2*i+1] = abs_soft_dec*(2*(b &1)-1) return soft_bits>0 - else: + elif self.is_HFXL(): + ## TODO + return [] + elif self.is_plain_110C(): r = self._deintl_depunct.load(soft_dec) if r.shape[0] == 0: return [] @@ -437,6 +526,8 @@ class PhysicalLayer(object): self._viterbi_decoder.quality(), len(decoded_bits))) return decoded_bits + else: + return [] @staticmethod def get_preamble():