Reduce capture-path I/O to avoid ADC+DIN overflow

This commit is contained in:
kamil
2026-04-09 01:22:33 +03:00
parent b5a76d1f7c
commit 8710ed8b39
3 changed files with 347 additions and 62 deletions

134
main.cpp
View File

@ -61,13 +61,19 @@ struct Config {
uint32_t sync_start_mode = X502_SYNC_DI_SYN2_RISE;
StopMode stop_mode = StopMode::DiSyn2Fall;
uint32_t recv_block_words = 4096;
uint32_t recv_block_words = 32768;
uint32_t recv_timeout_ms = 50;
uint32_t stats_period_ms = 1000;
uint32_t start_wait_ms = 10000;
uint32_t input_buffer_words = 4 * 1024 * 1024;
uint32_t input_step_words = 4096;
uint32_t live_update_period_ms = 500;
uint32_t input_buffer_words = 8 * 1024 * 1024;
uint32_t input_step_words = 32768;
uint32_t live_update_period_ms = 1000;
uint32_t svg_history_packets = 50;
bool recv_block_specified = false;
bool input_buffer_specified = false;
bool input_step_specified = false;
bool live_update_specified = false;
bool pullup_syn1 = false;
bool pullup_syn2 = false;
@ -331,7 +337,8 @@ void print_help(const char* exe_name) {
<< " [internal_ref_hz:2000000]\n"
<< " [duration_ms:100] [packet_limit:0] [csv:capture.csv] [svg:capture.svg]\n"
<< " [live_html:live_plot.html] [live_json:live_plot.json]\n"
<< " [recv_block:4096] [stats_period_ms:1000] [live_update_period_ms:500] [start_wait_ms:10000]\n"
<< " [recv_block:32768] [stats_period_ms:1000] [live_update_period_ms:1000] [svg_history_packets:50] [start_wait_ms:10000]\n"
<< " [buffer_words:8388608] [step_words:32768]\n"
<< " [pullup_syn1] [pullup_syn2] [pulldown_conv_in] [pulldown_start_in]\n"
<< "\n"
<< "Defaults for E-502:\n"
@ -349,9 +356,13 @@ void print_help(const char* exe_name) {
<< " sample_clock_hz:max -> with clock:internal, use the maximum ADC speed\n"
<< " internal_ref_hz:2000000 -> internal base clock for clock:internal (1500000 or 2000000)\n"
<< " stats_period_ms:1000 -> print online transfer/capture statistics every 1000 ms (0 disables)\n"
<< " live_update_period_ms:500 -> refresh live HTML/JSON no more than twice per second\n"
<< " recv_block:32768 -> request up to 32768 raw 32-bit words per X502_Recv() call\n"
<< " live_update_period_ms:1000 -> refresh live HTML/JSON no more than once per second\n"
<< " svg_history_packets:50 -> keep last 50 packets in RAM for final SVG (0 keeps all, risky)\n"
<< " duration_ms:100 -> max packet length if stop edge does not arrive\n"
<< " packet_limit:0 -> 0 means continuous until Ctrl+C, N means stop after N packets\n"
<< " buffer_words:8388608 -> input stream buffer size in 32-bit words\n"
<< " step_words:32768 -> input stream transfer step in 32-bit words\n"
<< " live_html/live_json -> live graph files updated as packets arrive\n"
<< " If sample_clock_hz is omitted together with clock:internal, the maximum ADC speed is used\n"
<< "\n"
@ -474,6 +485,7 @@ Config parse_args(int argc, char** argv) {
}
if (starts_with(arg, "recv_block:")) {
cfg.recv_block_words = parse_u32(arg.substr(11), "recv_block");
cfg.recv_block_specified = true;
continue;
}
if (starts_with(arg, "recv_timeout_ms:")) {
@ -486,6 +498,11 @@ Config parse_args(int argc, char** argv) {
}
if (starts_with(arg, "live_update_period_ms:")) {
cfg.live_update_period_ms = parse_u32(arg.substr(22), "live_update_period_ms");
cfg.live_update_specified = true;
continue;
}
if (starts_with(arg, "svg_history_packets:")) {
cfg.svg_history_packets = parse_u32(arg.substr(20), "svg_history_packets");
continue;
}
if (starts_with(arg, "start_wait_ms:")) {
@ -494,10 +511,12 @@ Config parse_args(int argc, char** argv) {
}
if (starts_with(arg, "buffer_words:")) {
cfg.input_buffer_words = parse_u32(arg.substr(13), "buffer_words");
cfg.input_buffer_specified = true;
continue;
}
if (starts_with(arg, "step_words:")) {
cfg.input_step_words = parse_u32(arg.substr(11), "step_words");
cfg.input_step_specified = true;
continue;
}
if (starts_with(arg, "csv:")) {
@ -528,6 +547,24 @@ Config parse_args(int argc, char** argv) {
if (cfg.max_internal_clock && (cfg.sync_mode != X502_SYNC_INTERNAL)) {
fail("sample_clock_hz:max is only valid together with clock:internal");
}
const bool high_rate_capture =
use_internal_max_clock(cfg) ||
((cfg.sample_clock_hz >= 1000000.0) &&
((cfg.sync_mode == X502_SYNC_INTERNAL) || cfg.sample_clock_specified));
if (high_rate_capture) {
if (!cfg.recv_block_specified) {
cfg.recv_block_words = std::max<uint32_t>(cfg.recv_block_words, 32768U);
}
if (!cfg.input_step_specified) {
cfg.input_step_words = std::max<uint32_t>(cfg.input_step_words, 32768U);
}
if (!cfg.input_buffer_specified) {
cfg.input_buffer_words = std::max<uint32_t>(cfg.input_buffer_words, 8U * 1024U * 1024U);
}
if (!cfg.live_update_specified) {
cfg.live_update_period_ms = std::max<uint32_t>(cfg.live_update_period_ms, 1000U);
}
}
if (cfg.recv_block_words == 0) {
fail("recv_block must be > 0");
}
@ -611,6 +648,7 @@ struct Api {
decltype(&X502_Configure) Configure = nullptr;
decltype(&X502_StreamsEnable) StreamsEnable = nullptr;
decltype(&X502_StreamsStart) StreamsStart = nullptr;
decltype(&X502_GetRecvReadyCount) GetRecvReadyCount = nullptr;
decltype(&X502_Recv) Recv = nullptr;
decltype(&X502_ProcessData) ProcessData = nullptr;
@ -653,6 +691,7 @@ struct Api {
Configure = load_symbol<decltype(Configure)>(x502_module, "X502_Configure");
StreamsEnable = load_symbol<decltype(StreamsEnable)>(x502_module, "X502_StreamsEnable");
StreamsStart = load_symbol<decltype(StreamsStart)>(x502_module, "X502_StreamsStart");
GetRecvReadyCount = load_symbol<decltype(GetRecvReadyCount)>(x502_module, "X502_GetRecvReadyCount");
Recv = load_symbol<decltype(Recv)>(x502_module, "X502_Recv");
ProcessData = load_symbol<decltype(ProcessData)>(x502_module, "X502_ProcessData");
@ -883,6 +922,15 @@ int run(const Config& cfg) {
<< "ADC=" << actual_sample_clock_hz << " Hz, DIN=" << actual_din_freq_hz << " Hz.";
fail(message.str());
}
const double combined_input_rate_hz = actual_sample_clock_hz + actual_din_freq_hz;
if (cfg.ip_addr.has_value() && (combined_input_rate_hz > 2500000.0)) {
std::ostringstream message;
message << "Current Ethernet input load is too high for E-502: ADC "
<< actual_sample_clock_hz << " Hz + DIN " << actual_din_freq_hz
<< " Hz = " << combined_input_rate_hz
<< " words/s, while the documented Ethernet input-only limit is about 2500000 words/s.";
fail(message.str());
}
expect_ok(api, api.SetStreamBufSize(device.hnd, X502_STREAM_CH_IN, cfg.input_buffer_words), "Set input buffer size");
expect_ok(api, api.SetStreamStep(device.hnd, X502_STREAM_CH_IN, cfg.input_step_words), "Set input stream step");
@ -921,6 +969,7 @@ int run(const Config& cfg) {
}
std::cout << "\n"
<< " DIN clock: " << actual_din_freq_hz << " Hz\n"
<< " ADC+DIN total input rate: " << combined_input_rate_hz << " words/s\n"
<< " ADC logical channels: " << cfg.channel_count << "\n"
<< " per-channel frame rate: " << actual_frame_freq_hz << " Hz\n"
<< " duration: " << cfg.duration_ms << " ms\n"
@ -938,8 +987,15 @@ int run(const Config& cfg) {
CaptureFileWriter writer(cfg.csv_path, cfg.svg_path, cfg.live_html_path, cfg.live_json_path);
writer.initialize_live_plot();
writer.initialize_csv(cfg.channel_count);
std::cout << " live plot html: " << writer.live_html_path() << "\n"
<< " live plot data: " << writer.live_json_path() << "\n";
<< " live plot data: " << writer.live_json_path() << "\n"
<< " recv block words: " << cfg.recv_block_words << "\n"
<< " input step words: " << cfg.input_step_words << "\n"
<< " input buffer words: " << cfg.input_buffer_words << "\n"
<< " SVG history packets kept in RAM: "
<< ((cfg.svg_history_packets == 0U) ? std::string("all (unbounded)") : std::to_string(cfg.svg_history_packets))
<< "\n";
ConsoleCtrlGuard console_guard;
if (!console_guard.installed) {
@ -949,17 +1005,16 @@ int run(const Config& cfg) {
expect_ok(api, api.StreamsStart(device.hnd), "Start streams");
device.streams_started = true;
std::vector<uint32_t> raw(cfg.recv_block_words);
std::vector<double> adc_buffer(cfg.recv_block_words);
std::vector<uint32_t> din_buffer(cfg.recv_block_words);
const uint32_t read_capacity_words = std::max(cfg.recv_block_words, cfg.input_step_words);
std::vector<uint32_t> raw(read_capacity_words);
std::vector<double> adc_buffer(read_capacity_words);
std::vector<uint32_t> din_buffer(read_capacity_words);
std::deque<double> pending_adc;
std::deque<uint32_t> pending_din;
std::vector<CapturePacket> packets;
if (cfg.packet_limit != 0U) {
packets.reserve(cfg.packet_limit);
}
std::deque<CapturePacket> packets;
PacketAccumulator current_packet;
current_packet.reset(target_frames, cfg.channel_count);
std::size_t csv_global_frame_index = 0;
bool capture_started = false;
bool stop_loop_requested = false;
@ -1045,7 +1100,7 @@ int run(const Config& cfg) {
}
CapturePacket packet;
packet.packet_index = packets.size() + 1U;
packet.packet_index = static_cast<std::size_t>(total_completed_packets + 1U);
packet.channel_count = cfg.channel_count;
packet.ch1 = std::move(current_packet.channels[0]);
packet.ch2 = std::move(current_packet.channels[1]);
@ -1064,9 +1119,15 @@ int run(const Config& cfg) {
<< ", zeroed_on_DI1_change=" << zeroed_fraction << "% ("
<< current_packet.zeroed_samples << "/" << current_packet.stored_samples << ")\n";
packets.push_back(std::move(packet));
writer.append_csv_packet(packet, actual_frame_freq_hz, csv_global_frame_index);
++total_completed_packets;
++stats_completed_packets;
packets.push_back(std::move(packet));
if ((cfg.svg_history_packets != 0U) && (packets.size() > cfg.svg_history_packets)) {
packets.pop_front();
}
const double elapsed_capture_s =
std::max(1e-9, static_cast<double>(GetTickCount64() - capture_loop_start) / 1000.0);
const double packets_per_second =
@ -1080,7 +1141,7 @@ int run(const Config& cfg) {
((now - last_live_update) >= cfg.live_update_period_ms);
if (should_update_live) {
writer.update_live_plot(packets.back(),
packets.size(),
static_cast<std::size_t>(total_completed_packets),
packets_per_second,
actual_frame_freq_hz,
packet_close_reason_to_string(reason),
@ -1101,7 +1162,7 @@ int run(const Config& cfg) {
};
while (!stop_loop_requested) {
if ((cfg.packet_limit != 0U) && (packets.size() >= cfg.packet_limit)) {
if ((cfg.packet_limit != 0U) && (total_completed_packets >= cfg.packet_limit)) {
stop_loop_requested = true;
break;
}
@ -1113,7 +1174,16 @@ int run(const Config& cfg) {
break;
}
const int32_t recvd = api.Recv(device.hnd, raw.data(), cfg.recv_block_words, cfg.recv_timeout_ms);
uint32_t recv_request_words = cfg.recv_block_words;
uint32_t recv_timeout_ms = cfg.recv_timeout_ms;
uint32_t ready_words = 0;
const int32_t ready_err = api.GetRecvReadyCount(device.hnd, &ready_words);
if ((ready_err == X502_ERR_OK) && (ready_words != 0U)) {
recv_request_words = std::min<uint32_t>(ready_words, read_capacity_words);
recv_timeout_ms = 0;
}
const int32_t recvd = api.Recv(device.hnd, raw.data(), recv_request_words, recv_timeout_ms);
if (recvd < 0) {
fail("X502_Recv failed: " + x502_error(api, recvd));
}
@ -1159,8 +1229,11 @@ int run(const Config& cfg) {
if (process_err == X502_ERR_STREAM_OVERFLOW) {
std::ostringstream message;
message << "Process ADC+DIN data: " << x502_error(api, process_err)
<< ". Try larger buffer_words/step_words, a longer live_update_period_ms, "
<< "or a lower sample_clock_hz / DIN load.";
<< ". Current ADC rate=" << actual_sample_clock_hz
<< " Hz, DIN rate=" << actual_din_freq_hz
<< " Hz, combined input=" << combined_input_rate_hz
<< " words/s. Try larger recv_block/buffer_words/step_words, a longer "
<< "live_update_period_ms, or a lower sample_clock_hz / DIN load.";
fail(message.str());
}
expect_ok(api, process_err, "Process ADC+DIN data");
@ -1237,7 +1310,7 @@ int run(const Config& cfg) {
if (packet_active && stop_edge) {
finish_packet(PacketCloseReason::ExternalStopEdge);
if ((cfg.packet_limit != 0U) && (packets.size() >= cfg.packet_limit)) {
if ((cfg.packet_limit != 0U) && (total_completed_packets >= cfg.packet_limit)) {
stop_loop_requested = true;
}
continue;
@ -1271,7 +1344,7 @@ int run(const Config& cfg) {
if (current_packet.frame_count(cfg.channel_count) >= target_frames) {
finish_packet(PacketCloseReason::DurationLimit);
if ((cfg.packet_limit != 0U) && (packets.size() >= cfg.packet_limit)) {
if ((cfg.packet_limit != 0U) && (total_completed_packets >= cfg.packet_limit)) {
stop_loop_requested = true;
}
}
@ -1298,17 +1371,18 @@ int run(const Config& cfg) {
print_stats(false);
}
writer.write(packets, actual_frame_freq_hz, range_to_volts(cfg.range));
const std::vector<CapturePacket> svg_packets(packets.begin(), packets.end());
writer.write(svg_packets, actual_frame_freq_hz, range_to_volts(cfg.range));
std::size_t total_packet_frames = 0;
for (const auto& packet : packets) {
for (const auto& packet : svg_packets) {
total_packet_frames += (packet.channel_count <= 1U)
? packet.ch1.size()
: std::min(packet.ch1.size(), packet.ch2.size());
}
std::cout << "Captured " << packets.size() << " packet(s), "
<< total_packet_frames << " total frames per channel\n"
std::cout << "Captured " << total_completed_packets << " packet(s), "
<< total_packet_frames << " total frames per channel kept for final SVG\n"
<< "ADC samples forced to 0 on DI1 change: " << total_zeroed_samples << "\n"
<< "Average stats: "
<< "MB/s=" << std::fixed << std::setprecision(3)
@ -1333,6 +1407,7 @@ int run(const Config& cfg) {
? 0.0
: (100.0 * static_cast<double>(total_zeroed_samples) / static_cast<double>(total_stored_adc_samples)))
<< "% (" << total_zeroed_samples << "/" << total_stored_adc_samples << ")\n"
<< "Final SVG packets retained in memory: " << svg_packets.size() << "\n"
<< "CSV: " << cfg.csv_path << "\n"
<< "SVG: " << cfg.svg_path << "\n";
@ -1345,6 +1420,11 @@ int main(int argc, char** argv) {
try {
const Config cfg = parse_args(argc, argv);
return run(cfg);
} catch (const std::bad_alloc&) {
std::cerr << "Error: bad allocation. The process ran out of RAM. "
"Try a smaller duration_ms, fewer logical channels, or a lower svg_history_packets "
"(for example svg_history_packets:10 or svg_history_packets:0 only if you really need all packets in SVG).\n";
return 1;
} catch (const std::exception& ex) {
std::cerr << "Error: " << ex.what() << "\n";
return 1;