Raspberry Pi Pico and Waveshare sx1262 HAT

Hi All,

I am trying to connect raspberry pi pico with sx1262 to Chirpstack Server which runs locally. Using the following micropython code. Problem is, the MIC that i calculated is not the same with the Gateways Join Accept message. Has anyone seen such an error?

from sx1262 import SX1262
import time
import random
import math
import struct
import binascii
import ucryptolib
import machine  # Required for Raspberry Pi Pico

# Create an SX1262 object
sx = SX1262(spi_bus=1, clk=10, mosi=11, miso=12, cs=3, irq=20, rst=15, gpio=2)

# LoRaWAN parameters
FREQ = 868.1
BW = 125.0
SF = 10
CR = 5
SYNC_WORD = 0x34
POWER = 20
CURRENT_LIMIT = 140.0
PREAMBLE_LEN = 8

# LoRaWAN credentials
DEV_EUI = binascii.unhexlify('C777802D6356BD14')
APP_EUI = binascii.unhexlify('c8a3eb6d0127de3c')  
APP_KEY = binascii.unhexlify('acdaa050b5266e94b6b311cf8cf3c9d2')  

DUTY_CYCLE = 0.01
last_tx_time = 0
tx_time_on_air = 0
dev_nonce_counter = random.randint(0, 65535)

joined = False

def calculate_time_on_air(payload_size):
    symbol_duration = (1 << SF) / BW
    payload_symbols = 8 + max(math.ceil((8.0 * payload_size - 4.0 * SF + 28 + 16 - 20 * (0 if CR == 4/5 else 1)) / (4.0 * SF)) * CR, 0)
    return (PREAMBLE_LEN + 4.25 + payload_symbols) * symbol_duration

def reverse_bytes(byte_string):
    return b''.join(byte_string[i:i+1] for i in range(len(byte_string)-1, -1, -1))

def aes_cmac(key, data):
    block_size = 16
    K1 = b'\x00' * block_size
    K2 = b'\x00' * block_size
    zero_block = b'\x00' * block_size
    const_Rb = 0x87

    def xor_bytes(a, b):
        return bytes(x ^ y for x, y in zip(a, b))

    def shift_left(data):
        shifted = int.from_bytes(data, 'big') << 1
        return shifted.to_bytes(block_size, 'big')[:block_size]

    def generate_subkeys(key):
        aes_cmac = ucryptolib.aes(key, 1)
        L = aes_cmac.encrypt(zero_block)
        if (L[0] & 0x80) == 0:
            K1 = shift_left(L)
        else:
            K1 = xor_bytes(shift_left(L), (const_Rb).to_bytes(block_size, 'big'))
        if (K1[0] & 0x80) == 0:
            K2 = shift_left(K1)
        else:
            K2 = xor_bytes(shift_left(K1), (const_Rb).to_bytes(block_size, 'big'))
        return K1, K2

    def pad(data):
        padding_length = block_size - len(data)
        return data + b'\x80' + b'\x00' * (padding_length - 1)

    K1, K2 = generate_subkeys(key)
    n = (len(data) + block_size - 1) // block_size
    if n == 0:
        n = 1
    if len(data) % block_size == 0:
        last_block = xor_bytes(data[(n-1)*block_size:], K1)
    else:
        last_block = xor_bytes(pad(data[(n-1)*block_size:]), K2)
    aes_cmac = ucryptolib.aes(key, 1)
    x = zero_block
    for i in range(n-1):
        block = data[i*block_size:(i+1)*block_size]
        x = aes_cmac.encrypt(xor_bytes(x, block))
    x = aes_cmac.encrypt(xor_bytes(x, last_block))
    return x[:4]

def send_join_request():
    global dev_nonce_counter, last_tx_time, tx_time_on_air
    
    mhdr = bytes([0x00])
    
    dev_nonce_counter = (dev_nonce_counter + 1) % 65536
    dev_nonce = dev_nonce_counter.to_bytes(2, 'little')
    
    app_eui_le = reverse_bytes(APP_EUI)
    dev_eui_le = reverse_bytes(DEV_EUI)
    
    join_request = mhdr + app_eui_le + dev_eui_le + dev_nonce
    
    mic = aes_cmac(APP_KEY, join_request)
    
    payload = join_request + mic
    
    try:
        sx.send(payload)
        tx_time_on_air = calculate_time_on_air(len(payload))
        last_tx_time = int(time.time() * 1000)
        return True
    except Exception as e:
        print(f"Error sending Join Request: {e}")
        return False

def process_join_accept(rx_data):
    global joined
    rx_payload = parse_rx_data(rx_data)
    if not rx_payload:
        print("Invalid Join Accept data")
        return
    
    if rx_payload[0] == 0x20:
        mic_received = rx_payload[-4:]
        mic_calculated = aes_cmac(APP_KEY, rx_payload[:-4])
        if mic_received == mic_calculated:
            print(f"Join Accept successful: {binascii.hexlify(rx_payload)}")
            joined = True
        else:
            print("Join Accept MIC error")
    else:
        print("Invalid Join Accept MHDR value")

def process_downlink(rx_data):
    rx_payload = parse_rx_data(rx_data)
    if not rx_payload:
        print("Invalid downlink data")
        return
    
    print(f"Downlink message received: {binascii.hexlify(rx_payload)}")
    mhdr = rx_payload[0]
    mtype = mhdr & 0b111
    
    if mtype == 0b101:  # Unconfirmed Data Down
        print("Unconfirmed Data Down received")
    elif mtype == 0b011:  # Confirmed Data Down
        print("Confirmed Data Down received")
    elif mtype == 0b110:  # RFU
        print("RFU message received")
    else:
        print(f"Unknown message type: {mtype}")

def parse_rx_data(rx_data):
    if isinstance(rx_data, bytes):
        return rx_data
    elif isinstance(rx_data, tuple) and len(rx_data) > 0:
        return rx_data[0] if isinstance(rx_data[0], bytes) else None
    else:
        return None

def process_join_accept(rx_data):
    global joined, APP_KEY
    rx_payload = parse_rx_data(rx_data)
    if not rx_payload or len(rx_payload) < 17:
        print("Invalid Join Accept data")
        return
    
    # Print the entire received data in hex format
    print(f"Received Join Accept: {binascii.hexlify(rx_payload)}")
    
    if rx_payload[0] == 0x20:
        mhdr = rx_payload[0:1]  # Separate MHDR
        encrypted_payload = rx_payload[1:-4]  # Exclude MHDR and MIC
        mic_received = rx_payload[-4:]  # Last 4 bytes are MIC
        
        # Decrypt using AES in ECB mode
        aes = ucryptolib.aes(APP_KEY, 1)
        decrypted_payload = b''
        
        # Decrypt in 16-byte blocks
        for i in range(0, len(encrypted_payload), 16):
            block = encrypted_payload[i:i+16]
            if len(block) == 16:
                decrypted_block = aes.decrypt(block)
                decrypted_payload += decrypted_block
            else:
                # If the last block is smaller than 16 bytes, do not decrypt
                decrypted_payload += block
        
        # Calculate MIC (based on MHDR + decrypted payload)
        mic_calculated = aes_cmac(APP_KEY, mhdr + decrypted_payload)
        
        print(f"Decrypted Join Accept: {binascii.hexlify(mhdr + decrypted_payload)}")
        print(f"Received MIC: {binascii.hexlify(mic_received)}")
        print(f"Calculated MIC: {binascii.hexlify(mic_calculated)}")
        
        if mic_received == mic_calculated:
            print("Join Accept successful!")
            joined = True
            # You can calculate network session keys here
        else:
            print("Join Accept MIC error")
    else:
        print("Invalid Join Accept MHDR value")

        
def measure_cpu_temperature():
    sensor_temp = machine.ADC(4)
    conversion_factor = 3.3 / (65535)
    reading = sensor_temp.read_u16() * conversion_factor
    # The temperature is given by the formula in the Raspberry Pi Pico datasheet
    temperature = 27 - (reading - 0.706) / 0.001721
    return temperature

def send_temperature():
    temperature = measure_cpu_temperature()
    temperature_bytes = struct.pack('<f', temperature)
    
    mhdr = bytes([0x40])  # Unconfirmed Data Up
    dev_nonce = random.randint(0, 65535).to_bytes(2, 'little')
    
    payload = mhdr + DEV_EUI + dev_nonce + temperature_bytes
    
    mic = aes_cmac(APP_KEY, payload)
    final_payload = payload + mic
    
    try:
        sx.send(final_payload)
        tx_time_on_air = calculate_time_on_air(len(final_payload))
        print(f"Temperature data sent: {temperature:.2f} °C, Payload: {binascii.hexlify(final_payload)}")
        return True
    except Exception as e:
        print(f"Error sending temperature data: {e}")
        return False

try:
    sx.begin(freq=FREQ, bw=BW, sf=SF, cr=CR, syncWord=SYNC_WORD,
             power=POWER, currentLimit=CURRENT_LIMIT, preambleLength=PREAMBLE_LEN,
             implicit=False, implicitLen=0xFF,
             crcOn=True, txIq=False, rxIq=True,
             tcxoVoltage=1.7, useRegulatorLDO=False, blocking=True)
    
    print(f"LoRa started: Freq={FREQ} MHz, SF={SF}, BW={BW} kHz, CR=4/{CR}, Power={POWER} dBm")

    join_attempt = 0
    while not joined:
        current_time = int(time.time() * 1000)
        
        if current_time - last_tx_time > (tx_time_on_air / DUTY_CYCLE):
            if send_join_request():
                join_attempt += 1
                print(f"Join Request sent. Attempt count: {join_attempt}")
            
            rx_timeout = 5000
            start_time = time.time()
            while time.time() - start_time < rx_timeout:
                try:
                    rx_data = sx.recv(timeout_ms=100)
                    rx_payload = parse_rx_data(rx_data)
                    if rx_payload:
                        print(f"A response was received: {binascii.hexlify(rx_payload)}")
                        process_join_accept(rx_payload)
                        break
                except Exception as e:
                    print(f"Recv error: {e}")
            else:
                print("Join Accept not received")
        
        sleep_time = random.randint(60, 120)
        print(f"Waiting for {sleep_time} seconds")
        time.sleep(sleep_time)
        
    # After joining, measure and send temperature
    while joined:
        current_time = int(time.time() * 1000)
        
        if current_time % 30000 < 1000:  # Every 30 seconds
            send_temperature()
        
        try:
            rx_data = sx.recv(timeout_ms=100)
            rx_payload = parse_rx_data(rx_data)
            if rx_payload:
                process_downlink(rx_payload)
        except Exception as e:
            print(f"Recv error: {e}")
        
        time.sleep(0.1)

except KeyboardInterrupt:
    print("Program terminated by user")
except Exception as e:
    print(f"Unexpected error: {e}")
finally:
    sx.sleep()
    print("Program terminated")

Output:
LoRa started: Freq=868.1 MHz, SF=10, BW=125.0 kHz, CR=4/5, Power=20 dBm
Join Request sent. Attempt count: 1
A response was received: b’2039e909d35c9023ffb4587d70ece26c4ff1591b062b43501a3b591b4a8594f3a1’
Received Join Accept: b’2039e909d35c9023ffb4587d70ece26c4ff1591b062b43501a3b591b4a8594f3a1’
Decrypted Join Accept: b’20261cb1b8d672a9436d2fc4447a04d1c8f1591b062b43501a3b591b4a’
Received MIC: b’8594f3a1’
Calculated MIC: b’a0079baa’
Join Accept MIC error

It could be the byte-order (endianness) of the DevEUI, AppKey, … Or just a mis-configuration of the DevEUI, AppKey, …

Hi!
I tried the code above and of course with newly created # LoRaWAN credentials.
I get the following error:

LoRa started: Freq=868.1 MHz, SF=10, BW=125.0 kHz, CR=4/5, Power=20 dBm
Unexpected error: buffer too small
Program terminated

I would greatly appreciate any assistance.

Thanks.
Tom

Where did you get the python libs for that sx1262 hat? Also, were you able to resolve your error?

Hi!
Were you able to resolve the “Join Accept MIC error” issue ? If yes, I would be grateful if you could let me know what your solution was.
Thanks.

This error is almost always just from typing in the devEUI / appKey incorrectly.

Hi Liam,
Thank you for your quick response. I am copying the keys from TTN and pasting it. I am stumped.
Any other ideas?
Thanks.

Probably consider that the code in the original message is not correct. I suggest you try and find another library (there is one for CircuitPython, don’t think there is a real one for MicroPython yet).