#include "tty_protocol_writer.h"

#include <algorithm>
#include <array>
#include <cerrno>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

#ifdef _WIN32

// Windows build: tty output is unavailable. The constructor always throws and
// every other member is an inert stub so the rest of the project still links.
struct TtyProtocolWriter::Impl {};

TtyProtocolWriter::TtyProtocolWriter(std::string path, std::size_t ring_capacity_bytes)
    : path_(std::move(path)) {
    (void) ring_capacity_bytes;
    throw std::runtime_error("tty output is supported only on Linux/POSIX");
}

TtyProtocolWriter::~TtyProtocolWriter() = default;

void TtyProtocolWriter::emit_packet_start(uint16_t marker) {
    (void) marker;
}

void TtyProtocolWriter::emit_step(uint16_t index, int16_t ch1_avg, int16_t ch2_avg) {
    (void) index;
    (void) ch1_avg;
    (void) ch2_avg;
}

void TtyProtocolWriter::enqueue_encoded_frames(const uint16_t* words, std::size_t frame_count) {
    (void) words;
    (void) frame_count;
}

TtyProtocolWriter::StatsSnapshot TtyProtocolWriter::stats() const {
    return {};
}

const std::string& TtyProtocolWriter::path() const {
    return path_;
}

void TtyProtocolWriter::throw_if_failed() const {}

void TtyProtocolWriter::shutdown() {}

void TtyProtocolWriter::enqueue_frame(uint16_t word0, uint16_t word1, uint16_t word2, uint16_t word3) {
    (void) word0;
    (void) word1;
    (void) word2;
    (void) word3;
}

void TtyProtocolWriter::worker_loop() {}

#else

#include <condition_variable>
#include <exception>
#include <mutex>
#include <optional>
#include <sstream>
#include <thread>
#include <vector>

#include <fcntl.h>
#include <limits.h>
#include <pty.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <termios.h>
#include <unistd.h>

namespace {

// One protocol frame is four 16-bit words.
constexpr std::size_t kFrameWordCount = 4U;
constexpr std::size_t kFrameByteCount = kFrameWordCount * sizeof(uint16_t);

// Buffer size used for PTY slave names and symlink targets.
#ifdef PATH_MAX
constexpr std::size_t kPathBufferSize = PATH_MAX;
#else
constexpr std::size_t kPathBufferSize = 4096U;
#endif

// A frame as the raw bytes handed to ::write().
using EncodedFrame = std::array<std::uint8_t, kFrameByteCount>;

// Formats "<action> '<path>': <strerror(error_code)>" for an explicit error code.
std::string io_error(const std::string& action, const std::string& path, int error_code) {
    std::ostringstream out;
    out << action << " '" << path << "': " << std::strerror(error_code);
    return out.str();
}

// Same as above, using the calling thread's current errno. errno is read as the
// very first step so that the stream work done while formatting cannot clobber it.
std::string io_error(const std::string& action, const std::string& path) {
    return io_error(action, path, errno);
}

// Closes fd when it is a valid descriptor and resets it to -1; otherwise a no-op.
void close_fd_if_open(int& fd) noexcept {
    if (fd >= 0) {
        ::close(fd);
        fd = -1;
    }
}

// Puts the terminal behind fd into raw mode and disables every special control
// character, so that no byte of a binary frame is interpreted by the line discipline.
// Throws std::runtime_error when the attributes cannot be read or applied.
void set_fd_raw(int fd) {
    struct termios tio {};
    if (::tcgetattr(fd, &tio) != 0) {
        throw std::runtime_error(io_error("Cannot read tty attributes for", std::to_string(fd)));
    }

    ::cfmakeraw(&tio);
    tio.c_cc[VINTR] = _POSIX_VDISABLE;
    tio.c_cc[VQUIT] = _POSIX_VDISABLE;
    tio.c_cc[VERASE] = _POSIX_VDISABLE;
    tio.c_cc[VKILL] = _POSIX_VDISABLE;
    tio.c_cc[VEOF] = _POSIX_VDISABLE;
    tio.c_cc[VTIME] = 0;
    tio.c_cc[VMIN] = 1;
#ifdef VSWTC
    tio.c_cc[VSWTC] = _POSIX_VDISABLE;
#endif
    tio.c_cc[VSTART] = _POSIX_VDISABLE;
    tio.c_cc[VSTOP] = _POSIX_VDISABLE;
    tio.c_cc[VSUSP] = _POSIX_VDISABLE;
#ifdef VEOL
    tio.c_cc[VEOL] = _POSIX_VDISABLE;
#endif
#ifdef VREPRINT
    tio.c_cc[VREPRINT] = _POSIX_VDISABLE;
#endif
#ifdef VDISCARD
    tio.c_cc[VDISCARD] = _POSIX_VDISABLE;
#endif
#ifdef VWERASE
    tio.c_cc[VWERASE] = _POSIX_VDISABLE;
#endif
#ifdef VLNEXT
    tio.c_cc[VLNEXT] = _POSIX_VDISABLE;
#endif
#ifdef VEOL2
    tio.c_cc[VEOL2] = _POSIX_VDISABLE;
#endif

    if (::tcsetattr(fd, TCSANOW, &tio) != 0) {
        throw std::runtime_error(io_error("Cannot apply raw tty attributes to", std::to_string(fd)));
    }
}

// Returns true when path exists and is a character device, false when it does
// not exist or is some other file type. Throws std::runtime_error on any other
// stat() failure.
bool is_character_device_path(const std::string& path) {
    struct stat st {};
    if (::stat(path.c_str(), &st) != 0) {
        if (errno == ENOENT) {
            return false;
        }
        throw std::runtime_error(io_error("Cannot stat tty output", path));
    }
    return S_ISCHR(st.st_mode);
}

// Returns the target of the symlink at path, or std::nullopt when path is not a
// symlink (EINVAL) or does not exist (ENOENT). Throws std::runtime_error on any
// other readlink() failure.
std::optional<std::string> read_link_target(const std::string& path) {
    std::array<char, kPathBufferSize> buf {};
    const ssize_t len = ::readlink(path.c_str(), buf.data(), buf.size() - 1U);
    if (len < 0) {
        if (errno == EINVAL || errno == ENOENT) {
            return std::nullopt;
        }
        throw std::runtime_error(io_error("Cannot read symlink", path));
    }
    // readlink() does not NUL-terminate; build the string from the reported length.
    return std::string(buf.data(), static_cast<std::size_t>(len));
}

// Removes the symlink at path, but only while it still points at slave_path, so
// a link that somebody else re-created in the meantime is left alone. Best effort:
// every failure is swallowed because this runs on cleanup paths.
void remove_link_if_ours(const std::string& path, const std::string& slave_path) noexcept {
    try {
        const auto target = read_link_target(path);
        if (target && (*target == slave_path)) {
            ::unlink(path.c_str());
        }
    } catch (...) {
        // Nothing useful can be done about a failed cleanup.
    }
}

// Packs four words into the byte image of one frame. The words are copied with
// memcpy, i.e. in the host's native byte order.
EncodedFrame encode_frame(uint16_t word0, uint16_t word1, uint16_t word2, uint16_t word3) {
    const uint16_t words[kFrameWordCount] = {word0, word1, word2, word3};
    static_assert(sizeof(words) == kFrameByteCount, "frame byte image must match the word array");
    EncodedFrame frame {};
    std::memcpy(frame.data(), words, sizeof(words));
    return frame;
}

} // namespace

// Private state: the output descriptor(s), a fixed-capacity ring of encoded
// frames, and the worker thread that drains the ring. head/size/ring,
// stop_requested, failure and stats are accessed under `mutex`.
struct TtyProtocolWriter::Impl {
    // The ring always holds at least one frame, even for a tiny byte budget.
    explicit Impl(std::size_t ring_capacity_bytes)
        : capacity_frames(std::max<std::size_t>(1U, ring_capacity_bytes / kFrameByteCount)),
          ring(capacity_frames) {}

    int fd = -1;              // descriptor frames are written to (device, or PTY master)
    int slave_fd = -1;        // PTY slave, kept open for the lifetime of the bridge
    std::string slave_path;   // name of the PTY slave the symlink points at
    bool owns_link = false;   // true once this object created the symlink at path_

    const std::size_t capacity_frames;
    std::vector<EncodedFrame> ring;
    std::size_t head = 0;     // index of the oldest queued frame
    std::size_t size = 0;     // number of queued frames

    mutable std::mutex mutex;
    std::condition_variable data_ready_cv;
    std::thread worker;
    bool stop_requested = false;
    std::exception_ptr failure;   // first fatal write error, rethrown to producers
    StatsSnapshot stats;
};

// Opens the output and starts the writer thread.
//
// When `path` is an existing character device it is opened for writing directly.
// Otherwise a PTY pair is created, its slave is switched to raw mode, and `path`
// is (re)created as a symlink to the slave; frames are then written to the master.
//
// Throws std::runtime_error when any of these steps fails; anything std::thread
// throws while starting the worker propagates after cleanup.
TtyProtocolWriter::TtyProtocolWriter(std::string path, std::size_t ring_capacity_bytes)
    : path_(std::move(path)), impl_(std::make_unique<Impl>(ring_capacity_bytes)) {
    if (is_character_device_path(path_)) {
        impl_->fd = ::open(path_.c_str(), O_WRONLY | O_NOCTTY);
        if (impl_->fd < 0) {
            throw std::runtime_error(io_error("Cannot open tty output", path_));
        }
    } else {
        std::array<char, kPathBufferSize> slave_name {};
        if (::openpty(&impl_->fd, &impl_->slave_fd, slave_name.data(), nullptr, nullptr) != 0) {
            throw std::runtime_error(io_error("Cannot create PTY bridge for", path_));
        }
        try {
            impl_->slave_path = slave_name.data();
            set_fd_raw(impl_->slave_fd);

            struct stat st {};
            if (::lstat(path_.c_str(), &st) == 0) {
                // NOTE(review): regular files are also replaced here although the
                // message only mentions links - confirm that this is intended.
                if (!S_ISLNK(st.st_mode) && !S_ISREG(st.st_mode)) {
                    throw std::runtime_error("Refusing to replace non-link path '" + path_ + "'");
                }
                if (::unlink(path_.c_str()) != 0) {
                    throw std::runtime_error(io_error("Cannot remove existing tty link", path_));
                }
            } else if (errno != ENOENT) {
                throw std::runtime_error(io_error("Cannot inspect tty link path", path_));
            }

            if (::symlink(impl_->slave_path.c_str(), path_.c_str()) != 0) {
                throw std::runtime_error(io_error("Cannot create tty symlink", path_));
            }
            impl_->owns_link = true;
        } catch (...) {
            close_fd_if_open(impl_->slave_fd);
            close_fd_if_open(impl_->fd);
            throw;
        }
    }

    // The destructor does not run when the constructor throws, so a failure to
    // start the thread must release the descriptors and the symlink here.
    try {
        impl_->worker = std::thread([this]() { worker_loop(); });
    } catch (...) {
        if (impl_->owns_link) {
            remove_link_if_ours(path_, impl_->slave_path);
            impl_->owns_link = false;
        }
        close_fd_if_open(impl_->slave_fd);
        close_fd_if_open(impl_->fd);
        throw;
    }
}

// Stops the worker, removes the symlink if this object created it and it still
// points at our PTY slave, and closes both descriptors. Never throws.
TtyProtocolWriter::~TtyProtocolWriter() {
    try {
        shutdown();
    } catch (...) {
    }

    if (!impl_) {
        return;
    }

    if (impl_->owns_link && !path_.empty()) {
        remove_link_if_ours(path_, impl_->slave_path);
        impl_->owns_link = false;
    }
    close_fd_if_open(impl_->slave_fd);
    close_fd_if_open(impl_->fd);
}

// Queues a packet-start frame: the marker followed by three 0xFFFF words.
void TtyProtocolWriter::emit_packet_start(uint16_t marker) {
    enqueue_frame(marker, 0xFFFF, 0xFFFF, 0xFFFF);
}

// Queues one step frame: a profile-dependent tag word, the step index and the
// two channel averages reinterpreted as unsigned 16-bit words.
void TtyProtocolWriter::emit_step(uint16_t index, int16_t ch1_avg, int16_t ch2_avg) {
    // NOTE(review): `cfg` and `CaptureProfile` are not defined in this file;
    // presumably they come from the project header - confirm.
    enqueue_frame(
        (cfg.profile == CaptureProfile::Amplitude) ? 0x001AU : 0x000AU,
        index,
        static_cast<uint16_t>(ch1_avg),
        static_cast<uint16_t>(ch2_avg));
}

// Copies `frame_count` frames (4 words each, read from `words`) into the ring
// and wakes the worker.
//
// The ring never blocks the producer: when there is not enough room the oldest
// queued frames are discarded, and when the batch alone exceeds the capacity
// only its newest `capacity_frames` frames are kept. Discarded frames are
// counted in the statistics.
//
// A null pointer or a zero count is a no-op. Rethrows the worker's stored
// failure, and throws std::runtime_error once shutdown() has been requested.
void TtyProtocolWriter::enqueue_encoded_frames(const uint16_t* words, std::size_t frame_count) {
    if ((frame_count == 0U) || (words == nullptr)) {
        return;
    }

    std::lock_guard<std::mutex> lock(impl_->mutex);
    if (impl_->failure) {
        std::rethrow_exception(impl_->failure);
    }
    if (impl_->stop_requested) {
        throw std::runtime_error("tty writer is already shut down");
    }

    std::size_t start_frame = 0;
    std::size_t frames_to_copy = frame_count;
    std::size_t dropped_frames = 0;

    if (frame_count >= impl_->capacity_frames) {
        // The batch fills the whole ring: everything queued so far and the
        // oldest part of the batch are lost.
        start_frame = frame_count - impl_->capacity_frames;
        frames_to_copy = impl_->capacity_frames;
        dropped_frames = impl_->size + start_frame;
        impl_->head = 0;
        impl_->size = 0;
    } else {
        const std::size_t available_frames = impl_->capacity_frames - impl_->size;
        if (frame_count > available_frames) {
            // Make room by advancing past the oldest queued frames.
            dropped_frames = frame_count - available_frames;
            impl_->head = (impl_->head + dropped_frames) % impl_->capacity_frames;
            impl_->size -= dropped_frames;
        }
    }

    if (dropped_frames != 0U) {
        impl_->stats.frames_dropped +=
            static_cast<decltype(impl_->stats.frames_dropped)>(dropped_frames);
        ++impl_->stats.ring_overflows;
    }

    for (std::size_t i = 0; i < frames_to_copy; ++i) {
        const std::size_t src_index = (start_frame + i) * kFrameWordCount;
        const std::size_t dst_index = (impl_->head + impl_->size) % impl_->capacity_frames;
        impl_->ring[dst_index] = encode_frame(words[src_index + 0U],
                                              words[src_index + 1U],
                                              words[src_index + 2U],
                                              words[src_index + 3U]);
        ++impl_->size;
    }

    impl_->data_ready_cv.notify_one();
}

// Returns a copy of the counters taken under the lock (empty without an Impl).
TtyProtocolWriter::StatsSnapshot TtyProtocolWriter::stats() const {
    if (!impl_) {
        return {};
    }
    std::lock_guard<std::mutex> lock(impl_->mutex);
    return impl_->stats;
}

// Returns the path given to the constructor.
const std::string& TtyProtocolWriter::path() const {
    return path_;
}

// Rethrows the failure recorded by the worker thread, if there is one.
void TtyProtocolWriter::throw_if_failed() const {
    if (!impl_) {
        return;
    }

    std::exception_ptr failure;
    {
        std::lock_guard<std::mutex> lock(impl_->mutex);
        failure = impl_->failure;
    }
    if (failure) {
        std::rethrow_exception(failure);
    }
}

// Asks the worker to stop, waits for it, then closes the output descriptor.
// Frames still queued at that point are not written. Safe to call repeatedly.
void TtyProtocolWriter::shutdown() {
    if (!impl_) {
        return;
    }

    {
        std::lock_guard<std::mutex> lock(impl_->mutex);
        impl_->stop_requested = true;
    }
    impl_->data_ready_cv.notify_all();

    if (impl_->worker.joinable()) {
        impl_->worker.join();
    }

    // Close only after the worker is gone: it uses this descriptor without
    // holding the mutex, and a descriptor number closed underneath a running
    // write() may be reused by an unrelated open().
    close_fd_if_open(impl_->fd);
}

// Queues a single frame built from four words.
void TtyProtocolWriter::enqueue_frame(uint16_t word0, uint16_t word1, uint16_t word2, uint16_t word3) {
    const uint16_t words[kFrameWordCount] = {word0, word1, word2, word3};
    enqueue_encoded_frames(words, 1U);
}

// Worker thread body: pops one frame at a time and writes it out completely.
//
// Exits when a stop is requested or a failure has been recorded. EINTR is
// retried immediately; EAGAIN/EWOULDBLOCK/EIO are retried after a 5 ms pause
// unless a stop was requested. Any other write error, or a zero-byte write,
// is stored as the failure that producers will see.
void TtyProtocolWriter::worker_loop() {
    // Stable for the whole lifetime of the thread: it is set before the thread
    // starts and closed only after the thread has been joined.
    const int out_fd = impl_->fd;

    // Records a fatal error (unless we are stopping anyway) and wakes waiters.
    const auto record_failure = [this](const std::string& message) {
        std::lock_guard<std::mutex> lock(impl_->mutex);
        if (!impl_->stop_requested) {
            impl_->failure = std::make_exception_ptr(std::runtime_error(message));
        }
        impl_->data_ready_cv.notify_all();
    };

    for (;;) {
        EncodedFrame frame {};
        {
            std::unique_lock<std::mutex> lock(impl_->mutex);
            impl_->data_ready_cv.wait(lock, [this]() {
                return impl_->stop_requested || impl_->failure || (impl_->size != 0U);
            });
            if (impl_->failure || impl_->stop_requested) {
                return;
            }
            frame = impl_->ring[impl_->head];
            impl_->head = (impl_->head + 1U) % impl_->capacity_frames;
            --impl_->size;
        }

        // Write outside the lock so producers are never blocked by slow output.
        const std::uint8_t* bytes = frame.data();
        std::size_t remaining = frame.size();
        while (remaining != 0U) {
            const ssize_t written = ::write(out_fd, bytes, remaining);
            if (written < 0) {
                // Save errno before locking or formatting can disturb it.
                const int write_errno = errno;
                if (write_errno == EINTR) {
                    continue;
                }
                if ((write_errno == EAGAIN) || (write_errno == EWOULDBLOCK) || (write_errno == EIO)) {
                    {
                        std::lock_guard<std::mutex> lock(impl_->mutex);
                        if (impl_->stop_requested) {
                            return;
                        }
                    }
                    std::this_thread::sleep_for(std::chrono::milliseconds(5));
                    continue;
                }
                record_failure(io_error("Cannot write tty frame to", path_, write_errno));
                return;
            }
            if (written == 0) {
                record_failure("tty write returned 0 bytes for '" + path_ + "'");
                return;
            }
            bytes += static_cast<std::size_t>(written);
            remaining -= static_cast<std::size_t>(written);
        }

        {
            std::lock_guard<std::mutex> lock(impl_->mutex);
            ++impl_->stats.frames_written;
        }
    }
}

#endif