tests: add more cases for base test
This commit is contained in:
@ -49,3 +49,56 @@ async def run_basic_write_read_test(dut):
|
||||
read_data = await tb.master.read(addr, len(test_data))
|
||||
|
||||
assert bytes(read_data.data) == test_data
|
||||
|
||||
|
||||
@cocotb.test()
|
||||
async def run_unaligned_and_burst_test(dut):
|
||||
tb = TB(dut)
|
||||
await tb.reset()
|
||||
|
||||
cases = [
|
||||
(0x0003, bytes([0x11, 0x22, 0x33, 0x44, 0x55])),
|
||||
(0x0127, bytes((x * 7 + 3) & 0xFF for x in range(37))),
|
||||
(0x2000, bytes(range(256))),
|
||||
(0x2F1B, bytes((255 - x) & 0xFF for x in range(191))),
|
||||
]
|
||||
|
||||
for addr, payload in cases:
|
||||
await tb.master.write(addr, payload)
|
||||
read_data = await tb.master.read(addr, len(payload))
|
||||
assert bytes(read_data.data) == payload
|
||||
|
||||
|
||||
def set_pause_if_available(dut, channel):
|
||||
if hasattr(channel, "set_pause_generator"):
|
||||
channel.set_pause_generator(cycle_pause())
|
||||
else:
|
||||
dut._log.warning(
|
||||
"Channel %r has no set_pause_generator(); backpressure skipped", channel)
|
||||
|
||||
|
||||
@cocotb.test()
|
||||
async def run_backpressure_test(dut):
|
||||
tb = TB(dut)
|
||||
|
||||
# Optional stress, close to alexforencich/cocotbext-axi examples.
|
||||
# It exercises valid/ready stalls on the channels that support pause generators.
|
||||
for channel in [
|
||||
tb.master.write_if.aw_channel,
|
||||
tb.master.write_if.w_channel,
|
||||
tb.master.read_if.ar_channel,
|
||||
tb.ram.write_if.b_channel,
|
||||
tb.ram.read_if.r_channel,
|
||||
]:
|
||||
set_pause_if_available(dut, channel)
|
||||
|
||||
await tb.reset()
|
||||
|
||||
payload = bytes((x * 13 + 5) & 0xFF for x in range(1024))
|
||||
addr = 0x4000
|
||||
|
||||
await tb.master.write(addr, payload)
|
||||
await Timer(100, units="ns")
|
||||
read_data = await tb.master.read(addr, len(payload))
|
||||
|
||||
assert bytes(read_data.data) == payload
|
||||
|
||||
Reference in New Issue
Block a user