experimental support for HFXL mode

This commit is contained in:
cmayer 2019-06-16 19:04:21 +02:00
parent 466cab4e6b
commit d33e04b7c9
1 changed files with 132 additions and 41 deletions

View File

@ -128,19 +128,17 @@ PREAMBLE=common.n_psk(8, np.array(
6,0,1,3,1,4,1,7,7,6,3,0,0,7,2,7,2,0,2,6,1,1,1,
2,7,7,5,3,3,6,0,5,3,3,1,0,7,1,1,0,3,0,4,0,7,3]))
BARKER_13 = [0,4,0,4,0,0,4,4,0,0,0,0,0]
MP_PLUS = [0,0,0,0,0,2,4,6,0,4,0,4,0,6,4,2,0,0,0,0,0,2,4,6,0,4,0,4,0,6,4] ## length 31
MP_MINUS = [4,4,4,4,4,6,0,2,4,0,4,0,4,2,0,6,4,4,4,4,4,6,0,2,4,0,4,0,4,2,0] ## length 31
## 103 = 31 + 1 + 3*13 + 1 + 31
REINSERTED_PREAMBLE=common.n_psk(8, np.array(
[0,0,0,0,0,2,4,6,0,4,0,4,0,6,4,2,0,0,0,0,0,2,4,6,0,4,0,4,0,6,4, ## MP+
2,
0,4,0,4,0,0,4,4,0,0,0,0,0, # + D0
0,4,0,4,0,0,4,4,0,0,0,0,0, # + D1
0,4,0,4,0,0,4,4,0,0,0,0,0, # + D2
6,
4,4,4,4,4,6,0,2,4,0,4,0,4,2,0,6,4,4,4,4,4,6,0,2,4,0,4,0,4,2,0])) ## MP-
REINSERTED_PREAMBLE = common.n_psk(8, np.array(MP_PLUS + [2,] + 3 * BARKER_13 + [6,] + MP_MINUS))
HFXL_PREAMBLE = common.n_psk(8, np.array(7 * BARKER_13 + MP_PLUS))
## length 31 mini-probes
MINI_PROBE=[common.n_psk(8, np.array([0,0,0,0,0,2,4,6,0,4,0,4,0,6,4,2,0,0,0,0,0,2,4,6,0,4,0,4,0,6,4])), ## sign = + (0)
common.n_psk(8, np.array([4,4,4,4,4,6,0,2,4,0,4,0,4,2,0,6,4,4,4,4,4,6,0,2,4,0,4,0,4,2,0]))] ## sign = - (1)
MINI_PROBE=[common.n_psk(8, np.array(MP_PLUS)), ## sign = + (0)
common.n_psk(8, np.array(MP_MINUS))] ## sign = - (1)
## ---- di-bits ----------------------------------------------------------------
TO_DIBIT=[(0,0),(0,1),(1,1),(1,0)]
@ -153,7 +151,7 @@ TO_RATE={(0,0,0): {'baud': '--------', 'bits_per_symbol': 0}, ## reserved
(1,0,0): {'baud': '8000 bps', 'bits_per_symbol': 5, 'ci': MODE_32QAM},
(1,0,1): {'baud': '9600 bps', 'bits_per_symbol': 6, 'ci': MODE_64QAM},
(1,1,0): {'baud':'12800 bps', 'bits_per_symbol': 6, 'ci': MODE_64QAM},
(1,1,1): {'baud': '--------', 'bits_per_symbol': 0}} ## reserved
(1,1,1): {'baud': 'HFXL', 'bits_per_symbol': 0}} ## reserved - used by THALES HFXL
## ---- interleaver ------------------------------------------------------------
TO_INTERLEAVER={(0,0,0): {'frames': -1, 'id': '--', 'name': 'illegal'},
@ -190,6 +188,26 @@ INTL_INCR = { ## 1 3 9 18 36 72
'9600 bps': {'US': 229, 'VS': 805, 'S': 2089, 'M': 5137, 'L': 10273, 'VL': 17329}
}
## ---- HFXL ----
TO_HFXL_MOD = {
(0,0,0,0): MODE_BPSK,
(0,0,0,1): MODE_BPSK,
(1,0,0,0): MODE_QPSK,
(1,0,0,1): MODE_QPSK,
(0,1,1,0): MODE_QPSK,
(0,1,1,1): MODE_QPSK,
(0,1,0,0): MODE_8PSK,
(0,1,0,1): MODE_8PSK,
(0,0,1,0): MODE_32QAM,
(1,1,0,0): MODE_16QAM,
(1,1,0,1): MODE_16QAM
}
## ---- deinterleaver+depuncturer
class DeIntl_DePunct(object):
"""deinterleave"""
@ -242,6 +260,7 @@ class PhysicalLayer(object):
def __init__(self, sps):
"""intialization"""
self._sps = sps
self._mode_name = '110C' # default is plain 110C, other supported mode names are '12800bpsBurst', 'HFXL'
self._frame_counter = -2
self._constellations = [BPSK, QPSK, PSK8, QAM16, QAM32, QAM64, QAM64p]
self._preamble = self.get_preamble()
@ -273,13 +292,18 @@ class PhysicalLayer(object):
return [self.make_reinserted_preamble(self._preamble_offset,success),MODE_QPSK,success,False]
if self._frame_counter >= 0: ## ---- data frames
got_reinserted_preamble = self._frame_counter == 0
success = False
self._frame_counter += 1
if got_reinserted_preamble:
if self._frame_counter == 1:
success = self.decode_reinserted_preamble(symbols)
elif self._frame_counter == 2 and self.is_HFXL():
success = self.decode_hfxl_preamble(symbols)
else:
success = self.get_data_frame_quality(symbols)
return [self.make_data_frame(success),self._constellation_index,success,True]
if self.is_plain_110C() or self.is_12800bpsBurst() or self._frame_counter >= 2:
return [self.make_data_frame(success),self._constellation_index,success,success]
if self.is_HFXL() and self._frame_counter == 1:
return [self.make_hfxl_preamble(success),MODE_QPSK,success,False]
def get_doppler(self, iq_samples):
"""quality check and doppler estimation for preamble"""
@ -323,6 +347,14 @@ class PhysicalLayer(object):
print('get_data_frame_quality', np.mean(symbols[-31:]))
return np.abs(np.mean(symbols[-31:])) > 0.5
def is_plain_110C(self):
return self._mode_name == '110C'
def is_12800bpsBurst(self):
return self._mode_name == '12800bpsBurst'
def is_HFXL(self):
return self._mode_name == 'HFXL'
def decode_reinserted_preamble(self, symbols):
## decode D0,D1,D2
success = True
@ -336,7 +368,7 @@ class PhysicalLayer(object):
'\nD2', symbols[-71+26:-71+39],
'\nTT', symbols[-71+4*13:], z)
d0d1d2 = map(np.uint8, np.mod(np.round(np.angle(z)/np.pi*2),4))
dibits = [TO_DIBIT[idx] for idx in d0d1d2]
self._dibits = dibits = [TO_DIBIT[idx] for idx in d0d1d2]
mode = {'rate': tuple([x[0] for x in dibits]),
'interleaver': tuple([x[1] for x in dibits])}
if self._mode != {}:
@ -347,15 +379,28 @@ class PhysicalLayer(object):
self._rate_info = rate_info = TO_RATE[self._mode['rate']]
self._intl_info = intl_info = TO_INTERLEAVER[self._mode['interleaver']]
self._12800burst_mode = mode['rate']==(1,1,0) and mode['interleaver']==(0,0,1)
print('======== rate,interleaver:', rate_info, intl_info, self._12800burst_mode)
if self._12800burst_mode:
self._mode_name = '110C'
if mode['rate']==(1,1,0) and mode['interleaver']==(0,0,1):
self._mode_name = '12800bpsBurst'
if rate_info['baud'] == 'HFXL':
self._mode_name = 'HFXL'
print('======== rate,interleaver:', rate_info, intl_info, self._mode_name)
self._data_scramble_xor = np.zeros(256, dtype=np.uint8)
self._data_scramble = np.ones (256, dtype=np.complex64)
if self.is_12800bpsBurst():
self._scrp = ScrambleDataP()
self._constellation_index = MODE_BPSK# 64QAMp
self._data_scramble = np.ones (256, dtype=np.complex64)
self._data_scramble_xor = np.zeros(256, dtype=np.uint8)
##self._data_scramble = QAM64p['points'][self._scrp.next() for _ in range(256)]
else:
elif self.is_HFXL():
self._scramble.reset()
num_bits = 3
iscr = np.array([self._scramble.next(num_bits) for _ in range(256)],
dtype=np.uint8)
self._data_scramble[:] = common.n_psk(8, iscr)
self._constellation_index = MODE_8PSK
pass
elif self.is_plain_110C():
self._interleaver_frames = intl_info['frames']
baud = rate_info['baud']
intl_id = intl_info['id']
@ -363,7 +408,7 @@ class PhysicalLayer(object):
intl_incr = INTL_INCR[baud][intl_id]
if self._deintl_depunct == None:
self._deintl_depunct = DeIntl_DePunct(size=intl_size,
incr=intl_incr)
incr=intl_incr)
self._constellation_index = rate_info['ci']
print('constellation index', self._constellation_index)
self._scramble.reset()
@ -371,12 +416,35 @@ class PhysicalLayer(object):
iscr = np.array([self._scramble.next(num_bits) for _ in range(256)],
dtype=np.uint8)
print('iscr=', iscr)
self._data_scramble = np.ones (256, dtype=np.complex64)
self._data_scramble_xor = np.zeros(256, dtype=np.uint8)
if rate_info['ci'] > MODE_8PSK:
self._data_scramble_xor = iscr
self._data_scramble_xor[:] = iscr
else:
self._data_scramble = common.n_psk(8, iscr)
self._data_scramble[:] = common.n_psk(8, iscr)
else:
## TODO: generate an error message
success = False
return success
def decode_hfxl_preamble(self, symbols):
## decode D0,D1,D2
success = True
z = np.mean(symbols[0:7*13].reshape(7,13),1)
print('decode_hfxl_preamble: z=', z, np.mean(np.abs(z)))
if np.mean(np.abs(z)) < 0.4:
return False
ds = map(np.uint8, np.mod(np.round(np.angle(z)/np.pi*2),4))
self._dibits += [TO_DIBIT[idx] for idx in ds]
l = tuple([x[1] for x in self._dibits[0:4]])
try:
self._constellation_index = TO_HFXL_MOD[l]
except KeyError:
print('decode_hfxl_preamble: dibits new list', l)
self._constellation_index = MODE_8PSK
if self._constellation_index > MODE_8PSK:
self._data_scramble[:] = 1
print('decode_hfxl_preamble: ds=', ds, l)
print('decode_hfxl_preamble: dibits=', self._dibits)
return success
def make_reinserted_preamble(self, offset, success):
@ -389,31 +457,49 @@ class PhysicalLayer(object):
self._frame_counter = -2
return a
def make_hfxl_preamble(self, success):
a = common.make_scr(HFXL_PREAMBLE, HFXL_PREAMBLE)
a['symb'][0:7*13] = 0
if not success:
self._frame_counter = -2
return a
def make_data_frame(self, success):
self._preamble_offset = -72 ## all following reinserted preambles start at index -72
a = np.zeros(256+31, common.SYMB_SCRAMBLE_DTYPE)
if self._12800burst_mode:
if self.is_12800bpsBurst():
a['scramble'][:256] = QAM64p['points'][[self._scrp.next() for _ in range(256)]]
else:
elif self.is_HFXL():
a['scramble'][:256] = self._data_scramble
elif self.is_plain_110C():
a['scramble'][:256] = self._data_scramble
else:
## TODO: generate an error message
pass
a['scramble_xor'][:256] = self._data_scramble_xor
n = (self._frame_counter-1)%72
if self._frame_counter == 72:
self._frame_counter = -1
m = n%18
if m == 0:
cnt = n//18
self._mp = (1,1,1,1,1,1,1,0)+self._mode['rate']+self._mode['interleaver']+MP_COUNTER[cnt]+(0,)
print('new mini-probe signs n=',n,'m=',m, 'cnt=',cnt, self._mp)
print('make_data_frame', m, self._mp[m])
a['symb'][256:] = MINI_PROBE[self._mp[m]]
a['scramble'][256:] = MINI_PROBE[self._mp[m]]
if self.is_plain_110C() or self.is_12800bpsBurst():
n = (self._frame_counter-1)%72
if self._frame_counter == 72:
self._frame_counter = -1 ## trigger reinserted preamble
m = n%18
if m == 0:
cnt = n//18
self._mp = (1,1,1,1,1,1,1,0)+self._mode['rate']+self._mode['interleaver']+MP_COUNTER[cnt]+(0,)
print('new mini-probe signs n=',n,'m=',m, 'cnt=',cnt, self._mp)
print('make_data_frame', m, self._mp[m])
a['symb'][256:] = MINI_PROBE[self._mp[m]]
a['scramble'][256:] = MINI_PROBE[self._mp[m]]
elif self.is_HFXL(): ## only plus sign mini-probes are used
a['symb'][256:] = MINI_PROBE[0]
a['scramble'][256:] = MINI_PROBE[0]
else:
pass # TODO
if not success:
self._frame_counter = -2
return a
def decode_soft_dec(self, soft_dec):
if self._12800burst_mode:
if self.is_12800bpsBurst():
print('decode_soft_dec', len(soft_dec))
n = len(soft_dec) // 32
soft_bits = np.zeros(2*n, dtype=np.float32)
@ -426,7 +512,10 @@ class PhysicalLayer(object):
soft_bits[2*i+1] = abs_soft_dec*(2*(b &1)-1)
return soft_bits>0
else:
elif self.is_HFXL():
## TODO
return []
elif self.is_plain_110C():
r = self._deintl_depunct.load(soft_dec)
if r.shape[0] == 0:
return []
@ -437,6 +526,8 @@ class PhysicalLayer(object):
self._viterbi_decoder.quality(),
len(decoded_bits)))
return decoded_bits
else:
return []
@staticmethod
def get_preamble():