HFDL ARINC635 physical layer decoder added

This commit is contained in:
Christoph Mayer 2019-09-14 17:03:12 +02:00
parent 176fc41650
commit 4b0c109e9a
6 changed files with 316 additions and 19 deletions

View File

@ -62,13 +62,15 @@ class msg_proxy(gr.basic_block):
symbols = pmt.to_python(pmt.dict_ref(msg_in, pmt.intern('symbols'), pmt.PMT_NIL))
soft_dec = pmt.to_python(pmt.dict_ref(msg_in, pmt.intern('soft_dec'), pmt.PMT_NIL))
symb,constellation_idx,do_continue,save_soft_dec = self._obj.get_next_frame(symbols)
if do_continue and len(soft_dec) != 0:
bits,self._quality = self._obj.decode_soft_dec(soft_dec)
bits = np.array(bits, dtype=np.uint8)
msg_out = pmt.make_dict()
msg_out = pmt.dict_add(msg_out, pmt.intern('packet_len'), pmt.to_pmt(len(bits)))
msg = pmt.cons(msg_out, pmt.to_pmt(bits))
self.message_port_pub(self._port_bits, msg)
if len(soft_dec) != 0:
bits,q = self._obj.decode_soft_dec(soft_dec)
if len(bits) > 0:
self._quality = q
bits = np.array(bits, dtype=np.uint8)
msg_out = pmt.make_dict()
msg_out = pmt.dict_add(msg_out, pmt.intern('packet_len'), pmt.to_pmt(len(bits)))
msg = pmt.cons(msg_out, pmt.to_pmt(bits))
self.message_port_pub(self._port_bits, msg)
msg_out = pmt.to_pmt({'symb': symb['symb'],
'scramble': symb['scramble'],

View File

@ -38,6 +38,7 @@ GR_PYTHON_INSTALL(
MIL_STD_188_110C.py
MIL_STD_188_110D.py
STANAG_5511.py
HFDL_ARINC635.py
DESTINATION ${GR_PYTHON_DIR}/digitalhf/physical_layer
)

View File

@ -0,0 +1,279 @@
## -*- python -*-
from __future__ import print_function
import numpy as np
import common
from digitalhf.digitalhf_swig import viterbi27
## ---- constellatios -----------------------------------------------------------
BPSK=np.array(zip(np.exp(2j*np.pi*np.arange(2)/2), [0,1]), common.CONST_DTYPE)
QPSK=np.array(zip(np.exp(2j*np.pi*np.arange(4)/4), [0,1,3,2]), common.CONST_DTYPE)
PSK8=np.array(zip(np.exp(2j*np.pi*np.arange(8)/8), [0,1,3,2,6,7,5,4]), common.CONST_DTYPE)
## ---- constellation indices ---------------------------------------------------
MODE_BPSK=0
MODE_QPSK=1
MODE_8PSK=2
class LFSR(object):
def __init__(self, init, taps):
self._init = np.array(init, dtype=np.bool)
self._state = np.array(init, dtype=np.bool)
self._taps = np.array(taps, dtype=np.bool)
def reset(self):
self._state = self._init
def next(self):
self._state = np.concatenate([[np.sum(self._state&self._taps)&1], self._state[0:-1]])
return self._state
LFSR_PREAMBLE = LFSR([1,1,1,1,1,1,1],
[1,0,0,1,0,1,1])
LFSR_M1 = LFSR([1,1,1,1,1,1,1],
[1,1,1,0,0,0,1])
LFSR_SCRAMBLE = LFSR([1,1,0,1,0,0,1,0,1,0,1,1,0,0,1],
[1,0,0,0,0,0,0,0,0,0,0,0,0,0,1])
PREAMBLE = common.n_psk(2, np.array([LFSR_PREAMBLE.next()[0] for _ in range(127)]))
M1 = common.n_psk(2, np.array([LFSR_M1.next()[0] for _ in range(127)]))
SCRAMBLE = common.n_psk(2, np.array([LFSR_SCRAMBLE.next()[0] for _ in range(120)]))
PROBE = common.n_psk(2, np.array([0,0,0,1,0,0,1,1,0,1,0,1,1,1,1]))
T = np.tile(PROBE, 9)
SHIFTS = [72,82,113,123,61,103,93,9]
MODES = [
{'bps': 300, 'intl': {'type': 'S', 'cols': 54}, 'shift': 72, 'mode': MODE_BPSK, 'repeat': 2},
{'bps': 600, 'intl': {'type': 'S', 'cols': 54}, 'shift': 82, 'mode': MODE_BPSK, 'repeat': 1},
{'bps': 1200, 'intl': {'type': 'S', 'cols': 108}, 'shift': 113, 'mode': MODE_QPSK, 'repeat': 1},
{'bps': 1800, 'intl': {'type': 'S', 'cols': 162}, 'shift': 123, 'mode': MODE_8PSK, 'repeat': 1},
{'bps': 300, 'intl': {'type': 'L', 'cols': 126}, 'shift': 61, 'mode': MODE_BPSK, 'repeat': 2},
{'bps': 600, 'intl': {'type': 'L', 'cols': 126}, 'shift': 103, 'mode': MODE_BPSK, 'repeat': 1},
{'bps': 1200, 'intl': {'type': 'L', 'cols': 252}, 'shift': 93, 'mode': MODE_QPSK, 'repeat': 1},
{'bps': 1800, 'intl': {'type': 'L', 'cols': 378}, 'shift': 9, 'mode': MODE_8PSK, 'repeat': 1}
]
## interleaver -> number of 45-symbol frames
NUM_FRAMES = { 'S': 72,
'L': 168 }
## interleave -> interleaver increment
INTL_INCR = { 'S': -17,
'L': -23 }
class DeInterleaver(object):
def __init__(self, ncols, di):
self._matrix = np.zeros((40, ncols), dtype=np.float32)
self._ncols = ncols
self._dj = di
self._i = 0
self._j = 0
self._counter = 0
def insert(self, v):
for val in v:
self._counter += 1
self._matrix[self._i][self._j] = val
self._i += 1
self._j = np.mod(self._j + self._dj, self._ncols)
if self._i == 40:
self._i = 0
self._j += 1
print('insert: ', self._i, self._j, self._counter, 40*self._ncols)
return self._counter == 40*self._ncols
def fetch(self):
r = np.zeros(40*self._ncols)
idx = np.mod(9*np.arange(40, dtype=np.int), 40)
for j in range(self._ncols):
r[j*40:(j+1)*40] = self._matrix[idx,j]
return r
## ---- physcal layer class -----------------------------------------------------
class PhysicalLayer(object):
"""Physical layer description for HFDL ARINC 635"""
def __init__(self, sps):
"""intialization"""
self._sps = sps
self._frame_counter = -1
self._constellations = [BPSK, QPSK, PSK8]
self._preamble = self.get_preamble()
self._pre_counter = -1
self._mode = {}
self._viterbi_dec = viterbi27(0x6d, 0x4f)
self._repeat = 1
self._mode_descr = 'UNKNOWN'
def get_constellations(self):
return self._constellations
def get_next_frame(self, symbols):
"""returns a tuple describing the frame:
[0] ... known+unknown symbols and scrambling
[1] ... modulation type after descrambling
[2] ... a boolean indicating if the processing should continue
[3] ... a boolean indicating if the soft decision for the unknown
symbols are saved"""
print('-------------------- get_frame --------------------',
self._pre_counter, self._frame_counter)
success = True
if len(symbols) == 0:
self._frame_counter = -1
s = self.get_preamble()
s.resize(15+len(s))
s['scramble'][-15:] = 1
return [s,MODE_BPSK,success,False]
if self._frame_counter == -1: ## preamble mode
success,idx = self.decode_preamble(symbols)
print('IDX= ', idx)
if idx == 0: ## 2nd preamble frame
s = self._preamble
s = np.roll(s, -15)
s['symb'][-15:] = 0
s['scramble'][-15:] = 1
return [s,MODE_BPSK,True,False]
else:
self._frame_counter = 0
mode = MODES[idx-1]
print('MODE=', mode)
self._mode = mode['mode']
self._mode_descr = "HFDL bps=%d intl=%s " % (mode['bps'], mode['intl']['type'])
self._num_frames = NUM_FRAMES[mode['intl']['type']]
self._deintl = DeInterleaver(mode['intl']['cols'], INTL_INCR[mode['intl']['type']])
self._repeat = mode['repeat']
self._a = self.make_data_frame(mode)
s = np.concatenate([np.roll(M1, -(15+SHIFTS[idx-1])), T])
a = common.make_scr(s,s)
return [a,MODE_BPSK,True,False]
if self._frame_counter >= 0: ## data
print('====', self._frame_counter, self._num_frames)
do_continue = use_soft_dec = self.get_data_frame_quality(symbols)
if self._frame_counter == self._num_frames:
self._frame_counter = 0
do_continue = False
else:
self._frame_counter += len(self._a)/45;
if not do_continue:
self._frame_counter = -2
print("SUCCESS ", do_continue, use_soft_dec)
return [self._a, self._mode, do_continue, use_soft_dec]
def make_data_frame(self, mode):
s = np.zeros(180, dtype=common.SYMB_SCRAMBLE_DTYPE)
s['scramble'][:] = 1
for i in range(0,180,45):
s['scramble'][i :i+30] = SCRAMBLE[i*30/45:i*30/45+30]
s['scramble'][i+30:i+45] = PROBE
s['symb' ][i+30:i+45] = PROBE
return s
def get_data_frame_quality(self, symbols):
s = symbols[-15:]
mean_s = np.mean(s)
tests = [np.abs(mean_s) > 0.4,
np.real(mean_s) > np.imag(mean_s)]
print('FRAME_QUALITY: ', s, mean_s, tests)
success = all(tests)
return success
def get_doppler(self, iq_samples):
"""quality check and doppler estimation for preamble"""
r = {'success': False, ## -- quality flag
'doppler': 0} ## -- doppler estimate (rad/symb)
if len(iq_samples) != 0:
sps = self._sps
_,zp = self.get_preamble_z() ## length is sps*128
cc = np.correlate(iq_samples, zp[0:32*sps])
imax = np.argmax(np.abs(cc[0:32*sps]))
print('imax=', imax, len(iq_samples), len(cc))
pks = np.zeros(4, dtype=np.complex64)
for i in range(4):
idx = 32*sps*i+np.arange(32*sps)
idx = idx[idx<len(iq_samples)]
pks[i] = np.vdot(zp[idx], iq_samples[idx])
print('pks=', pks)
r['success'] = True
if r['success']:
r['doppler'] = common.freq_est(pks)/(32*sps)
print('success=', r['success'], 'doppler=', r['doppler'],
np.abs(np.array(pks)),
np.angle(np.array(pks)))
return r
def decode_preamble(self, symbols):
st = symbols[-50:-15] ## should all be 1+0i
print('st=', st)
test = np.mean(np.real(st)) > 0.5 and np.max(np.imag(st)) < 0.5
## decide what is the next frame
t = symbols[-15:]
print('t=', t, np.angle(t))
tt = np.zeros(1+len(MODES))
tt[0] = np.abs(np.vdot(t, self._preamble['symb'][0:15]))
for i in range(1,len(tt)):
tt[i] = np.abs(np.vdot(t, np.roll(M1, -SHIFTS[i-1])[0:15]))
print('XXX ', tt)
ii = np.argmax(tt)
## TODO: add a meaningful QA test
return True,ii
def set_mode(self, _):
pass
def get_mode(self):
return self._mode_descr
def decode_soft_dec(self, soft_dec):
is_full = self._deintl.insert(soft_dec)
print('decode_soft_dec ', len(soft_dec), is_full, '******************************')
if not is_full:
return [],0.0
r = self._deintl.fetch()
rd = r if self._repeat == 1 else r[0::2]+r[1::2]
self._viterbi_dec.reset()
decoded_bits = self._viterbi_dec.udpate(rd)
quality = 100.0*self._viterbi_dec.quality()/(2*len(decoded_bits))
print('qyality= ', quality, ' bits=', decoded_bits)
return decoded_bits,quality
@staticmethod
def get_preamble():
"""preamble symbols + scrambler"""
a = common.make_scr(PREAMBLE, PREAMBLE);
#a['scramble'][:]=1
return a
def get_preamble_z(self):
"""preamble symbols for preamble correlation"""
a = PhysicalLayer.get_preamble()
## add one more symbol to get a 128 symbol sequence
z = np.array([z for z in a['symb'] for _ in range(self._sps)])
return 3,np.concatenate([z, z[0:2*self._sps]])
if __name__ == '__main__':
p0=np.array([0,1,0,1,1,0,1,1,1,0,1,1,1,1,0,0,0,1,1,1,0,1,0,0,0,1,0,1,0,1,1,1,0,0,0,0,0,0,1,1,1,1,0,1,1,0,0,1,1,0,0,0,1,0,0,1,0,0,1,1,1,0,0,1,1,1,1,1,0,0,1,0,0,0,0,0,1,0,0,0,1,1,0,1,0,1,0,1,0,0,1,1,0,1,1,0,1,0,0,1,0,1,0,0,0,0,1,0,1,1,0,0,0,0,1,1,0,0,1,0,1,1,1,1,1,1,1])
# p = make_preamble()
# print(len(p), p)
# print(len(p0), p0)
# print(np.all(p==p0))
m1='011 1011 0111 1010 0010 1100 1011 1110 0010 0000 0110 0110 1100 0111 0011 1010 1110 0001 0011 0000 0101 0101 1010 0111 1001 00001 1010 1000 0111 1111'
m1='011101101111010001011001011111000100000011001101100011100111010111000010011000001010101101001111001'
# m1=make_m1()
# l= [72,82,113,123,61,103,93,9]
# print(m1)
# for i in l:
# print(np.roll(m1,-i)[:31])
# scr='13' '1B' 'C4' '25' '0F' '8C' '15' 'EF' 'CD' '6A' 'EC' '99' '6E' '23' '68'
# scr=make_scr()
print(np.concatenate([PREAMBLE,[PREAMBLE[0]]]))
##print(M1)
#print(scr)
# 300 72 61
# 600 82 103
# 1200 113 93
# 1800 123 9

View File

@ -88,8 +88,13 @@ MODE_8PSK=2
## ---- mode definitions --------------------------------------------------------
MODE = [[{} for _ in range(8)] for _ in range(8)]
MODE[7][6] = {'bit_rate':4800, 'ci':MODE_8PSK, 'interleaver':['N', 1, 1], 'unknown':32,'known':16, 'nsymb': 1, 'coding_rate': 'n/a', 'repeat': 1}
MODE[5][6] = {} # reserved
MODE[7][7] = {'bit_rate':2400, 'ci':MODE_8PSK, 'interleaver':['S', 40, 72], 'unknown':32,'known':16, 'nsymb': 1, 'coding_rate': '1/2', 'repeat': 1}
#(5,7) is reserved
MODE[5][7] = {'bit_rate': 600, 'ci':MODE_BPSK, 'interleaver':['S', 40, 18], 'unknown':20,'known':20, 'nsymb': 1, 'coding_rate': '1/2', 'repeat': 1}
MODE[6][4] = {'bit_rate':2400, 'ci':MODE_8PSK, 'interleaver':['S', 40, 72], 'unknown':32,'known':16, 'nsymb': 1, 'coding_rate': '1/2', 'repeat': 1}
MODE[4][4] = {'bit_rate':2400, 'ci':MODE_8PSK, 'interleaver':['L', 40,576], 'unknown':32,'known':16, 'nsymb': 1, 'coding_rate': '1/2', 'repeat': 1}
@ -157,6 +162,7 @@ class PhysicalLayer(object):
self._d1d2 = [-1,-1] ## D1,D2
self._mode = {}
self._scr_data = ScrambleData()
self._mode_description = 'UNKNOWN'
def get_constellations(self):
return self._constellations
@ -216,7 +222,8 @@ class PhysicalLayer(object):
def get_doppler(self, iq_samples):
"""quality check and doppler estimation for preamble"""
success,doppler = True,0
r = {'success': False, ## -- quality flag
'doppler': 0} ## -- doppler estimate (rad/symb)
if len(iq_samples) != 0:
sps = self._sps
zp = np.array([z for z in PhysicalLayer.get_preamble()['symb']
@ -230,17 +237,17 @@ class PhysicalLayer(object):
tpks = np.abs(cc[imax+3*16*sps:imax+5*16*sps])
print('imax=', imax, 'apks=',apks,
np.mean(apks), np.mean(tpks))
success = np.mean(apks) > 5*np.mean(tpks) and apks[0]/apks[1] > 0.5 and apks[0]/apks[1] < 2.0
if success:
r['success'] = np.bool(np.mean(apks) > 5*np.mean(tpks) and apks[0]/apks[1] > 0.5 and apks[0]/apks[1] < 2.0)
if r['success']:
idx = np.arange(32*sps)
pks = [np.correlate(iq_samples[imax+i*32*sps+idx],
zp[ i*32*sps+idx])[0]
for i in range(9)]
doppler = common.freq_est(pks)/(32*sps)
print('success=', success, 'doppler=', doppler,
r['doppler'] = common.freq_est(pks)/(32*sps)
print('success=', r['success'], 'doppler=', r['doppler'],
np.abs(np.array(pks)),
np.angle(np.array(pks)))
return success,doppler
return r
def decode_preamble(self, symbols):
data = [FROM_WALSH8[np.packbits
@ -251,6 +258,7 @@ class PhysicalLayer(object):
print('data=',data)
self._pre_counter = sum([(x&3)*(1<<2*y) for (x,y) in zip(data[11:14][::-1], range(3))])
self._d1d2 = data[9:11]
print('MODE:', data[9:11])
self._mode = mode = MODE[data[9]][data[10]]
self._block_len = 11520 if mode['interleaver'][0] == 'L' else 1440
self._frame_len = mode['known'] + mode['unknown']
@ -261,12 +269,18 @@ class PhysicalLayer(object):
self._deinterleaver = Deinterleaver(mode['interleaver'][1], mode['interleaver'][2])
self._depuncturer = common.Depuncturer(repeat=mode['repeat'])
self._viterbi_decoder = viterbi27(0x6d, 0x4f)
print(self._d1d2, mode, self._frame_len)
self._mode_description = 'MIL_STD_188-110A: %dbps intl=%s [U=%d,K=%d]' % (mode['bit_rate'],
mode['interleaver'][0],
mode['unknown'], mode['known'])
print(self._d1d2, mode, self._frame_len, self._mode_description)
return True
def set_mode(self, _):
pass
def get_mode(self):
return self._mode_description
def decode_soft_dec(self, soft_dec):
print('decode_soft_dec', len(soft_dec), soft_dec.dtype)
if self._mode['known'] == 0: ## orthogonal WALSH modulation
@ -285,14 +299,15 @@ class PhysicalLayer(object):
r = self._deinterleaver.load(soft_dec)
print('decode_soft_dec r=', r.shape)
if r.shape[0] == 0:
return []
return [],0.0
##print('deinterleaved bits: ', [x for x in 1*(r>0)])
rd = self._depuncturer.process(r)
self._viterbi_decoder.reset()
decoded_bits = self._viterbi_decoder.udpate(rd)
##print('bits=', decoded_bits)
print('quality={}%'.format(100.0*self._viterbi_decoder.quality()/(2*len(decoded_bits))))
return decoded_bits
quality = 100.0*self._viterbi_decoder.quality()/(2*len(decoded_bits))
print('quality={}%'.format(quality))
return decoded_bits,quality
@staticmethod
def get_preamble():

View File

@ -344,7 +344,7 @@ class PhysicalLayer(object):
print(r)
return r
def set_mode(self, mode):
def set_mode(self, _):
pass
def get_mode(self):

View File

@ -60,7 +60,7 @@ class physical_layer_driver(gr.hier_block2):
self._corr_est = digital.corr_est_cc(symbols = (preamble_samples.tolist()),
sps = sps,
mark_delay = preamble_offset,
threshold = 0.3,
threshold = 0.1,
threshold_method = 1)
self._doppler_correction = digitalhf.doppler_correction_cc(preamble_length, len(preamble_samples))
self._adaptive_filter = digitalhf.adaptive_dfe(sps, nB, nF, nW, mu, alpha)