From f1c760349f5569d3797766dc1e6098403f22adc5 Mon Sep 17 00:00:00 2001 From: Phil Date: Tue, 9 Jun 2026 15:38:06 +0300 Subject: [PATCH] tests: add more cases for base test --- .../test_axi4_loopback.py | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/axi/tb/axi_cocotb_loopback_test/test_axi4_loopback.py b/axi/tb/axi_cocotb_loopback_test/test_axi4_loopback.py index d5b837f..4514a2d 100644 --- a/axi/tb/axi_cocotb_loopback_test/test_axi4_loopback.py +++ b/axi/tb/axi_cocotb_loopback_test/test_axi4_loopback.py @@ -49,3 +49,56 @@ async def run_basic_write_read_test(dut): read_data = await tb.master.read(addr, len(test_data)) assert bytes(read_data.data) == test_data + + +@cocotb.test() +async def run_unaligned_and_burst_test(dut): + tb = TB(dut) + await tb.reset() + + cases = [ + (0x0003, bytes([0x11, 0x22, 0x33, 0x44, 0x55])), + (0x0127, bytes((x * 7 + 3) & 0xFF for x in range(37))), + (0x2000, bytes(range(256))), + (0x2F1B, bytes((255 - x) & 0xFF for x in range(191))), + ] + + for addr, payload in cases: + await tb.master.write(addr, payload) + read_data = await tb.master.read(addr, len(payload)) + assert bytes(read_data.data) == payload + + +def set_pause_if_available(dut, channel): + if hasattr(channel, "set_pause_generator"): + channel.set_pause_generator(cycle_pause()) + else: + dut._log.warning( + "Channel %r has no set_pause_generator(); backpressure skipped", channel) + + +@cocotb.test() +async def run_backpressure_test(dut): + tb = TB(dut) + + # Optional stress, close to alexforencich/cocotbext-axi examples. + # It exercises valid/ready stalls on the channels that support pause generators. + for channel in [ + tb.master.write_if.aw_channel, + tb.master.write_if.w_channel, + tb.master.read_if.ar_channel, + tb.ram.write_if.b_channel, + tb.ram.read_if.r_channel, + ]: + set_pause_if_available(dut, channel) + + await tb.reset() + + payload = bytes((x * 13 + 5) & 0xFF for x in range(1024)) + addr = 0x4000 + + await tb.master.write(addr, payload) + await Timer(100, units="ns") + read_data = await tb.master.read(addr, len(payload)) + + assert bytes(read_data.data) == payload