From af462ab46aa4925ddb9810b1c7d07a6fec349a1d Mon Sep 17 00:00:00 2001 From: awe Date: Thu, 9 Apr 2026 18:27:59 +0300 Subject: [PATCH] new cyclic writer --- main.cpp | 166 ++++++++-------- tty_protocol_writer.cpp | 418 ++++++++++++++++++++++++++++------------ tty_protocol_writer.h | 37 ++-- 3 files changed, 402 insertions(+), 219 deletions(-) diff --git a/main.cpp b/main.cpp index 30e3e87..e9d1b97 100644 --- a/main.cpp +++ b/main.cpp @@ -412,7 +412,7 @@ void print_help(const char* exe_name) { << " buffer_words:8388608 -> input stream buffer size in 32-bit words\n" << " step_words:32768 -> input stream transfer step in 32-bit words\n" << " live_html/live_json -> live graph files updated as packets arrive\n" - << " tty:/tmp/ttyADC_data -> write binary step frames to a Linux/POSIX tty or PTY link path\n" + << " tty:/tmp/ttyADC_data -> write a continuous legacy 4-word CH1/CH2 stream to a Linux/POSIX tty or PTY link path\n" << " If sample_clock_hz is omitted together with clock:internal, the maximum ADC speed is used\n" << "\n" << "Differential physical channel mapping:\n" @@ -990,38 +990,10 @@ struct PacketAccumulator { } }; -struct TtyStepAccumulator { - double sum_ch1 = 0.0; - double sum_ch2 = 0.0; - uint32_t count_ch1 = 0; - uint32_t count_ch2 = 0; - uint32_t next_step_index = 1; - - void clear_step() { - sum_ch1 = 0.0; - sum_ch2 = 0.0; - count_ch1 = 0; - count_ch2 = 0; - } - - void reset_packet() { - clear_step(); - next_step_index = 1; - } - - void add_sample(uint32_t lch, double raw_code) { - if (lch == 0U) { - sum_ch1 += raw_code; - ++count_ch1; - } else if (lch == 1U) { - sum_ch2 += raw_code; - ++count_ch2; - } - } - - bool has_complete_step() const { - return (count_ch1 != 0U) && (count_ch2 != 0U); - } +struct TtyContinuousState { + bool pending_ch1_valid = false; + int16_t pending_ch1 = 0; + uint16_t next_index = 1; }; int16_t pack_raw_code_to_int16(double avg_raw_code) { @@ -1278,16 +1250,29 @@ int run(const Config& cfg) { << " ADC range: +/-" << range_to_volts(cfg.range) << " V\n" << " max frames per packet per channel: " << target_frames << "\n"; + const uint32_t read_capacity_words = std::max(cfg.recv_block_words, cfg.input_step_words); + std::size_t tty_ring_capacity_bytes = 0; std::unique_ptr tty_writer; if (cfg.tty_path) { - tty_writer = std::make_unique(*cfg.tty_path); + const uint64_t typical_tty_batch_frames = (static_cast(read_capacity_words) + 1U) / 2U; + const uint64_t typical_tty_batch_bytes = typical_tty_batch_frames * sizeof(uint16_t) * 4U; + const uint64_t required_tty_ring_bytes = typical_tty_batch_bytes * 16U; + const uint64_t tty_ring_bytes_u64 = std::max(1024U * 1024U, required_tty_ring_bytes); + if (tty_ring_bytes_u64 > static_cast(std::numeric_limits::max())) { + fail("TTY ring buffer size overflowed size_t"); + } + tty_ring_capacity_bytes = static_cast(tty_ring_bytes_u64); + tty_writer = std::make_unique(*cfg.tty_path, tty_ring_capacity_bytes); + tty_writer->emit_packet_start(); } CaptureFileWriter writer(cfg.csv_path, cfg.svg_path, cfg.live_html_path, cfg.live_json_path); writer.initialize_live_plot(); writer.initialize_csv(cfg.channel_count, cfg.di1_mode == Di1Mode::Trace); std::cout << " live plot html: " << writer.live_html_path() << "\n" << " live plot data: " << writer.live_json_path() << "\n" - << " tty step output: " << (cfg.tty_path ? *cfg.tty_path : std::string("disabled")) << "\n" + << " tty stream output: " << (cfg.tty_path ? *cfg.tty_path : std::string("disabled")) << "\n" + << " tty ring buffer bytes: " + << (cfg.tty_path ? std::to_string(tty_ring_capacity_bytes) : std::string("disabled")) << "\n" << " recv block words: " << cfg.recv_block_words << "\n" << " input step words: " << cfg.input_step_words << "\n" << " input buffer words: " << cfg.input_buffer_words << "\n" @@ -1303,17 +1288,17 @@ int run(const Config& cfg) { expect_ok(api, api.StreamsStart(device.hnd), "Start streams"); device.streams_started = true; - const uint32_t read_capacity_words = std::max(cfg.recv_block_words, cfg.input_step_words); std::vector raw(read_capacity_words); std::vector adc_buffer(read_capacity_words); std::vector adc_raw_buffer(read_capacity_words); std::vector din_buffer(read_capacity_words); std::deque pending_adc; - std::deque pending_adc_raw; std::deque pending_din; std::deque packets; PacketAccumulator current_packet; - TtyStepAccumulator tty_step; + TtyContinuousState tty_state; + std::vector tty_frame_words; + tty_frame_words.reserve(static_cast(read_capacity_words) * 2U); current_packet.reset(target_frames, cfg.channel_count); std::size_t csv_global_frame_index = 0; @@ -1346,6 +1331,8 @@ int run(const Config& cfg) { uint64_t stats_zeroed_samples = 0; uint64_t stats_completed_frames = 0; uint64_t stats_completed_packets = 0; + TtyProtocolWriter::StatsSnapshot tty_stats_window_start {}; + bool tty_overflow_warning_printed = false; auto print_stats = [&](bool final_report) { const TickMs now = tick_count_ms(); @@ -1358,6 +1345,11 @@ int run(const Config& cfg) { const double zeroed_fraction = (stats_stored_adc_samples == 0U) ? 0.0 : (100.0 * static_cast(stats_zeroed_samples) / static_cast(stats_stored_adc_samples)); + const TtyProtocolWriter::StatsSnapshot tty_stats_now = + tty_writer ? tty_writer->stats() : TtyProtocolWriter::StatsSnapshot {}; + const uint64_t tty_frames_written = tty_stats_now.frames_written - tty_stats_window_start.frames_written; + const uint64_t tty_frames_dropped = tty_stats_now.frames_dropped - tty_stats_window_start.frames_dropped; + const uint64_t tty_ring_overflows = tty_stats_now.ring_overflows - tty_stats_window_start.ring_overflows; std::cout << std::fixed << std::setprecision(3) << (final_report ? "Final stats: " : "Online stats: ") @@ -1372,6 +1364,11 @@ int run(const Config& cfg) { } else if (cfg.di1_mode == Di1Mode::Trace) { std::cout << ", DI1 trace=enabled"; } + if (tty_writer) { + std::cout << ", tty_frames_written=" << tty_frames_written + << ", tty_frames_dropped=" << tty_frames_dropped + << ", tty_ring_overflows=" << tty_ring_overflows; + } std::cout << "\n"; if (!final_report) { @@ -1384,6 +1381,7 @@ int run(const Config& cfg) { stats_zeroed_samples = 0; stats_completed_frames = 0; stats_completed_packets = 0; + tty_stats_window_start = tty_stats_now; } }; @@ -1392,31 +1390,9 @@ int run(const Config& cfg) { return; } current_packet.reset(target_frames, cfg.channel_count); - if (tty_writer) { - tty_step.reset_packet(); - tty_writer->emit_packet_start(); - } packet_active = true; }; - auto emit_tty_step = [&]() { - if (!tty_writer || !tty_step.has_complete_step()) { - return; - } - if (tty_step.next_step_index >= 0xFFFFU) { - std::cerr << "Warning: TTY protocol step index overflow, forcing tty packet restart\n"; - tty_step.reset_packet(); - tty_writer->emit_packet_start(); - } - - const double ch1_avg = tty_step.sum_ch1 / static_cast(tty_step.count_ch1); - const double ch2_avg = tty_step.sum_ch2 / static_cast(tty_step.count_ch2); - tty_writer->emit_step(static_cast(tty_step.next_step_index), - pack_raw_code_to_int16(ch1_avg), - pack_raw_code_to_int16(ch2_avg)); - ++tty_step.next_step_index; - }; - auto finish_packet = [&](PacketCloseReason reason) { const std::size_t frames = current_packet.frame_count(cfg.channel_count); if (frames != 0U) { @@ -1499,10 +1475,12 @@ int run(const Config& cfg) { packet_active = false; current_packet.reset(target_frames, cfg.channel_count); - tty_step.clear_step(); }; while (!stop_loop_requested) { + if (tty_writer) { + tty_writer->throw_if_failed(); + } if ((cfg.packet_limit != 0U) && (total_completed_packets >= cfg.packet_limit)) { stop_loop_requested = true; break; @@ -1594,6 +1572,39 @@ int run(const Config& cfg) { if (raw_adc_count != adc_count) { fail("Raw ADC parsing returned a different sample count than voltage processing"); } + + tty_frame_words.clear(); + tty_frame_words.reserve(((static_cast(raw_adc_count) + 1U) / 2U) * 4U); + for (uint32_t i = 0; i < raw_adc_count; ++i) { + const int16_t sample = pack_raw_code_to_int16(adc_raw_buffer[i]); + if (!tty_state.pending_ch1_valid) { + tty_state.pending_ch1 = sample; + tty_state.pending_ch1_valid = true; + continue; + } + + tty_frame_words.push_back(0x000A); + tty_frame_words.push_back(tty_state.next_index); + tty_frame_words.push_back(static_cast(tty_state.pending_ch1)); + tty_frame_words.push_back(static_cast(sample)); + tty_state.pending_ch1_valid = false; + + if (tty_state.next_index >= 0xFFFEU) { + tty_state.next_index = 1U; + } else { + ++tty_state.next_index; + } + } + + if (!tty_frame_words.empty()) { + tty_writer->enqueue_encoded_frames(tty_frame_words.data(), tty_frame_words.size() / 4U); + + const auto tty_stats = tty_writer->stats(); + if (!tty_overflow_warning_printed && (tty_stats.ring_overflows != 0U)) { + std::cerr << "Warning: TTY ring buffer overflowed; dropping oldest frames to keep the stream continuous\n"; + tty_overflow_warning_printed = true; + } + } } if ((adc_count == 0U) && (din_count == 0U) && (!tty_writer || (raw_adc_count == 0U))) { @@ -1608,31 +1619,20 @@ int run(const Config& cfg) { for (uint32_t i = 0; i < adc_count; ++i) { pending_adc.push_back(adc_buffer[i]); } - if (tty_writer) { - for (uint32_t i = 0; i < raw_adc_count; ++i) { - pending_adc_raw.push_back(adc_raw_buffer[i]); - } - } for (uint32_t i = 0; i < din_count; ++i) { pending_din.push_back(din_buffer[i]); } if ((pending_adc.size() > 1000000U) || - (pending_din.size() > 1000000U) || - (tty_writer && (pending_adc_raw.size() > 1000000U))) { + (pending_din.size() > 1000000U)) { fail("Internal backlog grew too large while aligning ADC and DIN samples"); } while (!pending_adc.empty() && !pending_din.empty() && - (!tty_writer || !pending_adc_raw.empty()) && !stop_loop_requested) { const double adc_value = pending_adc.front(); pending_adc.pop_front(); - const double adc_raw_value = tty_writer ? pending_adc_raw.front() : 0.0; - if (tty_writer) { - pending_adc_raw.pop_front(); - } const uint32_t din_value = pending_din.front(); pending_din.pop_front(); @@ -1697,11 +1697,6 @@ int run(const Config& cfg) { continue; } - if (tty_writer && di1_changed) { - emit_tty_step(); - tty_step.clear_step(); - } - const uint32_t lch = next_lch; next_lch = (next_lch + 1U) % cfg.channel_count; @@ -1720,9 +1715,6 @@ int run(const Config& cfg) { if (current_packet.channels[lch].size() < target_frames) { current_packet.channels[lch].push_back(stored_value); - if (tty_writer) { - tty_step.add_sample(lch, adc_raw_value); - } ++current_packet.stored_samples; ++total_stored_adc_samples; ++stats_stored_adc_samples; @@ -1767,6 +1759,13 @@ int run(const Config& cfg) { print_stats(false); } + TtyProtocolWriter::StatsSnapshot tty_final_stats {}; + if (tty_writer) { + tty_writer->shutdown(); + tty_writer->throw_if_failed(); + tty_final_stats = tty_writer->stats(); + } + const std::vector svg_packets(packets.begin(), packets.end()); writer.write(svg_packets, actual_frame_freq_hz, range_to_volts(cfg.range)); @@ -1811,6 +1810,11 @@ int run(const Config& cfg) { } else if (cfg.di1_mode == Di1Mode::Trace) { std::cout << ", DI1 trace=enabled"; } + if (tty_writer) { + std::cout << ", tty_frames_written=" << tty_final_stats.frames_written + << ", tty_frames_dropped=" << tty_final_stats.frames_dropped + << ", tty_ring_overflows=" << tty_final_stats.ring_overflows; + } std::cout << "\n" << "Final SVG packets retained in memory: " << svg_packets.size() << "\n" << "CSV: " << cfg.csv_path << "\n" diff --git a/tty_protocol_writer.cpp b/tty_protocol_writer.cpp index cfb464f..405f319 100644 --- a/tty_protocol_writer.cpp +++ b/tty_protocol_writer.cpp @@ -1,48 +1,68 @@ #include "tty_protocol_writer.h" +#include +#include +#include +#include +#include #include +#include #include +#include #ifdef _WIN32 -TtyProtocolWriter::TtyProtocolWriter(std::string path) : path_(std::move(path)) { +struct TtyProtocolWriter::Impl {}; + +TtyProtocolWriter::TtyProtocolWriter(std::string path, std::size_t ring_capacity_bytes) + : path_(std::move(path)) { + (void) ring_capacity_bytes; throw std::runtime_error("tty output is supported only on Linux/POSIX"); } TtyProtocolWriter::~TtyProtocolWriter() = default; -TtyProtocolWriter::TtyProtocolWriter(TtyProtocolWriter&& other) noexcept = default; +void TtyProtocolWriter::emit_packet_start() {} -TtyProtocolWriter& TtyProtocolWriter::operator=(TtyProtocolWriter&& other) noexcept = default; - -void TtyProtocolWriter::emit_packet_start() const {} - -void TtyProtocolWriter::emit_step(uint16_t index, int16_t ch1_avg, int16_t ch2_avg) const { +void TtyProtocolWriter::emit_step(uint16_t index, int16_t ch1_avg, int16_t ch2_avg) { (void) index; (void) ch1_avg; (void) ch2_avg; } +void TtyProtocolWriter::enqueue_encoded_frames(const uint16_t* words, std::size_t frame_count) { + (void) words; + (void) frame_count; +} + +TtyProtocolWriter::StatsSnapshot TtyProtocolWriter::stats() const { + return {}; +} + const std::string& TtyProtocolWriter::path() const { return path_; } -void TtyProtocolWriter::write_frame(uint16_t word0, uint16_t word1, uint16_t word2, uint16_t word3) const { +void TtyProtocolWriter::throw_if_failed() const {} + +void TtyProtocolWriter::shutdown() {} + +void TtyProtocolWriter::enqueue_frame(uint16_t word0, uint16_t word1, uint16_t word2, uint16_t word3) { (void) word0; (void) word1; (void) word2; (void) word3; } -void TtyProtocolWriter::close_fd() noexcept {} +void TtyProtocolWriter::worker_loop() {} #else -#include #include -#include +#include #include #include +#include #include #include #include @@ -53,12 +73,24 @@ void TtyProtocolWriter::close_fd() noexcept {} namespace { +constexpr std::size_t kFrameWordCount = 4U; +constexpr std::size_t kFrameByteCount = kFrameWordCount * sizeof(uint16_t); + +using EncodedFrame = std::array; + std::string io_error(const std::string& action, const std::string& path) { std::ostringstream out; out << action << " '" << path << "': " << std::strerror(errno); return out.str(); } +void close_fd_if_open(int& fd) noexcept { + if (fd >= 0) { + ::close(fd); + fd = -1; + } +} + void set_fd_raw(int fd) { struct termios tio {}; if (::tcgetattr(fd, &tio) != 0) { @@ -94,146 +126,286 @@ std::optional read_link_target(const std::string& path) { return std::string(buf.data()); } +EncodedFrame encode_frame(uint16_t word0, uint16_t word1, uint16_t word2, uint16_t word3) { + const uint16_t words[kFrameWordCount] = {word0, word1, word2, word3}; + EncodedFrame frame {}; + std::memcpy(frame.data(), words, sizeof(words)); + return frame; +} + } // namespace -TtyProtocolWriter::TtyProtocolWriter(std::string path) : path_(std::move(path)) { +struct TtyProtocolWriter::Impl { + explicit Impl(std::size_t ring_capacity_bytes) + : capacity_frames(std::max(1U, ring_capacity_bytes / kFrameByteCount)), + ring(capacity_frames) {} + + int fd = -1; + int slave_fd = -1; + std::string slave_path; + bool owns_link = false; + + const std::size_t capacity_frames; + std::vector ring; + std::size_t head = 0; + std::size_t size = 0; + + mutable std::mutex mutex; + std::condition_variable data_ready_cv; + std::thread worker; + bool stop_requested = false; + std::exception_ptr failure; + StatsSnapshot stats; +}; + +TtyProtocolWriter::TtyProtocolWriter(std::string path, std::size_t ring_capacity_bytes) + : path_(std::move(path)), + impl_(std::make_unique(ring_capacity_bytes)) { if (is_character_device_path(path_)) { - fd_ = ::open(path_.c_str(), O_WRONLY | O_NOCTTY); - if (fd_ < 0) { + impl_->fd = ::open(path_.c_str(), O_WRONLY | O_NOCTTY); + if (impl_->fd < 0) { throw std::runtime_error(io_error("Cannot open tty output", path_)); } - return; - } - - std::array slave_name {}; - if (::openpty(&fd_, &slave_fd_, slave_name.data(), nullptr, nullptr) != 0) { - throw std::runtime_error(io_error("Cannot create PTY bridge for", path_)); - } - try { - slave_path_ = slave_name.data(); - set_fd_raw(slave_fd_); - - struct stat st {}; - if (::lstat(path_.c_str(), &st) == 0) { - if (!S_ISLNK(st.st_mode) && !S_ISREG(st.st_mode)) { - throw std::runtime_error("Refusing to replace non-link path '" + path_ + "'"); - } - if (::unlink(path_.c_str()) != 0) { - throw std::runtime_error(io_error("Cannot remove existing tty link", path_)); - } - } else if (errno != ENOENT) { - throw std::runtime_error(io_error("Cannot inspect tty link path", path_)); + } else { + std::array slave_name {}; + if (::openpty(&impl_->fd, &impl_->slave_fd, slave_name.data(), nullptr, nullptr) != 0) { + throw std::runtime_error(io_error("Cannot create PTY bridge for", path_)); } - if (::symlink(slave_path_.c_str(), path_.c_str()) != 0) { - throw std::runtime_error(io_error("Cannot create tty symlink", path_)); + try { + impl_->slave_path = slave_name.data(); + set_fd_raw(impl_->slave_fd); + + struct stat st {}; + if (::lstat(path_.c_str(), &st) == 0) { + if (!S_ISLNK(st.st_mode) && !S_ISREG(st.st_mode)) { + throw std::runtime_error("Refusing to replace non-link path '" + path_ + "'"); + } + if (::unlink(path_.c_str()) != 0) { + throw std::runtime_error(io_error("Cannot remove existing tty link", path_)); + } + } else if (errno != ENOENT) { + throw std::runtime_error(io_error("Cannot inspect tty link path", path_)); + } + + if (::symlink(impl_->slave_path.c_str(), path_.c_str()) != 0) { + throw std::runtime_error(io_error("Cannot create tty symlink", path_)); + } + impl_->owns_link = true; + } catch (...) { + close_fd_if_open(impl_->slave_fd); + close_fd_if_open(impl_->fd); + throw; } - } catch (...) { - close_slave_fd(); - close_fd(); - throw; } - owns_link_ = true; + + impl_->worker = std::thread([this]() { worker_loop(); }); } TtyProtocolWriter::~TtyProtocolWriter() { - remove_owned_link(); - close_slave_fd(); - close_fd(); -} - -TtyProtocolWriter::TtyProtocolWriter(TtyProtocolWriter&& other) noexcept - : path_(std::move(other.path_)), - fd_(other.fd_), - slave_fd_(other.slave_fd_), - slave_path_(std::move(other.slave_path_)), - owns_link_(other.owns_link_) { - other.fd_ = -1; - other.slave_fd_ = -1; - other.owns_link_ = false; -} - -TtyProtocolWriter& TtyProtocolWriter::operator=(TtyProtocolWriter&& other) noexcept { - if (this != &other) { - remove_owned_link(); - close_slave_fd(); - close_fd(); - path_ = std::move(other.path_); - fd_ = other.fd_; - slave_fd_ = other.slave_fd_; - slave_path_ = std::move(other.slave_path_); - owns_link_ = other.owns_link_; - other.fd_ = -1; - other.slave_fd_ = -1; - other.owns_link_ = false; + try { + shutdown(); + } catch (...) { } - return *this; + + if (!impl_) { + return; + } + + if (impl_->owns_link && !path_.empty()) { + try { + const auto target = read_link_target(path_); + if (target && (*target == impl_->slave_path)) { + ::unlink(path_.c_str()); + } + } catch (...) { + } + impl_->owns_link = false; + } + + close_fd_if_open(impl_->slave_fd); + close_fd_if_open(impl_->fd); } -void TtyProtocolWriter::emit_packet_start() const { - write_frame(0x000A, 0xFFFF, 0xFFFF, 0xFFFF); +void TtyProtocolWriter::emit_packet_start() { + enqueue_frame(0x000A, 0xFFFF, 0xFFFF, 0xFFFF); } -void TtyProtocolWriter::emit_step(uint16_t index, int16_t ch1_avg, int16_t ch2_avg) const { - write_frame(0x000A, - index, - static_cast(ch1_avg), - static_cast(ch2_avg)); +void TtyProtocolWriter::emit_step(uint16_t index, int16_t ch1_avg, int16_t ch2_avg) { + enqueue_frame(0x000A, + index, + static_cast(ch1_avg), + static_cast(ch2_avg)); +} + +void TtyProtocolWriter::enqueue_encoded_frames(const uint16_t* words, std::size_t frame_count) { + if ((frame_count == 0U) || (words == nullptr)) { + return; + } + + throw_if_failed(); + + std::lock_guard lock(impl_->mutex); + if (impl_->failure) { + std::rethrow_exception(impl_->failure); + } + if (impl_->stop_requested) { + throw std::runtime_error("tty writer is already shut down"); + } + + std::size_t start_frame = 0; + std::size_t frames_to_copy = frame_count; + std::size_t dropped_frames = 0; + + if (frame_count >= impl_->capacity_frames) { + start_frame = frame_count - impl_->capacity_frames; + frames_to_copy = impl_->capacity_frames; + dropped_frames = impl_->size + start_frame; + impl_->head = 0; + impl_->size = 0; + } else { + const std::size_t available_frames = impl_->capacity_frames - impl_->size; + if (frame_count > available_frames) { + dropped_frames = frame_count - available_frames; + impl_->head = (impl_->head + dropped_frames) % impl_->capacity_frames; + impl_->size -= dropped_frames; + } + } + + if (dropped_frames != 0U) { + impl_->stats.frames_dropped += static_cast(dropped_frames); + ++impl_->stats.ring_overflows; + } + + for (std::size_t i = 0; i < frames_to_copy; ++i) { + const std::size_t src_index = (start_frame + i) * kFrameWordCount; + const std::size_t dst_index = (impl_->head + impl_->size) % impl_->capacity_frames; + impl_->ring[dst_index] = encode_frame(words[src_index + 0U], + words[src_index + 1U], + words[src_index + 2U], + words[src_index + 3U]); + ++impl_->size; + } + + impl_->data_ready_cv.notify_one(); +} + +TtyProtocolWriter::StatsSnapshot TtyProtocolWriter::stats() const { + if (!impl_) { + return {}; + } + + std::lock_guard lock(impl_->mutex); + return impl_->stats; } const std::string& TtyProtocolWriter::path() const { return path_; } -void TtyProtocolWriter::write_frame(uint16_t word0, uint16_t word1, uint16_t word2, uint16_t word3) const { - const uint16_t frame[4] = {word0, word1, word2, word3}; - const std::uint8_t* bytes = reinterpret_cast(frame); - std::size_t remaining = sizeof(frame); - - while (remaining != 0U) { - const ssize_t written = ::write(fd_, bytes, remaining); - if (written < 0) { - if (errno == EINTR) { - continue; - } - throw std::runtime_error(io_error("Cannot write tty frame to", path_)); - } - if (written == 0) { - throw std::runtime_error("tty write returned 0 bytes for '" + path_ + "'"); - } - bytes += static_cast(written); - remaining -= static_cast(written); - } -} - -void TtyProtocolWriter::close_fd() noexcept { - if (fd_ >= 0) { - ::close(fd_); - fd_ = -1; - } -} - -void TtyProtocolWriter::close_slave_fd() noexcept { - if (slave_fd_ >= 0) { - ::close(slave_fd_); - slave_fd_ = -1; - } -} - -void TtyProtocolWriter::remove_owned_link() noexcept { - if (!owns_link_ || path_.empty()) { +void TtyProtocolWriter::throw_if_failed() const { + if (!impl_) { return; } - try { - const auto target = read_link_target(path_); - if (target && (*target == slave_path_)) { - ::unlink(path_.c_str()); - } - } catch (...) { + std::exception_ptr failure; + { + std::lock_guard lock(impl_->mutex); + failure = impl_->failure; } - owns_link_ = false; + if (failure) { + std::rethrow_exception(failure); + } +} + +void TtyProtocolWriter::shutdown() { + if (!impl_) { + return; + } + + { + std::lock_guard lock(impl_->mutex); + impl_->stop_requested = true; + } + + close_fd_if_open(impl_->fd); + impl_->data_ready_cv.notify_all(); + + if (impl_->worker.joinable()) { + impl_->worker.join(); + } +} + +void TtyProtocolWriter::enqueue_frame(uint16_t word0, uint16_t word1, uint16_t word2, uint16_t word3) { + const uint16_t words[kFrameWordCount] = {word0, word1, word2, word3}; + enqueue_encoded_frames(words, 1U); +} + +void TtyProtocolWriter::worker_loop() { + for (;;) { + EncodedFrame frame {}; + { + std::unique_lock lock(impl_->mutex); + impl_->data_ready_cv.wait(lock, [this]() { + return impl_->stop_requested || impl_->failure || (impl_->size != 0U); + }); + + if (impl_->failure || impl_->stop_requested) { + return; + } + + frame = impl_->ring[impl_->head]; + impl_->head = (impl_->head + 1U) % impl_->capacity_frames; + --impl_->size; + } + + const std::uint8_t* bytes = frame.data(); + std::size_t remaining = frame.size(); + while (remaining != 0U) { + const ssize_t written = ::write(impl_->fd, bytes, remaining); + if (written < 0) { + if (errno == EINTR) { + continue; + } + if ((errno == EAGAIN) || (errno == EWOULDBLOCK) || (errno == EIO)) { + { + std::lock_guard lock(impl_->mutex); + if (impl_->stop_requested) { + return; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + continue; + } + + std::lock_guard lock(impl_->mutex); + if (!impl_->stop_requested) { + impl_->failure = std::make_exception_ptr( + std::runtime_error(io_error("Cannot write tty frame to", path_))); + } + impl_->data_ready_cv.notify_all(); + return; + } + if (written == 0) { + std::lock_guard lock(impl_->mutex); + if (!impl_->stop_requested) { + impl_->failure = std::make_exception_ptr( + std::runtime_error("tty write returned 0 bytes for '" + path_ + "'")); + } + impl_->data_ready_cv.notify_all(); + return; + } + + bytes += static_cast(written); + remaining -= static_cast(written); + } + + { + std::lock_guard lock(impl_->mutex); + ++impl_->stats.frames_written; + } + } } #endif diff --git a/tty_protocol_writer.h b/tty_protocol_writer.h index 4f1a838..55cf616 100644 --- a/tty_protocol_writer.h +++ b/tty_protocol_writer.h @@ -1,34 +1,41 @@ #pragma once +#include #include +#include #include class TtyProtocolWriter { public: - explicit TtyProtocolWriter(std::string path); + struct StatsSnapshot { + std::uint64_t frames_written = 0; + std::uint64_t frames_dropped = 0; + std::uint64_t ring_overflows = 0; + }; + + TtyProtocolWriter(std::string path, std::size_t ring_capacity_bytes); ~TtyProtocolWriter(); TtyProtocolWriter(const TtyProtocolWriter&) = delete; TtyProtocolWriter& operator=(const TtyProtocolWriter&) = delete; - TtyProtocolWriter(TtyProtocolWriter&& other) noexcept; - TtyProtocolWriter& operator=(TtyProtocolWriter&& other) noexcept; + TtyProtocolWriter(TtyProtocolWriter&& other) noexcept = delete; + TtyProtocolWriter& operator=(TtyProtocolWriter&& other) noexcept = delete; - void emit_packet_start() const; - void emit_step(uint16_t index, int16_t ch1_avg, int16_t ch2_avg) const; + void emit_packet_start(); + void emit_step(uint16_t index, int16_t ch1_avg, int16_t ch2_avg); + void enqueue_encoded_frames(const uint16_t* words, std::size_t frame_count); + StatsSnapshot stats() const; const std::string& path() const; + void throw_if_failed() const; + void shutdown(); private: - void write_frame(uint16_t word0, uint16_t word1, uint16_t word2, uint16_t word3) const; - void close_fd() noexcept; - void close_slave_fd() noexcept; - void remove_owned_link() noexcept; + void enqueue_frame(uint16_t word0, uint16_t word1, uint16_t word2, uint16_t word3); + void worker_loop(); std::string path_; -#ifndef _WIN32 - int fd_ = -1; - int slave_fd_ = -1; - std::string slave_path_; - bool owns_link_ = false; -#endif + + struct Impl; + std::unique_ptr impl_; };