From 4e8141d13fef34cd46c30f87a6e29dc59225679d Mon Sep 17 00:00:00 2001 From: Phil Date: Tue, 9 Jun 2026 18:06:48 +0300 Subject: [PATCH] tests: use forencich test for DMA wrapper --- .../axi_dma_wrapper/test_axi_dma_wrapper.py | 396 ++++++++++++------ 1 file changed, 267 insertions(+), 129 deletions(-) diff --git a/axi/tb/axi_dma_wrapper/test_axi_dma_wrapper.py b/axi/tb/axi_dma_wrapper/test_axi_dma_wrapper.py index 1a260d7..014f89f 100644 --- a/axi/tb/axi_dma_wrapper/test_axi_dma_wrapper.py +++ b/axi/tb/axi_dma_wrapper/test_axi_dma_wrapper.py @@ -1,18 +1,47 @@ # SPDX-License-Identifier: MIT """ -Small cocotb/pytest test for tb_axi_dma_wrapper. +Adapted cocotb/pytest tests for tb_axi_dma_wrapper. -It intentionally keeps the same flat prefixes that alexforencich/verilog-axi -uses for axi_dma, while the SystemVerilog top routes the traffic through -local axi4_if/axis_if adapters and forencich_axi_dma_wrapper. +This file is based on alexforencich/verilog-axi/tb/axi_dma/test_axi_dma.py +and keeps the same cocotb-facing flat prefixes. The SystemVerilog test top +routes these flat signals through local axi4_if/axis_if converters and then +through axi_dma_if_wrapper. + +Original copyright: +Copyright (c) 2020 Alex Forencich + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. """ -import os +import itertools import logging +import os import cocotb + +try: + import pytest +except ImportError: # pytest is only needed for the optional cocotb-test entrypoint + pytest = None from cocotb.clock import Clock from cocotb.triggers import RisingEdge +from cocotb.regression import TestFactory from cocotbext.axi import AxiBus, AxiRam from cocotbext.axi import AxiStreamBus, AxiStreamFrame, AxiStreamSource, AxiStreamSink @@ -40,39 +69,57 @@ class TB: cocotb.start_soon(Clock(dut.clk, 10, units="ns").start()) - # Descriptor and status streams are flat, exactly like Forencich tests. + # Descriptor/status streams remain flat on purpose: they are specific + # to Forencich DMA, not generic AXIS interfaces in our library. self.read_desc_source = DescSource( DescBus.from_prefix(dut, "s_axis_read_desc"), dut.clk, dut.rst ) self.read_desc_status_sink = DescStatusSink( - DescStatusBus.from_prefix(dut, "m_axis_read_desc_status"), dut.clk, dut.rst + DescStatusBus.from_prefix( + dut, "m_axis_read_desc_status"), dut.clk, dut.rst ) - self.write_desc_source = DescSource( DescBus.from_prefix(dut, "s_axis_write_desc"), dut.clk, dut.rst ) self.write_desc_status_sink = DescStatusSink( - DescStatusBus.from_prefix(dut, "m_axis_write_desc_status"), dut.clk, dut.rst + DescStatusBus.from_prefix( + dut, "m_axis_write_desc_status"), dut.clk, dut.rst ) - # Data streams are also flat from cocotb point of view. The SV top - # converts them to axis_if before reaching the wrapper. + # Data streams are flat from cocotb's point of view, but the SV top + # sends them through axis_flat_to_if/axis_if_to_flat before/after DUT. self.read_data_sink = AxiStreamSink( AxiStreamBus.from_prefix(dut, "m_axis_read_data"), dut.clk, dut.rst ) self.write_data_source = AxiStreamSource( - AxiStreamBus.from_prefix(dut, "s_axis_write_data"), dut.clk, dut.rst + AxiStreamBus.from_prefix( + dut, "s_axis_write_data"), dut.clk, dut.rst ) - # AXI memory model. The SV top converts m_axi flat signals to axi4_if - # and back, so this also tests the AXI converters. - self.axi_ram = AxiRam(AxiBus.from_prefix(dut, "m_axi"), dut.clk, dut.rst, size=2**16) + # AXI memory model. The SV top routes this through the axi4_if adapters. + self.axi_ram = AxiRam(AxiBus.from_prefix( + dut, "m_axi"), dut.clk, dut.rst, size=2**16) dut.read_enable.setimmediatevalue(0) dut.write_enable.setimmediatevalue(0) dut.write_abort.setimmediatevalue(0) - async def reset(self): + def set_idle_generator(self, generator=None): + if generator: + self.write_desc_source.set_pause_generator(generator()) + self.write_data_source.set_pause_generator(generator()) + self.read_desc_source.set_pause_generator(generator()) + self.axi_ram.write_if.b_channel.set_pause_generator(generator()) + self.axi_ram.read_if.r_channel.set_pause_generator(generator()) + + def set_backpressure_generator(self, generator=None): + if generator: + self.read_data_sink.set_pause_generator(generator()) + self.axi_ram.write_if.aw_channel.set_pause_generator(generator()) + self.axi_ram.write_if.w_channel.set_pause_generator(generator()) + self.axi_ram.read_if.ar_channel.set_pause_generator(generator()) + + async def cycle_reset(self): self.dut.rst.setimmediatevalue(0) await RisingEdge(self.dut.clk) await RisingEdge(self.dut.clk) @@ -84,154 +131,245 @@ class TB: await RisingEdge(self.dut.clk) -def _make_data(length, seed=0): - return bytearray(((x + seed) & 0xFF) for x in range(length)) - - -@cocotb.test() -async def test_axi_dma_wrapper_write(dut): +async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_inserter=None): + """DMA write path stress test adapted from Forencich's axi_dma test.""" tb = TB(dut) - await tb.reset() - tb.dut.write_enable.value = 1 + byte_lanes = tb.axi_ram.write_if.byte_lanes + step_size = 1 if int( + os.getenv("PARAM_ENABLE_UNALIGNED", "0")) else byte_lanes + tag_count = 2 ** len(tb.write_desc_source.bus.tag) + cur_tag = 1 - addr = 0x1000 - data = _make_data(96, seed=0x10) - tag = 3 + await tb.cycle_reset() - # Guard bytes make it easier to catch wrong offsets/strobes. - tb.axi_ram.write(addr - 16, b"\xaa" * (len(data) + 32)) + tb.set_idle_generator(idle_inserter) + tb.set_backpressure_generator(backpressure_inserter) - await tb.write_desc_source.send(DescTransaction(addr=addr, len=len(data), tag=tag)) - await tb.write_data_source.send(AxiStreamFrame(data, tid=tag)) + dut.write_enable.value = 1 - status = await tb.write_desc_status_sink.recv() - tb.log.info("write status: %s", status) + for length in list(range(1, byte_lanes * 4 + 1)) + [128]: + offsets = list(range(0, byte_lanes * 2, step_size)) + offsets += list(range(4096 - byte_lanes * 2, 4096, step_size)) - assert int(status.error) == 0 - assert int(status.tag) == tag - assert int(status.len) == len(data) - assert tb.axi_ram.read(addr - 8, len(data) + 16) == b"\xaa" * 8 + data + b"\xaa" * 8 + for offset in offsets: + for diff in [-8, -2, -1, 0, 1, 2, 8]: + if length + diff < 1: + continue + + tb.log.info("write: length=%d offset=%d diff=%d", + length, offset, diff) + + addr = offset + 0x1000 + expected_data = bytearray([x % 256 for x in range(length)]) + stream_data = bytearray( + [x % 256 for x in range(length + diff)]) + + tb.axi_ram.write(addr - 128, b"\xaa" * + (len(expected_data) + 256)) + + await tb.write_desc_source.send( + DescTransaction(addr=addr, len=len( + expected_data), tag=cur_tag) + ) + await tb.write_data_source.send(AxiStreamFrame(stream_data, tid=cur_tag)) + + status = await tb.write_desc_status_sink.recv() + tb.log.info("write status: %s", status) + + transferred_len = min(len(expected_data), len(stream_data)) + + assert int(status.len) == transferred_len + assert int(status.tag) == cur_tag + assert int(status.id) == cur_tag + assert int(status.error) == 0 + + tb.log.debug( + "%s", + tb.axi_ram.hexdump_str( + (addr & ~0xF) - 16, + (((addr & 0xF) + length - 1) & ~0xF) + 48, + ), + ) + + if len(expected_data) <= len(stream_data): + assert tb.axi_ram.read(addr - 8, len(expected_data) + 16) == ( + b"\xaa" * 8 + expected_data + b"\xaa" * 8 + ) + else: + assert tb.axi_ram.read(addr - 8, len(stream_data) + 16) == ( + b"\xaa" * 8 + stream_data + b"\xaa" * 8 + ) + + cur_tag = (cur_tag + 1) % tag_count await RisingEdge(dut.clk) await RisingEdge(dut.clk) -@cocotb.test() -async def test_axi_dma_wrapper_read(dut): +async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inserter=None): + """DMA read path stress test adapted from Forencich's axi_dma test.""" tb = TB(dut) - await tb.reset() - tb.dut.read_enable.value = 1 + byte_lanes = tb.axi_ram.read_if.byte_lanes + step_size = 1 if int( + os.getenv("PARAM_ENABLE_UNALIGNED", "0")) else byte_lanes + tag_count = 2 ** len(tb.read_desc_source.bus.tag) + cur_tag = 1 - addr = 0x1800 - data = _make_data(113, seed=0x40) - tag = 5 - stream_id = 7 + await tb.cycle_reset() - tb.axi_ram.write(addr - 16, b"\xcc" * (len(data) + 32)) - tb.axi_ram.write(addr, data) + tb.set_idle_generator(idle_inserter) + tb.set_backpressure_generator(backpressure_inserter) - await tb.read_desc_source.send( - DescTransaction(addr=addr, len=len(data), tag=tag, id=stream_id) - ) + dut.read_enable.value = 1 - status = await tb.read_desc_status_sink.recv() - frame = await tb.read_data_sink.recv() + for length in list(range(1, byte_lanes * 4 + 1)) + [128]: + offsets = list(range(0, byte_lanes * 2, step_size)) + offsets += list(range(4096 - byte_lanes * 2, 4096, step_size)) - tb.log.info("read status: %s", status) - tb.log.info("read frame: %s", frame) + for offset in offsets: + tb.log.info("read: length=%d offset=%d", length, offset) - assert int(status.error) == 0 - assert int(status.tag) == tag - assert frame.tdata == data - assert int(frame.tid) == stream_id + addr = offset + 0x1000 + test_data = bytearray([x % 256 for x in range(length)]) + + tb.axi_ram.write(addr - 128, b"\xaa" * (len(test_data) + 256)) + tb.axi_ram.write(addr, test_data) + + tb.log.debug( + "%s", + tb.axi_ram.hexdump_str( + (addr & ~0xF) - 16, + (((addr & 0xF) + length - 1) & ~0xF) + 48, + ), + ) + + await tb.read_desc_source.send( + DescTransaction(addr=addr, len=len( + test_data), tag=cur_tag, id=cur_tag) + ) + + status = await tb.read_desc_status_sink.recv() + read_data = await tb.read_data_sink.recv() + + tb.log.info("read status: %s", status) + tb.log.info("read data: %s", read_data) + + assert int(status.tag) == cur_tag + assert int(status.error) == 0 + assert read_data.tdata == test_data + assert int(read_data.tid) == cur_tag + + cur_tag = (cur_tag + 1) % tag_count await RisingEdge(dut.clk) await RisingEdge(dut.clk) +def cycle_pause(): + return itertools.cycle([1, 1, 1, 0]) + + +# When imported by cocotb inside a simulator, generate the actual cocotb tests. +if cocotb.SIM_NAME: + for test in [run_test_write, run_test_read]: + factory = TestFactory(test) + factory.add_option("idle_inserter", [None, cycle_pause]) + factory.add_option("backpressure_inserter", [None, cycle_pause]) + factory.generate_tests() + + # ----------------------------------------------------------------------------- # Optional pytest entrypoint via cocotb-test. # Run from this directory with: pytest -q test_axi_dma_wrapper.py # ----------------------------------------------------------------------------- -def test_axi_dma_wrapper_pytest(request): - import cocotb_test.simulator - tests_dir = os.path.abspath(os.path.dirname(__file__)) - project_root = os.path.abspath(os.path.join(tests_dir, "..", "..")) +def _sanitize_node_name(name): + return name.replace("[", "-").replace("]", "").replace("/", "_") - axi_if_rtl_dir = os.environ.get( - "AXI_IF_RTL_DIR", os.path.join(project_root, "rtl", "axi") - ) - wrapper_rtl_dir = os.environ.get( - "WRAPPER_RTL_DIR", os.path.join(project_root, "rtl", "wrappers") - ) - forencich_rtl_dir = os.environ.get( - "FORENCICH_AXI_RTL_DIR", - os.path.join(project_root, "external", "verilog-axi", "rtl"), - ) - dut = "tb_axi_dma_wrapper" - module = os.path.splitext(os.path.basename(__file__))[0] - toplevel = dut +if pytest is not None: + @pytest.mark.parametrize("axi_data_width", [8, 16, 32]) + @pytest.mark.parametrize("unaligned", [0, 1]) + def test_axi_dma_wrapper_pytest(request, axi_data_width, unaligned): + import cocotb_test.simulator - parameters = { - "AXI_DATA_WIDTH": int(os.getenv("PARAM_AXI_DATA_WIDTH", "32")), - "AXI_ADDR_WIDTH": 16, - "AXI_ID_WIDTH": 8, - "AXI_USER_WIDTH": 1, - "AXI_MAX_BURST_LEN": 16, - "AXIS_ID_ENABLE": 1, - "AXIS_ID_WIDTH": 8, - "AXIS_DEST_ENABLE": 0, - "AXIS_DEST_WIDTH": 8, - "AXIS_USER_ENABLE": 1, - "AXIS_USER_WIDTH": 1, - "LEN_WIDTH": 20, - "TAG_WIDTH": 8, - "ENABLE_SG": 0, - "ENABLE_UNALIGNED": int(os.getenv("PARAM_ENABLE_UNALIGNED", "0")), - } + tests_dir = os.path.abspath(os.path.dirname(__file__)) + project_root = os.path.abspath(os.path.join(tests_dir, "..", "..")) - parameters["AXI_STRB_WIDTH"] = parameters["AXI_DATA_WIDTH"] // 8 - parameters["AXIS_DATA_WIDTH"] = parameters["AXI_DATA_WIDTH"] - parameters["AXIS_KEEP_ENABLE"] = int(parameters["AXIS_DATA_WIDTH"] > 8) - parameters["AXIS_KEEP_WIDTH"] = parameters["AXIS_DATA_WIDTH"] // 8 - parameters["AXIS_LAST_ENABLE"] = 1 + axi_if_rtl_dir = os.environ.get( + "AXI_IF_RTL_DIR", os.path.join(project_root, "rtl", "axi") + ) + wrapper_rtl_dir = os.environ.get( + "WRAPPER_RTL_DIR", os.path.join(project_root, "rtl", "wrappers") + ) + forencich_rtl_dir = os.environ.get( + "FORENCICH_AXI_RTL_DIR", + os.path.join(project_root, "external", "verilog-axi", "rtl"), + ) - verilog_sources = [ - os.path.join(axi_if_rtl_dir, "axi_pkg.sv"), - os.path.join(axi_if_rtl_dir, "axi_if.sv"), - os.path.join(axi_if_rtl_dir, "axis_if.sv"), - os.path.join(axi_if_rtl_dir, "axi4_flat_to_if.sv"), - os.path.join(axi_if_rtl_dir, "axi4_if_to_flat.sv"), - os.path.join(axi_if_rtl_dir, "axis_flat_to_if.sv"), - os.path.join(axi_if_rtl_dir, "axis_if_to_flat.sv"), - os.path.join(forencich_rtl_dir, "axi_dma.v"), - os.path.join(forencich_rtl_dir, "axi_dma_rd.v"), - os.path.join(forencich_rtl_dir, "axi_dma_wr.v"), - os.path.join(wrapper_rtl_dir, "forencich_axi_dma_wrapper.sv"), - os.path.join(tests_dir, "tb_axi_dma_wrapper.sv"), - ] + dut = "tb_axi_dma_wrapper" + module = os.path.splitext(os.path.basename(__file__))[0] + toplevel = dut - extra_env = {f"PARAM_{k}": str(v) for k, v in parameters.items()} - sim_build = os.path.join( - tests_dir, - "sim_build", - request.node.name.replace("[", "-").replace("]", ""), - ) + parameters = { + "AXI_DATA_WIDTH": axi_data_width, + "AXI_ADDR_WIDTH": 16, + "AXI_ID_WIDTH": 8, + "AXI_USER_WIDTH": 1, + "AXI_MAX_BURST_LEN": 16, + "AXIS_ID_ENABLE": 1, + "AXIS_ID_WIDTH": 8, + "AXIS_DEST_ENABLE": 0, + "AXIS_DEST_WIDTH": 8, + "AXIS_USER_ENABLE": 1, + "AXIS_USER_WIDTH": 1, + "LEN_WIDTH": 20, + "TAG_WIDTH": 8, + "ENABLE_SG": 0, + "ENABLE_UNALIGNED": unaligned, + } - cocotb_test.simulator.run( - python_search=[tests_dir], - verilog_sources=verilog_sources, - includes=[axi_if_rtl_dir, wrapper_rtl_dir, forencich_rtl_dir], - toplevel=toplevel, - module=module, - parameters=parameters, - sim_build=sim_build, - extra_env=extra_env, - waves=True, - extra_args=["--trace-structs"] if os.getenv("SIM", "verilator") == "verilator" else [], - ) + parameters["AXI_STRB_WIDTH"] = parameters["AXI_DATA_WIDTH"] // 8 + parameters["AXIS_DATA_WIDTH"] = parameters["AXI_DATA_WIDTH"] + parameters["AXIS_KEEP_ENABLE"] = int(parameters["AXIS_DATA_WIDTH"] > 8) + parameters["AXIS_KEEP_WIDTH"] = parameters["AXIS_DATA_WIDTH"] // 8 + parameters["AXIS_LAST_ENABLE"] = 1 + + verilog_sources = [ + os.path.join(axi_if_rtl_dir, "axi_pkg.sv"), + os.path.join(axi_if_rtl_dir, "axi_if.sv"), + os.path.join(axi_if_rtl_dir, "axis_if.sv"), + os.path.join(axi_if_rtl_dir, "axi4_flat_to_if.sv"), + os.path.join(axi_if_rtl_dir, "axi4_if_to_flat.sv"), + os.path.join(axi_if_rtl_dir, "axis_flat_to_if.sv"), + os.path.join(axi_if_rtl_dir, "axis_if_to_flat.sv"), + os.path.join(forencich_rtl_dir, "axi_dma.v"), + os.path.join(forencich_rtl_dir, "axi_dma_rd.v"), + os.path.join(forencich_rtl_dir, "axi_dma_wr.v"), + os.path.join(wrapper_rtl_dir, "axi_dma_if_wrapper.sv"), + os.path.join(tests_dir, "tb_axi_dma_wrapper.sv"), + ] + + extra_env = {f"PARAM_{k}": str(v) for k, v in parameters.items()} + sim_build = os.path.join( + tests_dir, "sim_build", _sanitize_node_name(request.node.name)) + + extra_args = [] + if os.getenv("SIM", "verilator") == "verilator": + extra_args += ["--trace-structs"] + + cocotb_test.simulator.run( + python_search=[tests_dir], + verilog_sources=verilog_sources, + includes=[axi_if_rtl_dir, wrapper_rtl_dir, forencich_rtl_dir], + toplevel=toplevel, + module=module, + parameters=parameters, + sim_build=sim_build, + extra_env=extra_env, + waves=bool(int(os.getenv("WAVES", "0"))), + extra_args=extra_args, + )