Hi All,
I am trying to connect raspberry pi pico with sx1262 to Chirpstack Server which runs locally. Using the following micropython code. Problem is, the MIC that i calculated is not the same with the Gateways Join Accept message. Has anyone seen such an error?
from sx1262 import SX1262
import time
import random
import math
import struct
import binascii
import ucryptolib
import machine # Required for Raspberry Pi Pico
# Create an SX1262 object
sx = SX1262(spi_bus=1, clk=10, mosi=11, miso=12, cs=3, irq=20, rst=15, gpio=2)
# LoRaWAN parameters
FREQ = 868.1
BW = 125.0
SF = 10
CR = 5
SYNC_WORD = 0x34
POWER = 20
CURRENT_LIMIT = 140.0
PREAMBLE_LEN = 8
# LoRaWAN credentials
DEV_EUI = binascii.unhexlify('C777802D6356BD14')
APP_EUI = binascii.unhexlify('c8a3eb6d0127de3c')
APP_KEY = binascii.unhexlify('acdaa050b5266e94b6b311cf8cf3c9d2')
DUTY_CYCLE = 0.01
last_tx_time = 0
tx_time_on_air = 0
dev_nonce_counter = random.randint(0, 65535)
joined = False
def calculate_time_on_air(payload_size):
symbol_duration = (1 << SF) / BW
payload_symbols = 8 + max(math.ceil((8.0 * payload_size - 4.0 * SF + 28 + 16 - 20 * (0 if CR == 4/5 else 1)) / (4.0 * SF)) * CR, 0)
return (PREAMBLE_LEN + 4.25 + payload_symbols) * symbol_duration
def reverse_bytes(byte_string):
return b''.join(byte_string[i:i+1] for i in range(len(byte_string)-1, -1, -1))
def aes_cmac(key, data):
block_size = 16
K1 = b'\x00' * block_size
K2 = b'\x00' * block_size
zero_block = b'\x00' * block_size
const_Rb = 0x87
def xor_bytes(a, b):
return bytes(x ^ y for x, y in zip(a, b))
def shift_left(data):
shifted = int.from_bytes(data, 'big') << 1
return shifted.to_bytes(block_size, 'big')[:block_size]
def generate_subkeys(key):
aes_cmac = ucryptolib.aes(key, 1)
L = aes_cmac.encrypt(zero_block)
if (L[0] & 0x80) == 0:
K1 = shift_left(L)
else:
K1 = xor_bytes(shift_left(L), (const_Rb).to_bytes(block_size, 'big'))
if (K1[0] & 0x80) == 0:
K2 = shift_left(K1)
else:
K2 = xor_bytes(shift_left(K1), (const_Rb).to_bytes(block_size, 'big'))
return K1, K2
def pad(data):
padding_length = block_size - len(data)
return data + b'\x80' + b'\x00' * (padding_length - 1)
K1, K2 = generate_subkeys(key)
n = (len(data) + block_size - 1) // block_size
if n == 0:
n = 1
if len(data) % block_size == 0:
last_block = xor_bytes(data[(n-1)*block_size:], K1)
else:
last_block = xor_bytes(pad(data[(n-1)*block_size:]), K2)
aes_cmac = ucryptolib.aes(key, 1)
x = zero_block
for i in range(n-1):
block = data[i*block_size:(i+1)*block_size]
x = aes_cmac.encrypt(xor_bytes(x, block))
x = aes_cmac.encrypt(xor_bytes(x, last_block))
return x[:4]
def send_join_request():
global dev_nonce_counter, last_tx_time, tx_time_on_air
mhdr = bytes([0x00])
dev_nonce_counter = (dev_nonce_counter + 1) % 65536
dev_nonce = dev_nonce_counter.to_bytes(2, 'little')
app_eui_le = reverse_bytes(APP_EUI)
dev_eui_le = reverse_bytes(DEV_EUI)
join_request = mhdr + app_eui_le + dev_eui_le + dev_nonce
mic = aes_cmac(APP_KEY, join_request)
payload = join_request + mic
try:
sx.send(payload)
tx_time_on_air = calculate_time_on_air(len(payload))
last_tx_time = int(time.time() * 1000)
return True
except Exception as e:
print(f"Error sending Join Request: {e}")
return False
def process_join_accept(rx_data):
global joined
rx_payload = parse_rx_data(rx_data)
if not rx_payload:
print("Invalid Join Accept data")
return
if rx_payload[0] == 0x20:
mic_received = rx_payload[-4:]
mic_calculated = aes_cmac(APP_KEY, rx_payload[:-4])
if mic_received == mic_calculated:
print(f"Join Accept successful: {binascii.hexlify(rx_payload)}")
joined = True
else:
print("Join Accept MIC error")
else:
print("Invalid Join Accept MHDR value")
def process_downlink(rx_data):
rx_payload = parse_rx_data(rx_data)
if not rx_payload:
print("Invalid downlink data")
return
print(f"Downlink message received: {binascii.hexlify(rx_payload)}")
mhdr = rx_payload[0]
mtype = mhdr & 0b111
if mtype == 0b101: # Unconfirmed Data Down
print("Unconfirmed Data Down received")
elif mtype == 0b011: # Confirmed Data Down
print("Confirmed Data Down received")
elif mtype == 0b110: # RFU
print("RFU message received")
else:
print(f"Unknown message type: {mtype}")
def parse_rx_data(rx_data):
if isinstance(rx_data, bytes):
return rx_data
elif isinstance(rx_data, tuple) and len(rx_data) > 0:
return rx_data[0] if isinstance(rx_data[0], bytes) else None
else:
return None
def process_join_accept(rx_data):
global joined, APP_KEY
rx_payload = parse_rx_data(rx_data)
if not rx_payload or len(rx_payload) < 17:
print("Invalid Join Accept data")
return
# Print the entire received data in hex format
print(f"Received Join Accept: {binascii.hexlify(rx_payload)}")
if rx_payload[0] == 0x20:
mhdr = rx_payload[0:1] # Separate MHDR
encrypted_payload = rx_payload[1:-4] # Exclude MHDR and MIC
mic_received = rx_payload[-4:] # Last 4 bytes are MIC
# Decrypt using AES in ECB mode
aes = ucryptolib.aes(APP_KEY, 1)
decrypted_payload = b''
# Decrypt in 16-byte blocks
for i in range(0, len(encrypted_payload), 16):
block = encrypted_payload[i:i+16]
if len(block) == 16:
decrypted_block = aes.decrypt(block)
decrypted_payload += decrypted_block
else:
# If the last block is smaller than 16 bytes, do not decrypt
decrypted_payload += block
# Calculate MIC (based on MHDR + decrypted payload)
mic_calculated = aes_cmac(APP_KEY, mhdr + decrypted_payload)
print(f"Decrypted Join Accept: {binascii.hexlify(mhdr + decrypted_payload)}")
print(f"Received MIC: {binascii.hexlify(mic_received)}")
print(f"Calculated MIC: {binascii.hexlify(mic_calculated)}")
if mic_received == mic_calculated:
print("Join Accept successful!")
joined = True
# You can calculate network session keys here
else:
print("Join Accept MIC error")
else:
print("Invalid Join Accept MHDR value")
def measure_cpu_temperature():
sensor_temp = machine.ADC(4)
conversion_factor = 3.3 / (65535)
reading = sensor_temp.read_u16() * conversion_factor
# The temperature is given by the formula in the Raspberry Pi Pico datasheet
temperature = 27 - (reading - 0.706) / 0.001721
return temperature
def send_temperature():
temperature = measure_cpu_temperature()
temperature_bytes = struct.pack('<f', temperature)
mhdr = bytes([0x40]) # Unconfirmed Data Up
dev_nonce = random.randint(0, 65535).to_bytes(2, 'little')
payload = mhdr + DEV_EUI + dev_nonce + temperature_bytes
mic = aes_cmac(APP_KEY, payload)
final_payload = payload + mic
try:
sx.send(final_payload)
tx_time_on_air = calculate_time_on_air(len(final_payload))
print(f"Temperature data sent: {temperature:.2f} °C, Payload: {binascii.hexlify(final_payload)}")
return True
except Exception as e:
print(f"Error sending temperature data: {e}")
return False
try:
sx.begin(freq=FREQ, bw=BW, sf=SF, cr=CR, syncWord=SYNC_WORD,
power=POWER, currentLimit=CURRENT_LIMIT, preambleLength=PREAMBLE_LEN,
implicit=False, implicitLen=0xFF,
crcOn=True, txIq=False, rxIq=True,
tcxoVoltage=1.7, useRegulatorLDO=False, blocking=True)
print(f"LoRa started: Freq={FREQ} MHz, SF={SF}, BW={BW} kHz, CR=4/{CR}, Power={POWER} dBm")
join_attempt = 0
while not joined:
current_time = int(time.time() * 1000)
if current_time - last_tx_time > (tx_time_on_air / DUTY_CYCLE):
if send_join_request():
join_attempt += 1
print(f"Join Request sent. Attempt count: {join_attempt}")
rx_timeout = 5000
start_time = time.time()
while time.time() - start_time < rx_timeout:
try:
rx_data = sx.recv(timeout_ms=100)
rx_payload = parse_rx_data(rx_data)
if rx_payload:
print(f"A response was received: {binascii.hexlify(rx_payload)}")
process_join_accept(rx_payload)
break
except Exception as e:
print(f"Recv error: {e}")
else:
print("Join Accept not received")
sleep_time = random.randint(60, 120)
print(f"Waiting for {sleep_time} seconds")
time.sleep(sleep_time)
# After joining, measure and send temperature
while joined:
current_time = int(time.time() * 1000)
if current_time % 30000 < 1000: # Every 30 seconds
send_temperature()
try:
rx_data = sx.recv(timeout_ms=100)
rx_payload = parse_rx_data(rx_data)
if rx_payload:
process_downlink(rx_payload)
except Exception as e:
print(f"Recv error: {e}")
time.sleep(0.1)
except KeyboardInterrupt:
print("Program terminated by user")
except Exception as e:
print(f"Unexpected error: {e}")
finally:
sx.sleep()
print("Program terminated")
Output:
LoRa started: Freq=868.1 MHz, SF=10, BW=125.0 kHz, CR=4/5, Power=20 dBm
Join Request sent. Attempt count: 1
A response was received: b’2039e909d35c9023ffb4587d70ece26c4ff1591b062b43501a3b591b4a8594f3a1’
Received Join Accept: b’2039e909d35c9023ffb4587d70ece26c4ff1591b062b43501a3b591b4a8594f3a1’
Decrypted Join Accept: b’20261cb1b8d672a9436d2fc4447a04d1c8f1591b062b43501a3b591b4a’
Received MIC: b’8594f3a1’
Calculated MIC: b’a0079baa’
Join Accept MIC error