tests: use forencich test for DMA wrapper

This commit is contained in:
Phil
2026-06-09 18:06:48 +03:00
parent a2e330d193
commit 4e8141d13f

View File

@ -1,18 +1,47 @@
# SPDX-License-Identifier: MIT
"""
Small cocotb/pytest test for tb_axi_dma_wrapper.
Adapted cocotb/pytest tests for tb_axi_dma_wrapper.
It intentionally keeps the same flat prefixes that alexforencich/verilog-axi
uses for axi_dma, while the SystemVerilog top routes the traffic through
local axi4_if/axis_if adapters and forencich_axi_dma_wrapper.
This file is based on alexforencich/verilog-axi/tb/axi_dma/test_axi_dma.py
and keeps the same cocotb-facing flat prefixes. The SystemVerilog test top
routes these flat signals through local axi4_if/axis_if converters and then
through axi_dma_if_wrapper.
Original copyright:
Copyright (c) 2020 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import os
import itertools
import logging
import os
import cocotb
try:
import pytest
except ImportError: # pytest is only needed for the optional cocotb-test entrypoint
pytest = None
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge
from cocotb.regression import TestFactory
from cocotbext.axi import AxiBus, AxiRam
from cocotbext.axi import AxiStreamBus, AxiStreamFrame, AxiStreamSource, AxiStreamSink
@ -40,39 +69,57 @@ class TB:
cocotb.start_soon(Clock(dut.clk, 10, units="ns").start())
# Descriptor and status streams are flat, exactly like Forencich tests.
# Descriptor/status streams remain flat on purpose: they are specific
# to Forencich DMA, not generic AXIS interfaces in our library.
self.read_desc_source = DescSource(
DescBus.from_prefix(dut, "s_axis_read_desc"), dut.clk, dut.rst
)
self.read_desc_status_sink = DescStatusSink(
DescStatusBus.from_prefix(dut, "m_axis_read_desc_status"), dut.clk, dut.rst
DescStatusBus.from_prefix(
dut, "m_axis_read_desc_status"), dut.clk, dut.rst
)
self.write_desc_source = DescSource(
DescBus.from_prefix(dut, "s_axis_write_desc"), dut.clk, dut.rst
)
self.write_desc_status_sink = DescStatusSink(
DescStatusBus.from_prefix(dut, "m_axis_write_desc_status"), dut.clk, dut.rst
DescStatusBus.from_prefix(
dut, "m_axis_write_desc_status"), dut.clk, dut.rst
)
# Data streams are also flat from cocotb point of view. The SV top
# converts them to axis_if before reaching the wrapper.
# Data streams are flat from cocotb's point of view, but the SV top
# sends them through axis_flat_to_if/axis_if_to_flat before/after DUT.
self.read_data_sink = AxiStreamSink(
AxiStreamBus.from_prefix(dut, "m_axis_read_data"), dut.clk, dut.rst
)
self.write_data_source = AxiStreamSource(
AxiStreamBus.from_prefix(dut, "s_axis_write_data"), dut.clk, dut.rst
AxiStreamBus.from_prefix(
dut, "s_axis_write_data"), dut.clk, dut.rst
)
# AXI memory model. The SV top converts m_axi flat signals to axi4_if
# and back, so this also tests the AXI converters.
self.axi_ram = AxiRam(AxiBus.from_prefix(dut, "m_axi"), dut.clk, dut.rst, size=2**16)
# AXI memory model. The SV top routes this through the axi4_if adapters.
self.axi_ram = AxiRam(AxiBus.from_prefix(
dut, "m_axi"), dut.clk, dut.rst, size=2**16)
dut.read_enable.setimmediatevalue(0)
dut.write_enable.setimmediatevalue(0)
dut.write_abort.setimmediatevalue(0)
async def reset(self):
def set_idle_generator(self, generator=None):
if generator:
self.write_desc_source.set_pause_generator(generator())
self.write_data_source.set_pause_generator(generator())
self.read_desc_source.set_pause_generator(generator())
self.axi_ram.write_if.b_channel.set_pause_generator(generator())
self.axi_ram.read_if.r_channel.set_pause_generator(generator())
def set_backpressure_generator(self, generator=None):
if generator:
self.read_data_sink.set_pause_generator(generator())
self.axi_ram.write_if.aw_channel.set_pause_generator(generator())
self.axi_ram.write_if.w_channel.set_pause_generator(generator())
self.axi_ram.read_if.ar_channel.set_pause_generator(generator())
async def cycle_reset(self):
self.dut.rst.setimmediatevalue(0)
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
@ -84,154 +131,245 @@ class TB:
await RisingEdge(self.dut.clk)
def _make_data(length, seed=0):
return bytearray(((x + seed) & 0xFF) for x in range(length))
@cocotb.test()
async def test_axi_dma_wrapper_write(dut):
async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_inserter=None):
"""DMA write path stress test adapted from Forencich's axi_dma test."""
tb = TB(dut)
await tb.reset()
tb.dut.write_enable.value = 1
byte_lanes = tb.axi_ram.write_if.byte_lanes
step_size = 1 if int(
os.getenv("PARAM_ENABLE_UNALIGNED", "0")) else byte_lanes
tag_count = 2 ** len(tb.write_desc_source.bus.tag)
cur_tag = 1
addr = 0x1000
data = _make_data(96, seed=0x10)
tag = 3
await tb.cycle_reset()
# Guard bytes make it easier to catch wrong offsets/strobes.
tb.axi_ram.write(addr - 16, b"\xaa" * (len(data) + 32))
tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter)
await tb.write_desc_source.send(DescTransaction(addr=addr, len=len(data), tag=tag))
await tb.write_data_source.send(AxiStreamFrame(data, tid=tag))
dut.write_enable.value = 1
status = await tb.write_desc_status_sink.recv()
tb.log.info("write status: %s", status)
for length in list(range(1, byte_lanes * 4 + 1)) + [128]:
offsets = list(range(0, byte_lanes * 2, step_size))
offsets += list(range(4096 - byte_lanes * 2, 4096, step_size))
assert int(status.error) == 0
assert int(status.tag) == tag
assert int(status.len) == len(data)
assert tb.axi_ram.read(addr - 8, len(data) + 16) == b"\xaa" * 8 + data + b"\xaa" * 8
for offset in offsets:
for diff in [-8, -2, -1, 0, 1, 2, 8]:
if length + diff < 1:
continue
tb.log.info("write: length=%d offset=%d diff=%d",
length, offset, diff)
addr = offset + 0x1000
expected_data = bytearray([x % 256 for x in range(length)])
stream_data = bytearray(
[x % 256 for x in range(length + diff)])
tb.axi_ram.write(addr - 128, b"\xaa" *
(len(expected_data) + 256))
await tb.write_desc_source.send(
DescTransaction(addr=addr, len=len(
expected_data), tag=cur_tag)
)
await tb.write_data_source.send(AxiStreamFrame(stream_data, tid=cur_tag))
status = await tb.write_desc_status_sink.recv()
tb.log.info("write status: %s", status)
transferred_len = min(len(expected_data), len(stream_data))
assert int(status.len) == transferred_len
assert int(status.tag) == cur_tag
assert int(status.id) == cur_tag
assert int(status.error) == 0
tb.log.debug(
"%s",
tb.axi_ram.hexdump_str(
(addr & ~0xF) - 16,
(((addr & 0xF) + length - 1) & ~0xF) + 48,
),
)
if len(expected_data) <= len(stream_data):
assert tb.axi_ram.read(addr - 8, len(expected_data) + 16) == (
b"\xaa" * 8 + expected_data + b"\xaa" * 8
)
else:
assert tb.axi_ram.read(addr - 8, len(stream_data) + 16) == (
b"\xaa" * 8 + stream_data + b"\xaa" * 8
)
cur_tag = (cur_tag + 1) % tag_count
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
@cocotb.test()
async def test_axi_dma_wrapper_read(dut):
async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inserter=None):
"""DMA read path stress test adapted from Forencich's axi_dma test."""
tb = TB(dut)
await tb.reset()
tb.dut.read_enable.value = 1
byte_lanes = tb.axi_ram.read_if.byte_lanes
step_size = 1 if int(
os.getenv("PARAM_ENABLE_UNALIGNED", "0")) else byte_lanes
tag_count = 2 ** len(tb.read_desc_source.bus.tag)
cur_tag = 1
addr = 0x1800
data = _make_data(113, seed=0x40)
tag = 5
stream_id = 7
await tb.cycle_reset()
tb.axi_ram.write(addr - 16, b"\xcc" * (len(data) + 32))
tb.axi_ram.write(addr, data)
tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter)
await tb.read_desc_source.send(
DescTransaction(addr=addr, len=len(data), tag=tag, id=stream_id)
)
dut.read_enable.value = 1
status = await tb.read_desc_status_sink.recv()
frame = await tb.read_data_sink.recv()
for length in list(range(1, byte_lanes * 4 + 1)) + [128]:
offsets = list(range(0, byte_lanes * 2, step_size))
offsets += list(range(4096 - byte_lanes * 2, 4096, step_size))
tb.log.info("read status: %s", status)
tb.log.info("read frame: %s", frame)
for offset in offsets:
tb.log.info("read: length=%d offset=%d", length, offset)
assert int(status.error) == 0
assert int(status.tag) == tag
assert frame.tdata == data
assert int(frame.tid) == stream_id
addr = offset + 0x1000
test_data = bytearray([x % 256 for x in range(length)])
tb.axi_ram.write(addr - 128, b"\xaa" * (len(test_data) + 256))
tb.axi_ram.write(addr, test_data)
tb.log.debug(
"%s",
tb.axi_ram.hexdump_str(
(addr & ~0xF) - 16,
(((addr & 0xF) + length - 1) & ~0xF) + 48,
),
)
await tb.read_desc_source.send(
DescTransaction(addr=addr, len=len(
test_data), tag=cur_tag, id=cur_tag)
)
status = await tb.read_desc_status_sink.recv()
read_data = await tb.read_data_sink.recv()
tb.log.info("read status: %s", status)
tb.log.info("read data: %s", read_data)
assert int(status.tag) == cur_tag
assert int(status.error) == 0
assert read_data.tdata == test_data
assert int(read_data.tid) == cur_tag
cur_tag = (cur_tag + 1) % tag_count
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
def cycle_pause():
return itertools.cycle([1, 1, 1, 0])
# When imported by cocotb inside a simulator, generate the actual cocotb tests.
if cocotb.SIM_NAME:
for test in [run_test_write, run_test_read]:
factory = TestFactory(test)
factory.add_option("idle_inserter", [None, cycle_pause])
factory.add_option("backpressure_inserter", [None, cycle_pause])
factory.generate_tests()
# -----------------------------------------------------------------------------
# Optional pytest entrypoint via cocotb-test.
# Run from this directory with: pytest -q test_axi_dma_wrapper.py
# -----------------------------------------------------------------------------
def test_axi_dma_wrapper_pytest(request):
import cocotb_test.simulator
tests_dir = os.path.abspath(os.path.dirname(__file__))
project_root = os.path.abspath(os.path.join(tests_dir, "..", ".."))
def _sanitize_node_name(name):
return name.replace("[", "-").replace("]", "").replace("/", "_")
axi_if_rtl_dir = os.environ.get(
"AXI_IF_RTL_DIR", os.path.join(project_root, "rtl", "axi")
)
wrapper_rtl_dir = os.environ.get(
"WRAPPER_RTL_DIR", os.path.join(project_root, "rtl", "wrappers")
)
forencich_rtl_dir = os.environ.get(
"FORENCICH_AXI_RTL_DIR",
os.path.join(project_root, "external", "verilog-axi", "rtl"),
)
dut = "tb_axi_dma_wrapper"
module = os.path.splitext(os.path.basename(__file__))[0]
toplevel = dut
if pytest is not None:
@pytest.mark.parametrize("axi_data_width", [8, 16, 32])
@pytest.mark.parametrize("unaligned", [0, 1])
def test_axi_dma_wrapper_pytest(request, axi_data_width, unaligned):
import cocotb_test.simulator
parameters = {
"AXI_DATA_WIDTH": int(os.getenv("PARAM_AXI_DATA_WIDTH", "32")),
"AXI_ADDR_WIDTH": 16,
"AXI_ID_WIDTH": 8,
"AXI_USER_WIDTH": 1,
"AXI_MAX_BURST_LEN": 16,
"AXIS_ID_ENABLE": 1,
"AXIS_ID_WIDTH": 8,
"AXIS_DEST_ENABLE": 0,
"AXIS_DEST_WIDTH": 8,
"AXIS_USER_ENABLE": 1,
"AXIS_USER_WIDTH": 1,
"LEN_WIDTH": 20,
"TAG_WIDTH": 8,
"ENABLE_SG": 0,
"ENABLE_UNALIGNED": int(os.getenv("PARAM_ENABLE_UNALIGNED", "0")),
}
tests_dir = os.path.abspath(os.path.dirname(__file__))
project_root = os.path.abspath(os.path.join(tests_dir, "..", ".."))
parameters["AXI_STRB_WIDTH"] = parameters["AXI_DATA_WIDTH"] // 8
parameters["AXIS_DATA_WIDTH"] = parameters["AXI_DATA_WIDTH"]
parameters["AXIS_KEEP_ENABLE"] = int(parameters["AXIS_DATA_WIDTH"] > 8)
parameters["AXIS_KEEP_WIDTH"] = parameters["AXIS_DATA_WIDTH"] // 8
parameters["AXIS_LAST_ENABLE"] = 1
axi_if_rtl_dir = os.environ.get(
"AXI_IF_RTL_DIR", os.path.join(project_root, "rtl", "axi")
)
wrapper_rtl_dir = os.environ.get(
"WRAPPER_RTL_DIR", os.path.join(project_root, "rtl", "wrappers")
)
forencich_rtl_dir = os.environ.get(
"FORENCICH_AXI_RTL_DIR",
os.path.join(project_root, "external", "verilog-axi", "rtl"),
)
verilog_sources = [
os.path.join(axi_if_rtl_dir, "axi_pkg.sv"),
os.path.join(axi_if_rtl_dir, "axi_if.sv"),
os.path.join(axi_if_rtl_dir, "axis_if.sv"),
os.path.join(axi_if_rtl_dir, "axi4_flat_to_if.sv"),
os.path.join(axi_if_rtl_dir, "axi4_if_to_flat.sv"),
os.path.join(axi_if_rtl_dir, "axis_flat_to_if.sv"),
os.path.join(axi_if_rtl_dir, "axis_if_to_flat.sv"),
os.path.join(forencich_rtl_dir, "axi_dma.v"),
os.path.join(forencich_rtl_dir, "axi_dma_rd.v"),
os.path.join(forencich_rtl_dir, "axi_dma_wr.v"),
os.path.join(wrapper_rtl_dir, "forencich_axi_dma_wrapper.sv"),
os.path.join(tests_dir, "tb_axi_dma_wrapper.sv"),
]
dut = "tb_axi_dma_wrapper"
module = os.path.splitext(os.path.basename(__file__))[0]
toplevel = dut
extra_env = {f"PARAM_{k}": str(v) for k, v in parameters.items()}
sim_build = os.path.join(
tests_dir,
"sim_build",
request.node.name.replace("[", "-").replace("]", ""),
)
parameters = {
"AXI_DATA_WIDTH": axi_data_width,
"AXI_ADDR_WIDTH": 16,
"AXI_ID_WIDTH": 8,
"AXI_USER_WIDTH": 1,
"AXI_MAX_BURST_LEN": 16,
"AXIS_ID_ENABLE": 1,
"AXIS_ID_WIDTH": 8,
"AXIS_DEST_ENABLE": 0,
"AXIS_DEST_WIDTH": 8,
"AXIS_USER_ENABLE": 1,
"AXIS_USER_WIDTH": 1,
"LEN_WIDTH": 20,
"TAG_WIDTH": 8,
"ENABLE_SG": 0,
"ENABLE_UNALIGNED": unaligned,
}
cocotb_test.simulator.run(
python_search=[tests_dir],
verilog_sources=verilog_sources,
includes=[axi_if_rtl_dir, wrapper_rtl_dir, forencich_rtl_dir],
toplevel=toplevel,
module=module,
parameters=parameters,
sim_build=sim_build,
extra_env=extra_env,
waves=True,
extra_args=["--trace-structs"] if os.getenv("SIM", "verilator") == "verilator" else [],
)
parameters["AXI_STRB_WIDTH"] = parameters["AXI_DATA_WIDTH"] // 8
parameters["AXIS_DATA_WIDTH"] = parameters["AXI_DATA_WIDTH"]
parameters["AXIS_KEEP_ENABLE"] = int(parameters["AXIS_DATA_WIDTH"] > 8)
parameters["AXIS_KEEP_WIDTH"] = parameters["AXIS_DATA_WIDTH"] // 8
parameters["AXIS_LAST_ENABLE"] = 1
verilog_sources = [
os.path.join(axi_if_rtl_dir, "axi_pkg.sv"),
os.path.join(axi_if_rtl_dir, "axi_if.sv"),
os.path.join(axi_if_rtl_dir, "axis_if.sv"),
os.path.join(axi_if_rtl_dir, "axi4_flat_to_if.sv"),
os.path.join(axi_if_rtl_dir, "axi4_if_to_flat.sv"),
os.path.join(axi_if_rtl_dir, "axis_flat_to_if.sv"),
os.path.join(axi_if_rtl_dir, "axis_if_to_flat.sv"),
os.path.join(forencich_rtl_dir, "axi_dma.v"),
os.path.join(forencich_rtl_dir, "axi_dma_rd.v"),
os.path.join(forencich_rtl_dir, "axi_dma_wr.v"),
os.path.join(wrapper_rtl_dir, "axi_dma_if_wrapper.sv"),
os.path.join(tests_dir, "tb_axi_dma_wrapper.sv"),
]
extra_env = {f"PARAM_{k}": str(v) for k, v in parameters.items()}
sim_build = os.path.join(
tests_dir, "sim_build", _sanitize_node_name(request.node.name))
extra_args = []
if os.getenv("SIM", "verilator") == "verilator":
extra_args += ["--trace-structs"]
cocotb_test.simulator.run(
python_search=[tests_dir],
verilog_sources=verilog_sources,
includes=[axi_if_rtl_dir, wrapper_rtl_dir, forencich_rtl_dir],
toplevel=toplevel,
module=module,
parameters=parameters,
sim_build=sim_build,
extra_env=extra_env,
waves=bool(int(os.getenv("WAVES", "0"))),
extra_args=extra_args,
)