new cyclic writer
This commit is contained in:
166
main.cpp
166
main.cpp
@ -412,7 +412,7 @@ void print_help(const char* exe_name) {
|
||||
<< " buffer_words:8388608 -> input stream buffer size in 32-bit words\n"
|
||||
<< " step_words:32768 -> input stream transfer step in 32-bit words\n"
|
||||
<< " live_html/live_json -> live graph files updated as packets arrive\n"
|
||||
<< " tty:/tmp/ttyADC_data -> write binary step frames to a Linux/POSIX tty or PTY link path\n"
|
||||
<< " tty:/tmp/ttyADC_data -> write a continuous legacy 4-word CH1/CH2 stream to a Linux/POSIX tty or PTY link path\n"
|
||||
<< " If sample_clock_hz is omitted together with clock:internal, the maximum ADC speed is used\n"
|
||||
<< "\n"
|
||||
<< "Differential physical channel mapping:\n"
|
||||
@ -990,38 +990,10 @@ struct PacketAccumulator {
|
||||
}
|
||||
};
|
||||
|
||||
struct TtyStepAccumulator {
|
||||
double sum_ch1 = 0.0;
|
||||
double sum_ch2 = 0.0;
|
||||
uint32_t count_ch1 = 0;
|
||||
uint32_t count_ch2 = 0;
|
||||
uint32_t next_step_index = 1;
|
||||
|
||||
void clear_step() {
|
||||
sum_ch1 = 0.0;
|
||||
sum_ch2 = 0.0;
|
||||
count_ch1 = 0;
|
||||
count_ch2 = 0;
|
||||
}
|
||||
|
||||
void reset_packet() {
|
||||
clear_step();
|
||||
next_step_index = 1;
|
||||
}
|
||||
|
||||
void add_sample(uint32_t lch, double raw_code) {
|
||||
if (lch == 0U) {
|
||||
sum_ch1 += raw_code;
|
||||
++count_ch1;
|
||||
} else if (lch == 1U) {
|
||||
sum_ch2 += raw_code;
|
||||
++count_ch2;
|
||||
}
|
||||
}
|
||||
|
||||
bool has_complete_step() const {
|
||||
return (count_ch1 != 0U) && (count_ch2 != 0U);
|
||||
}
|
||||
struct TtyContinuousState {
|
||||
bool pending_ch1_valid = false;
|
||||
int16_t pending_ch1 = 0;
|
||||
uint16_t next_index = 1;
|
||||
};
|
||||
|
||||
int16_t pack_raw_code_to_int16(double avg_raw_code) {
|
||||
@ -1278,16 +1250,29 @@ int run(const Config& cfg) {
|
||||
<< " ADC range: +/-" << range_to_volts(cfg.range) << " V\n"
|
||||
<< " max frames per packet per channel: " << target_frames << "\n";
|
||||
|
||||
const uint32_t read_capacity_words = std::max(cfg.recv_block_words, cfg.input_step_words);
|
||||
std::size_t tty_ring_capacity_bytes = 0;
|
||||
std::unique_ptr<TtyProtocolWriter> tty_writer;
|
||||
if (cfg.tty_path) {
|
||||
tty_writer = std::make_unique<TtyProtocolWriter>(*cfg.tty_path);
|
||||
const uint64_t typical_tty_batch_frames = (static_cast<uint64_t>(read_capacity_words) + 1U) / 2U;
|
||||
const uint64_t typical_tty_batch_bytes = typical_tty_batch_frames * sizeof(uint16_t) * 4U;
|
||||
const uint64_t required_tty_ring_bytes = typical_tty_batch_bytes * 16U;
|
||||
const uint64_t tty_ring_bytes_u64 = std::max<uint64_t>(1024U * 1024U, required_tty_ring_bytes);
|
||||
if (tty_ring_bytes_u64 > static_cast<uint64_t>(std::numeric_limits<std::size_t>::max())) {
|
||||
fail("TTY ring buffer size overflowed size_t");
|
||||
}
|
||||
tty_ring_capacity_bytes = static_cast<std::size_t>(tty_ring_bytes_u64);
|
||||
tty_writer = std::make_unique<TtyProtocolWriter>(*cfg.tty_path, tty_ring_capacity_bytes);
|
||||
tty_writer->emit_packet_start();
|
||||
}
|
||||
CaptureFileWriter writer(cfg.csv_path, cfg.svg_path, cfg.live_html_path, cfg.live_json_path);
|
||||
writer.initialize_live_plot();
|
||||
writer.initialize_csv(cfg.channel_count, cfg.di1_mode == Di1Mode::Trace);
|
||||
std::cout << " live plot html: " << writer.live_html_path() << "\n"
|
||||
<< " live plot data: " << writer.live_json_path() << "\n"
|
||||
<< " tty step output: " << (cfg.tty_path ? *cfg.tty_path : std::string("disabled")) << "\n"
|
||||
<< " tty stream output: " << (cfg.tty_path ? *cfg.tty_path : std::string("disabled")) << "\n"
|
||||
<< " tty ring buffer bytes: "
|
||||
<< (cfg.tty_path ? std::to_string(tty_ring_capacity_bytes) : std::string("disabled")) << "\n"
|
||||
<< " recv block words: " << cfg.recv_block_words << "\n"
|
||||
<< " input step words: " << cfg.input_step_words << "\n"
|
||||
<< " input buffer words: " << cfg.input_buffer_words << "\n"
|
||||
@ -1303,17 +1288,17 @@ int run(const Config& cfg) {
|
||||
expect_ok(api, api.StreamsStart(device.hnd), "Start streams");
|
||||
device.streams_started = true;
|
||||
|
||||
const uint32_t read_capacity_words = std::max(cfg.recv_block_words, cfg.input_step_words);
|
||||
std::vector<uint32_t> raw(read_capacity_words);
|
||||
std::vector<double> adc_buffer(read_capacity_words);
|
||||
std::vector<double> adc_raw_buffer(read_capacity_words);
|
||||
std::vector<uint32_t> din_buffer(read_capacity_words);
|
||||
std::deque<double> pending_adc;
|
||||
std::deque<double> pending_adc_raw;
|
||||
std::deque<uint32_t> pending_din;
|
||||
std::deque<CapturePacket> packets;
|
||||
PacketAccumulator current_packet;
|
||||
TtyStepAccumulator tty_step;
|
||||
TtyContinuousState tty_state;
|
||||
std::vector<uint16_t> tty_frame_words;
|
||||
tty_frame_words.reserve(static_cast<std::size_t>(read_capacity_words) * 2U);
|
||||
current_packet.reset(target_frames, cfg.channel_count);
|
||||
std::size_t csv_global_frame_index = 0;
|
||||
|
||||
@ -1346,6 +1331,8 @@ int run(const Config& cfg) {
|
||||
uint64_t stats_zeroed_samples = 0;
|
||||
uint64_t stats_completed_frames = 0;
|
||||
uint64_t stats_completed_packets = 0;
|
||||
TtyProtocolWriter::StatsSnapshot tty_stats_window_start {};
|
||||
bool tty_overflow_warning_printed = false;
|
||||
|
||||
auto print_stats = [&](bool final_report) {
|
||||
const TickMs now = tick_count_ms();
|
||||
@ -1358,6 +1345,11 @@ int run(const Config& cfg) {
|
||||
const double zeroed_fraction = (stats_stored_adc_samples == 0U)
|
||||
? 0.0
|
||||
: (100.0 * static_cast<double>(stats_zeroed_samples) / static_cast<double>(stats_stored_adc_samples));
|
||||
const TtyProtocolWriter::StatsSnapshot tty_stats_now =
|
||||
tty_writer ? tty_writer->stats() : TtyProtocolWriter::StatsSnapshot {};
|
||||
const uint64_t tty_frames_written = tty_stats_now.frames_written - tty_stats_window_start.frames_written;
|
||||
const uint64_t tty_frames_dropped = tty_stats_now.frames_dropped - tty_stats_window_start.frames_dropped;
|
||||
const uint64_t tty_ring_overflows = tty_stats_now.ring_overflows - tty_stats_window_start.ring_overflows;
|
||||
|
||||
std::cout << std::fixed << std::setprecision(3)
|
||||
<< (final_report ? "Final stats: " : "Online stats: ")
|
||||
@ -1372,6 +1364,11 @@ int run(const Config& cfg) {
|
||||
} else if (cfg.di1_mode == Di1Mode::Trace) {
|
||||
std::cout << ", DI1 trace=enabled";
|
||||
}
|
||||
if (tty_writer) {
|
||||
std::cout << ", tty_frames_written=" << tty_frames_written
|
||||
<< ", tty_frames_dropped=" << tty_frames_dropped
|
||||
<< ", tty_ring_overflows=" << tty_ring_overflows;
|
||||
}
|
||||
std::cout << "\n";
|
||||
|
||||
if (!final_report) {
|
||||
@ -1384,6 +1381,7 @@ int run(const Config& cfg) {
|
||||
stats_zeroed_samples = 0;
|
||||
stats_completed_frames = 0;
|
||||
stats_completed_packets = 0;
|
||||
tty_stats_window_start = tty_stats_now;
|
||||
}
|
||||
};
|
||||
|
||||
@ -1392,31 +1390,9 @@ int run(const Config& cfg) {
|
||||
return;
|
||||
}
|
||||
current_packet.reset(target_frames, cfg.channel_count);
|
||||
if (tty_writer) {
|
||||
tty_step.reset_packet();
|
||||
tty_writer->emit_packet_start();
|
||||
}
|
||||
packet_active = true;
|
||||
};
|
||||
|
||||
auto emit_tty_step = [&]() {
|
||||
if (!tty_writer || !tty_step.has_complete_step()) {
|
||||
return;
|
||||
}
|
||||
if (tty_step.next_step_index >= 0xFFFFU) {
|
||||
std::cerr << "Warning: TTY protocol step index overflow, forcing tty packet restart\n";
|
||||
tty_step.reset_packet();
|
||||
tty_writer->emit_packet_start();
|
||||
}
|
||||
|
||||
const double ch1_avg = tty_step.sum_ch1 / static_cast<double>(tty_step.count_ch1);
|
||||
const double ch2_avg = tty_step.sum_ch2 / static_cast<double>(tty_step.count_ch2);
|
||||
tty_writer->emit_step(static_cast<uint16_t>(tty_step.next_step_index),
|
||||
pack_raw_code_to_int16(ch1_avg),
|
||||
pack_raw_code_to_int16(ch2_avg));
|
||||
++tty_step.next_step_index;
|
||||
};
|
||||
|
||||
auto finish_packet = [&](PacketCloseReason reason) {
|
||||
const std::size_t frames = current_packet.frame_count(cfg.channel_count);
|
||||
if (frames != 0U) {
|
||||
@ -1499,10 +1475,12 @@ int run(const Config& cfg) {
|
||||
|
||||
packet_active = false;
|
||||
current_packet.reset(target_frames, cfg.channel_count);
|
||||
tty_step.clear_step();
|
||||
};
|
||||
|
||||
while (!stop_loop_requested) {
|
||||
if (tty_writer) {
|
||||
tty_writer->throw_if_failed();
|
||||
}
|
||||
if ((cfg.packet_limit != 0U) && (total_completed_packets >= cfg.packet_limit)) {
|
||||
stop_loop_requested = true;
|
||||
break;
|
||||
@ -1594,6 +1572,39 @@ int run(const Config& cfg) {
|
||||
if (raw_adc_count != adc_count) {
|
||||
fail("Raw ADC parsing returned a different sample count than voltage processing");
|
||||
}
|
||||
|
||||
tty_frame_words.clear();
|
||||
tty_frame_words.reserve(((static_cast<std::size_t>(raw_adc_count) + 1U) / 2U) * 4U);
|
||||
for (uint32_t i = 0; i < raw_adc_count; ++i) {
|
||||
const int16_t sample = pack_raw_code_to_int16(adc_raw_buffer[i]);
|
||||
if (!tty_state.pending_ch1_valid) {
|
||||
tty_state.pending_ch1 = sample;
|
||||
tty_state.pending_ch1_valid = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
tty_frame_words.push_back(0x000A);
|
||||
tty_frame_words.push_back(tty_state.next_index);
|
||||
tty_frame_words.push_back(static_cast<uint16_t>(tty_state.pending_ch1));
|
||||
tty_frame_words.push_back(static_cast<uint16_t>(sample));
|
||||
tty_state.pending_ch1_valid = false;
|
||||
|
||||
if (tty_state.next_index >= 0xFFFEU) {
|
||||
tty_state.next_index = 1U;
|
||||
} else {
|
||||
++tty_state.next_index;
|
||||
}
|
||||
}
|
||||
|
||||
if (!tty_frame_words.empty()) {
|
||||
tty_writer->enqueue_encoded_frames(tty_frame_words.data(), tty_frame_words.size() / 4U);
|
||||
|
||||
const auto tty_stats = tty_writer->stats();
|
||||
if (!tty_overflow_warning_printed && (tty_stats.ring_overflows != 0U)) {
|
||||
std::cerr << "Warning: TTY ring buffer overflowed; dropping oldest frames to keep the stream continuous\n";
|
||||
tty_overflow_warning_printed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ((adc_count == 0U) && (din_count == 0U) && (!tty_writer || (raw_adc_count == 0U))) {
|
||||
@ -1608,31 +1619,20 @@ int run(const Config& cfg) {
|
||||
for (uint32_t i = 0; i < adc_count; ++i) {
|
||||
pending_adc.push_back(adc_buffer[i]);
|
||||
}
|
||||
if (tty_writer) {
|
||||
for (uint32_t i = 0; i < raw_adc_count; ++i) {
|
||||
pending_adc_raw.push_back(adc_raw_buffer[i]);
|
||||
}
|
||||
}
|
||||
for (uint32_t i = 0; i < din_count; ++i) {
|
||||
pending_din.push_back(din_buffer[i]);
|
||||
}
|
||||
|
||||
if ((pending_adc.size() > 1000000U) ||
|
||||
(pending_din.size() > 1000000U) ||
|
||||
(tty_writer && (pending_adc_raw.size() > 1000000U))) {
|
||||
(pending_din.size() > 1000000U)) {
|
||||
fail("Internal backlog grew too large while aligning ADC and DIN samples");
|
||||
}
|
||||
|
||||
while (!pending_adc.empty() &&
|
||||
!pending_din.empty() &&
|
||||
(!tty_writer || !pending_adc_raw.empty()) &&
|
||||
!stop_loop_requested) {
|
||||
const double adc_value = pending_adc.front();
|
||||
pending_adc.pop_front();
|
||||
const double adc_raw_value = tty_writer ? pending_adc_raw.front() : 0.0;
|
||||
if (tty_writer) {
|
||||
pending_adc_raw.pop_front();
|
||||
}
|
||||
|
||||
const uint32_t din_value = pending_din.front();
|
||||
pending_din.pop_front();
|
||||
@ -1697,11 +1697,6 @@ int run(const Config& cfg) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tty_writer && di1_changed) {
|
||||
emit_tty_step();
|
||||
tty_step.clear_step();
|
||||
}
|
||||
|
||||
const uint32_t lch = next_lch;
|
||||
next_lch = (next_lch + 1U) % cfg.channel_count;
|
||||
|
||||
@ -1720,9 +1715,6 @@ int run(const Config& cfg) {
|
||||
|
||||
if (current_packet.channels[lch].size() < target_frames) {
|
||||
current_packet.channels[lch].push_back(stored_value);
|
||||
if (tty_writer) {
|
||||
tty_step.add_sample(lch, adc_raw_value);
|
||||
}
|
||||
++current_packet.stored_samples;
|
||||
++total_stored_adc_samples;
|
||||
++stats_stored_adc_samples;
|
||||
@ -1767,6 +1759,13 @@ int run(const Config& cfg) {
|
||||
print_stats(false);
|
||||
}
|
||||
|
||||
TtyProtocolWriter::StatsSnapshot tty_final_stats {};
|
||||
if (tty_writer) {
|
||||
tty_writer->shutdown();
|
||||
tty_writer->throw_if_failed();
|
||||
tty_final_stats = tty_writer->stats();
|
||||
}
|
||||
|
||||
const std::vector<CapturePacket> svg_packets(packets.begin(), packets.end());
|
||||
writer.write(svg_packets, actual_frame_freq_hz, range_to_volts(cfg.range));
|
||||
|
||||
@ -1811,6 +1810,11 @@ int run(const Config& cfg) {
|
||||
} else if (cfg.di1_mode == Di1Mode::Trace) {
|
||||
std::cout << ", DI1 trace=enabled";
|
||||
}
|
||||
if (tty_writer) {
|
||||
std::cout << ", tty_frames_written=" << tty_final_stats.frames_written
|
||||
<< ", tty_frames_dropped=" << tty_final_stats.frames_dropped
|
||||
<< ", tty_ring_overflows=" << tty_final_stats.ring_overflows;
|
||||
}
|
||||
std::cout << "\n"
|
||||
<< "Final SVG packets retained in memory: " << svg_packets.size() << "\n"
|
||||
<< "CSV: " << cfg.csv_path << "\n"
|
||||
|
||||
Reference in New Issue
Block a user