From afc595bc51562e1ee671660d96f06a7da4a13464 Mon Sep 17 00:00:00 2001 From: awe Date: Fri, 20 Feb 2026 16:44:26 +0300 Subject: [PATCH] initial commit --- CMakeLists.txt | 27 +++ include/adc_sweep/device_discovery.hpp | 11 ++ include/adc_sweep/serial_port.hpp | 24 +++ include/adc_sweep/shm_stack.hpp | 82 ++++++++ include/adc_sweep/sweep_core.hpp | 71 +++++++ src/device_discovery.cpp | 93 +++++++++ src/main.cpp | 261 +++++++++++++++++++++++++ src/serial_port.cpp | 100 ++++++++++ src/shm_stack.cpp | 209 ++++++++++++++++++++ src/sweep_core.cpp | 259 ++++++++++++++++++++++++ tests/test_sweep_core.cpp | 237 ++++++++++++++++++++++ 11 files changed, 1374 insertions(+) create mode 100644 CMakeLists.txt create mode 100644 include/adc_sweep/device_discovery.hpp create mode 100644 include/adc_sweep/serial_port.hpp create mode 100644 include/adc_sweep/shm_stack.hpp create mode 100644 include/adc_sweep/sweep_core.hpp create mode 100644 src/device_discovery.cpp create mode 100644 src/main.cpp create mode 100644 src/serial_port.cpp create mode 100644 src/shm_stack.cpp create mode 100644 src/sweep_core.cpp create mode 100644 tests/test_sweep_core.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..0fcd105 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,27 @@ +cmake_minimum_required(VERSION 3.16) + +project(adc_sweep_shm_writer LANGUAGES CXX) + +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_EXTENSIONS OFF) + +add_library(adc_sweep_core + src/sweep_core.cpp + src/shm_stack.cpp + src/device_discovery.cpp + src/serial_port.cpp +) + +target_include_directories(adc_sweep_core PUBLIC include) + +target_compile_options(adc_sweep_core PRIVATE -Wall -Wextra -Wpedantic) + +add_executable(adc_sweep_shm_writer src/main.cpp) +target_link_libraries(adc_sweep_shm_writer PRIVATE adc_sweep_core) + +add_executable(adc_sweep_shm_tests tests/test_sweep_core.cpp) +target_link_libraries(adc_sweep_shm_tests PRIVATE adc_sweep_core) + +enable_testing() +add_test(NAME adc_sweep_shm_tests COMMAND adc_sweep_shm_tests) diff --git a/include/adc_sweep/device_discovery.hpp b/include/adc_sweep/device_discovery.hpp new file mode 100644 index 0000000..edd25c9 --- /dev/null +++ b/include/adc_sweep/device_discovery.hpp @@ -0,0 +1,11 @@ +#pragma once + +#include +#include +#include + +namespace adc_sweep { + +std::optional find_tty_by_vid_pid(uint16_t vid, uint16_t pid); + +} // namespace adc_sweep diff --git a/include/adc_sweep/serial_port.hpp b/include/adc_sweep/serial_port.hpp new file mode 100644 index 0000000..530bcc9 --- /dev/null +++ b/include/adc_sweep/serial_port.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include +#include +#include + +namespace adc_sweep { + +class SerialPort { +public: + SerialPort() = default; + ~SerialPort(); + + bool open_raw(const std::string& path, int baud, std::string& err); + void close(); + bool is_open() const { return fd_ >= 0; } + + ssize_t read_some(uint8_t* buf, size_t cap, std::string& err); + +private: + int fd_ = -1; +}; + +} // namespace adc_sweep diff --git a/include/adc_sweep/shm_stack.hpp b/include/adc_sweep/shm_stack.hpp new file mode 100644 index 0000000..e84a270 --- /dev/null +++ b/include/adc_sweep/shm_stack.hpp @@ -0,0 +1,82 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "adc_sweep/sweep_core.hpp" + +namespace adc_sweep { + +constexpr uint32_t kShmMagic = 0x53575041; // "AWPS" +constexpr uint32_t kShmVersion = 1; + +struct alignas(64) ShmHeader { + uint32_t magic; + uint32_t version; + uint32_t capacity; + uint32_t sweep_width; + uint32_t slot_stride; + uint32_t reserved0; + uint64_t producer_seq; + std::atomic write_seq; + std::atomic latest_slot; + uint32_t reserved1; + uint8_t reserved2[64 - 48]; +}; +static_assert(sizeof(ShmHeader) == 64, "ShmHeader must stay fixed size"); + +struct SweepSlotHeader { + std::atomic seq; + uint64_t ts_mono_ns; + uint32_t sweep_idx; + int32_t ch_primary; + uint32_t ch_mask; + uint32_t reserved; + float n_valid; + float min; + float max; + float mean; + float std; + float dt_ms; +}; + +class ShmSweepStackWriter { +public: + ShmSweepStackWriter(std::string shm_name, uint32_t capacity, uint32_t sweep_width); + ~ShmSweepStackWriter(); + + bool open_or_create(std::string& err); + void close(); + bool publish(const SweepResult& result, std::string& err); + + uint32_t capacity() const { return capacity_; } + uint32_t sweep_width() const { return sweep_width_; } + +private: + SweepSlotHeader* slot_header(uint32_t slot_idx); + float* slot_sweep_data(uint32_t slot_idx); + + std::string shm_name_; + uint32_t capacity_; + uint32_t sweep_width_; + uint32_t slot_stride_; + int shm_fd_ = -1; + void* mapped_ = nullptr; + size_t mapped_size_ = 0; + ShmHeader* header_ = nullptr; + uint64_t next_seq_ = 1; +}; + +struct LatestSnapshot { + uint64_t seq = 0; + SweepMeta meta; + std::vector sweep; +}; + +std::optional read_latest_snapshot(void* mapped, uint32_t capacity, uint32_t sweep_width, uint32_t slot_stride); + +} // namespace adc_sweep diff --git a/include/adc_sweep/sweep_core.hpp b/include/adc_sweep/sweep_core.hpp new file mode 100644 index 0000000..9816021 --- /dev/null +++ b/include/adc_sweep/sweep_core.hpp @@ -0,0 +1,71 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace adc_sweep { + +struct SweepMeta { + uint32_t sweep_idx = 0; + int32_t ch_primary = 0; + uint32_t ch_mask = 0; + float n_valid = 0.0f; + float min = 0.0f; + float max = 0.0f; + float mean = 0.0f; + float std = 0.0f; + float dt_ms = 0.0f; + uint64_t ts_mono_ns = 0; +}; + +struct SweepResult { + std::vector sweep; + SweepMeta meta; +}; + +int32_t u32_to_i32(uint32_t v); + +class SweepFinalizer { +public: + SweepFinalizer(uint32_t sweep_width, float invert_threshold, bool fancy_fill); + + std::optional finalize( + const std::vector& xs, + const std::vector& ys, + uint32_t ch_mask, + int32_t ch_primary); + +private: + uint32_t sweep_width_; + float invert_threshold_; + bool fancy_fill_; + uint32_t max_width_seen_ = 0; + uint32_t sweep_idx_ = 0; + std::optional last_sweep_ts_; + std::deque> n_valid_hist_; +}; + +class BinarySweepParser { +public: + using SweepCallback = std::function&, const std::vector&, uint32_t, int32_t)>; + + void feed(const uint8_t* data, size_t size, const SweepCallback& on_sweep); + void flush(const SweepCallback& on_sweep); + +private: + void emit_current(const SweepCallback& on_sweep); + + std::vector xs_; + std::vector ys_; + bool has_cur_channel_ = false; + int32_t cur_channel_ = 0; + uint32_t ch_mask_ = 0; + std::deque words_; + std::optional odd_byte_; +}; + +} // namespace adc_sweep diff --git a/src/device_discovery.cpp b/src/device_discovery.cpp new file mode 100644 index 0000000..0654416 --- /dev/null +++ b/src/device_discovery.cpp @@ -0,0 +1,93 @@ +#include "adc_sweep/device_discovery.hpp" + +#include +#include +#include +#include +#include +#include + +namespace adc_sweep { +namespace fs = std::filesystem; + +namespace { + +std::string trim(std::string s) { + while (!s.empty() && (s.back() == '\n' || s.back() == '\r' || s.back() == ' ' || s.back() == '\t')) { + s.pop_back(); + } + size_t i = 0; + while (i < s.size() && (s[i] == ' ' || s[i] == '\t')) { + ++i; + } + return s.substr(i); +} + +std::string read_file_trim(const fs::path& p) { + std::ifstream f(p); + if (!f.is_open()) { + return {}; + } + std::ostringstream ss; + ss << f.rdbuf(); + return trim(ss.str()); +} + +std::string hex4(uint16_t v) { + constexpr char kHex[] = "0123456789abcdef"; + std::string out(4, '0'); + out[0] = kHex[(v >> 12U) & 0xFU]; + out[1] = kHex[(v >> 8U) & 0xFU]; + out[2] = kHex[(v >> 4U) & 0xFU]; + out[3] = kHex[v & 0xFU]; + return out; +} + +bool tty_name_ok(const std::string& name) { + return (name.rfind("ttyACM", 0) == 0) || (name.rfind("ttyUSB", 0) == 0); +} + +} // namespace + +std::optional find_tty_by_vid_pid(uint16_t vid, uint16_t pid) { + const fs::path usb_root("/sys/bus/usb/devices"); + if (!fs::exists(usb_root)) { + return std::nullopt; + } + + const std::string vid_s = hex4(vid); + const std::string pid_s = hex4(pid); + std::vector candidates; + + for (const auto& entry : fs::directory_iterator(usb_root)) { + if (!entry.is_directory()) { + continue; + } + const fs::path devdir = entry.path(); + const std::string v = read_file_trim(devdir / "idVendor"); + const std::string p = read_file_trim(devdir / "idProduct"); + if (v != vid_s || p != pid_s) { + continue; + } + + for (const auto& sub : fs::recursive_directory_iterator(devdir, fs::directory_options::skip_permission_denied)) { + const std::string name = sub.path().filename().string(); + if (!tty_name_ok(name)) { + continue; + } + const std::string devnode = "/dev/" + name; + if (fs::exists(devnode)) { + candidates.push_back(devnode); + } + } + } + + if (candidates.empty()) { + return std::nullopt; + } + std::sort(candidates.begin(), candidates.end()); + candidates.erase(std::unique(candidates.begin(), candidates.end()), candidates.end()); + return candidates.front(); +} + +} // namespace adc_sweep diff --git a/src/main.cpp b/src/main.cpp new file mode 100644 index 0000000..340eeb5 --- /dev/null +++ b/src/main.cpp @@ -0,0 +1,261 @@ +#include "adc_sweep/device_discovery.hpp" +#include "adc_sweep/serial_port.hpp" +#include "adc_sweep/shm_stack.hpp" +#include "adc_sweep/sweep_core.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { + +std::atomic g_stop{false}; + +void on_signal(int) { + g_stop.store(true, std::memory_order_relaxed); +} + +struct Config { + uint16_t vid = 0; + uint16_t pid = 0; + int baud = 115200; + std::string shm_name = "/adc_sweeps"; + uint32_t capacity = 256; + uint32_t sweep_width = 1000; + float invert_threshold = 10.0f; + bool fancy_fill = false; + int reconnect_ms = 500; +}; + +void print_usage(const char* argv0) { + std::cerr + << "Usage: " << argv0 << " --vid --pid [options]\n" + << "Options:\n" + << " --baud default 115200\n" + << " --shm-name default /adc_sweeps\n" + << " --capacity default 256\n" + << " --sweep-width default 1000\n" + << " --invert-threshold default 10.0\n" + << " --fancy-fill <0|1> default 0\n" + << " --reconnect-ms default 500\n"; +} + +bool parse_u16(const std::string& s, uint16_t& out) { + char* end = nullptr; + errno = 0; + const unsigned long v = std::strtoul(s.c_str(), &end, 0); + if (errno != 0 || end == s.c_str() || *end != '\0' || v > 0xFFFFUL) { + return false; + } + out = static_cast(v); + return true; +} + +bool parse_u32(const std::string& s, uint32_t& out) { + char* end = nullptr; + errno = 0; + const unsigned long v = std::strtoul(s.c_str(), &end, 0); + if (errno != 0 || end == s.c_str() || *end != '\0' || v > 0xFFFFFFFFUL) { + return false; + } + out = static_cast(v); + return true; +} + +bool parse_int(const std::string& s, int& out) { + char* end = nullptr; + errno = 0; + const long v = std::strtol(s.c_str(), &end, 0); + if (errno != 0 || end == s.c_str() || *end != '\0') { + return false; + } + out = static_cast(v); + return true; +} + +bool parse_float(const std::string& s, float& out) { + char* end = nullptr; + errno = 0; + const float v = std::strtof(s.c_str(), &end); + if (errno != 0 || end == s.c_str() || *end != '\0') { + return false; + } + out = v; + return true; +} + +bool parse_args(int argc, char** argv, Config& cfg) { + bool has_vid = false; + bool has_pid = false; + + for (int i = 1; i < argc; ++i) { + const std::string a = argv[i]; + auto require_value = [&](const char* key) -> std::optional { + if (i + 1 >= argc) { + std::cerr << "Missing value for " << key << "\n"; + return std::nullopt; + } + return std::string(argv[++i]); + }; + + if (a == "--vid") { + auto v = require_value("--vid"); + if (!v || !parse_u16(*v, cfg.vid)) { + return false; + } + has_vid = true; + } else if (a == "--pid") { + auto v = require_value("--pid"); + if (!v || !parse_u16(*v, cfg.pid)) { + return false; + } + has_pid = true; + } else if (a == "--baud") { + auto v = require_value("--baud"); + if (!v || !parse_int(*v, cfg.baud)) { + return false; + } + } else if (a == "--shm-name") { + auto v = require_value("--shm-name"); + if (!v) { + return false; + } + cfg.shm_name = *v; + } else if (a == "--capacity") { + auto v = require_value("--capacity"); + if (!v || !parse_u32(*v, cfg.capacity)) { + return false; + } + } else if (a == "--sweep-width") { + auto v = require_value("--sweep-width"); + if (!v || !parse_u32(*v, cfg.sweep_width)) { + return false; + } + } else if (a == "--invert-threshold") { + auto v = require_value("--invert-threshold"); + if (!v || !parse_float(*v, cfg.invert_threshold)) { + return false; + } + } else if (a == "--fancy-fill") { + auto v = require_value("--fancy-fill"); + if (!v) { + return false; + } + if (*v == "1") { + cfg.fancy_fill = true; + } else if (*v == "0") { + cfg.fancy_fill = false; + } else { + return false; + } + } else if (a == "--reconnect-ms") { + auto v = require_value("--reconnect-ms"); + if (!v || !parse_int(*v, cfg.reconnect_ms)) { + return false; + } + } else if (a == "--help" || a == "-h") { + return false; + } else { + std::cerr << "Unknown argument: " << a << "\n"; + return false; + } + } + + return has_vid && has_pid; +} + +} // namespace + +int main(int argc, char** argv) { + Config cfg; + if (!parse_args(argc, argv, cfg)) { + print_usage(argv[0]); + return 2; + } + + std::signal(SIGINT, on_signal); + std::signal(SIGTERM, on_signal); + + adc_sweep::ShmSweepStackWriter writer(cfg.shm_name, cfg.capacity, cfg.sweep_width); + { + std::string err; + if (!writer.open_or_create(err)) { + std::cerr << "[error] failed to open shared memory: " << err << "\n"; + return 1; + } + } + + adc_sweep::SweepFinalizer finalizer(cfg.sweep_width, cfg.invert_threshold, cfg.fancy_fill); + + while (!g_stop.load(std::memory_order_relaxed)) { + const auto tty = adc_sweep::find_tty_by_vid_pid(cfg.vid, cfg.pid); + if (!tty.has_value()) { + std::this_thread::sleep_for(std::chrono::milliseconds(cfg.reconnect_ms)); + continue; + } + + adc_sweep::SerialPort port; + { + std::string err; + if (!port.open_raw(*tty, cfg.baud, err)) { + std::cerr << "[warn] open " << *tty << " failed: " << err << "\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(cfg.reconnect_ms)); + continue; + } + } + std::cerr << "[info] reading from " << *tty << "\n"; + + adc_sweep::BinarySweepParser parser; + uint8_t buf[65536]; + + while (!g_stop.load(std::memory_order_relaxed) && port.is_open()) { + std::string err; + const ssize_t n = port.read_some(buf, sizeof(buf), err); + if (n > 0) { + parser.feed(buf, static_cast(n), [&](const std::vector& xs, + const std::vector& ys, + uint32_t ch_mask, + int32_t ch_primary) { + auto out = finalizer.finalize(xs, ys, ch_mask, ch_primary); + if (!out.has_value()) { + return; + } + std::string perr; + if (!writer.publish(*out, perr)) { + std::cerr << "[warn] shm publish failed: " << perr << "\n"; + } + }); + } else if (n == 0) { + std::this_thread::sleep_for(std::chrono::microseconds(500)); + } else { + std::cerr << "[warn] serial read error on " << *tty << ": " << err << "\n"; + break; + } + } + + parser.flush([&](const std::vector& xs, + const std::vector& ys, + uint32_t ch_mask, + int32_t ch_primary) { + auto out = finalizer.finalize(xs, ys, ch_mask, ch_primary); + if (!out.has_value()) { + return; + } + std::string perr; + (void)writer.publish(*out, perr); + }); + + port.close(); + std::this_thread::sleep_for(std::chrono::milliseconds(cfg.reconnect_ms)); + } + + return 0; +} diff --git a/src/serial_port.cpp b/src/serial_port.cpp new file mode 100644 index 0000000..104689a --- /dev/null +++ b/src/serial_port.cpp @@ -0,0 +1,100 @@ +#include "adc_sweep/serial_port.hpp" + +#include +#include +#include +#include +#include + +namespace adc_sweep { + +namespace { + +speed_t map_baud(int baud) { + switch (baud) { + case 9600: + return B9600; + case 19200: + return B19200; + case 38400: + return B38400; + case 57600: + return B57600; + case 115200: + return B115200; +#ifdef B230400 + case 230400: + return B230400; +#endif +#ifdef B460800 + case 460800: + return B460800; +#endif + default: + return B115200; + } +} + +} // namespace + +SerialPort::~SerialPort() { + close(); +} + +bool SerialPort::open_raw(const std::string& path, int baud, std::string& err) { + close(); + + fd_ = ::open(path.c_str(), O_RDONLY | O_NOCTTY | O_NONBLOCK); + if (fd_ < 0) { + err = std::string("open failed: ") + std::strerror(errno); + return false; + } + + termios tio{}; + if (::tcgetattr(fd_, &tio) != 0) { + err = std::string("tcgetattr failed: ") + std::strerror(errno); + close(); + return false; + } + + ::cfmakeraw(&tio); + const speed_t b = map_baud(baud); + if (::cfsetispeed(&tio, b) != 0 || ::cfsetospeed(&tio, b) != 0) { + err = std::string("cfset*speed failed: ") + std::strerror(errno); + close(); + return false; + } + + tio.c_cc[VMIN] = 0; + tio.c_cc[VTIME] = 0; + + if (::tcsetattr(fd_, TCSANOW, &tio) != 0) { + err = std::string("tcsetattr failed: ") + std::strerror(errno); + close(); + return false; + } + + return true; +} + +void SerialPort::close() { + if (fd_ >= 0) { + ::close(fd_); + fd_ = -1; + } +} + +ssize_t SerialPort::read_some(uint8_t* buf, size_t cap, std::string& err) { + if (fd_ < 0) { + err = "serial port is closed"; + return -1; + } + const ssize_t n = ::read(fd_, buf, cap); + if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + err = std::string("read failed: ") + std::strerror(errno); + return -1; + } + return n; +} + +} // namespace adc_sweep diff --git a/src/shm_stack.cpp b/src/shm_stack.cpp new file mode 100644 index 0000000..47849eb --- /dev/null +++ b/src/shm_stack.cpp @@ -0,0 +1,209 @@ +#include "adc_sweep/shm_stack.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace adc_sweep { + +namespace { + +constexpr uint32_t align_up(uint32_t value, uint32_t align) { + return (value + align - 1U) & ~(align - 1U); +} + +size_t calc_size(uint32_t capacity, uint32_t slot_stride) { + return sizeof(ShmHeader) + static_cast(capacity) * static_cast(slot_stride); +} + +} // namespace + +ShmSweepStackWriter::ShmSweepStackWriter(std::string shm_name, uint32_t capacity, uint32_t sweep_width) + : shm_name_(std::move(shm_name)), capacity_(capacity), sweep_width_(sweep_width) { + const uint32_t slot_payload = static_cast(sizeof(SweepSlotHeader) + static_cast(sweep_width_) * sizeof(float)); + slot_stride_ = align_up(slot_payload, 64); +} + +ShmSweepStackWriter::~ShmSweepStackWriter() { + close(); +} + +bool ShmSweepStackWriter::open_or_create(std::string& err) { + close(); + + if (shm_name_.empty() || shm_name_[0] != '/') { + err = "shm name must start with '/'"; + return false; + } + if (capacity_ == 0 || sweep_width_ == 0) { + err = "capacity and sweep_width must be > 0"; + return false; + } + + shm_fd_ = ::shm_open(shm_name_.c_str(), O_CREAT | O_RDWR, 0666); + if (shm_fd_ < 0) { + err = std::string("shm_open failed: ") + std::strerror(errno); + return false; + } + + mapped_size_ = calc_size(capacity_, slot_stride_); + if (::ftruncate(shm_fd_, static_cast(mapped_size_)) != 0) { + err = std::string("ftruncate failed: ") + std::strerror(errno); + close(); + return false; + } + + mapped_ = ::mmap(nullptr, mapped_size_, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0); + if (mapped_ == MAP_FAILED) { + mapped_ = nullptr; + err = std::string("mmap failed: ") + std::strerror(errno); + close(); + return false; + } + + header_ = reinterpret_cast(mapped_); + + const bool needs_init = !(header_->magic == kShmMagic && + header_->version == kShmVersion && + header_->capacity == capacity_ && + header_->sweep_width == sweep_width_ && + header_->slot_stride == slot_stride_); + + if (needs_init) { + std::memset(mapped_, 0, mapped_size_); + header_->magic = kShmMagic; + header_->version = kShmVersion; + header_->capacity = capacity_; + header_->sweep_width = sweep_width_; + header_->slot_stride = slot_stride_; + header_->producer_seq = 0; + new (&header_->write_seq) std::atomic(0); + new (&header_->latest_slot) std::atomic(0); + + for (uint32_t i = 0; i < capacity_; ++i) { + auto* sh = slot_header(i); + new (&sh->seq) std::atomic(0); + sh->ts_mono_ns = 0; + } + next_seq_ = 1; + } else { + next_seq_ = header_->write_seq.load(std::memory_order_acquire) + 1; + if (next_seq_ == 0) { + next_seq_ = 1; + } + } + + return true; +} + +void ShmSweepStackWriter::close() { + if (mapped_ != nullptr) { + ::munmap(mapped_, mapped_size_); + mapped_ = nullptr; + } + mapped_size_ = 0; + header_ = nullptr; + if (shm_fd_ >= 0) { + ::close(shm_fd_); + shm_fd_ = -1; + } +} + +SweepSlotHeader* ShmSweepStackWriter::slot_header(uint32_t slot_idx) { + auto* base = static_cast(mapped_) + sizeof(ShmHeader) + static_cast(slot_idx) * slot_stride_; + return reinterpret_cast(base); +} + +float* ShmSweepStackWriter::slot_sweep_data(uint32_t slot_idx) { + auto* base = reinterpret_cast(slot_header(slot_idx)); + return reinterpret_cast(base + sizeof(SweepSlotHeader)); +} + +bool ShmSweepStackWriter::publish(const SweepResult& result, std::string& err) { + if (header_ == nullptr) { + err = "shared memory is not opened"; + return false; + } + + const uint64_t seq = next_seq_++; + const uint32_t slot_idx = static_cast(seq % static_cast(capacity_)); + auto* sh = slot_header(slot_idx); + float* dst = slot_sweep_data(slot_idx); + + sh->seq.store(0, std::memory_order_relaxed); + + sh->ts_mono_ns = result.meta.ts_mono_ns; + sh->sweep_idx = result.meta.sweep_idx; + sh->ch_primary = result.meta.ch_primary; + sh->ch_mask = result.meta.ch_mask; + sh->reserved = 0; + sh->n_valid = result.meta.n_valid; + sh->min = result.meta.min; + sh->max = result.meta.max; + sh->mean = result.meta.mean; + sh->std = result.meta.std; + sh->dt_ms = result.meta.dt_ms; + + const float nan = std::numeric_limits::quiet_NaN(); + const size_t n = std::min(result.sweep.size(), sweep_width_); + for (size_t i = 0; i < n; ++i) { + dst[i] = result.sweep[i]; + } + for (size_t i = n; i < sweep_width_; ++i) { + dst[i] = nan; + } + + std::atomic_thread_fence(std::memory_order_release); + sh->seq.store(seq, std::memory_order_release); + header_->latest_slot.store(slot_idx, std::memory_order_release); + header_->write_seq.store(seq, std::memory_order_release); + header_->producer_seq = seq; + return true; +} + +std::optional read_latest_snapshot(void* mapped, uint32_t capacity, uint32_t sweep_width, uint32_t slot_stride) { + if (mapped == nullptr || capacity == 0 || sweep_width == 0) { + return std::nullopt; + } + auto* hdr = reinterpret_cast(mapped); + const uint32_t slot_idx = hdr->latest_slot.load(std::memory_order_acquire); + if (slot_idx >= capacity) { + return std::nullopt; + } + + auto* base = static_cast(mapped) + sizeof(ShmHeader) + static_cast(slot_idx) * slot_stride; + auto* sh = reinterpret_cast(base); + float* src = reinterpret_cast(base + sizeof(SweepSlotHeader)); + + const uint64_t seq1 = sh->seq.load(std::memory_order_acquire); + if (seq1 == 0) { + return std::nullopt; + } + + LatestSnapshot snap; + snap.seq = seq1; + snap.meta.ts_mono_ns = sh->ts_mono_ns; + snap.meta.sweep_idx = sh->sweep_idx; + snap.meta.ch_primary = sh->ch_primary; + snap.meta.ch_mask = sh->ch_mask; + snap.meta.n_valid = sh->n_valid; + snap.meta.min = sh->min; + snap.meta.max = sh->max; + snap.meta.mean = sh->mean; + snap.meta.std = sh->std; + snap.meta.dt_ms = sh->dt_ms; + snap.sweep.assign(src, src + sweep_width); + + const uint64_t seq2 = sh->seq.load(std::memory_order_acquire); + if (seq1 != seq2 || seq2 == 0) { + return std::nullopt; + } + return snap; +} + +} // namespace adc_sweep diff --git a/src/sweep_core.cpp b/src/sweep_core.cpp new file mode 100644 index 0000000..1c2feca --- /dev/null +++ b/src/sweep_core.cpp @@ -0,0 +1,259 @@ +#include "adc_sweep/sweep_core.hpp" + +#include +#include +#include + +namespace adc_sweep { + +int32_t u32_to_i32(uint32_t v) { + return (v & 0x80000000U) ? static_cast(static_cast(v) - 0x100000000LL) : static_cast(v); +} + +SweepFinalizer::SweepFinalizer(uint32_t sweep_width, float invert_threshold, bool fancy_fill) + : sweep_width_(sweep_width), invert_threshold_(invert_threshold), fancy_fill_(fancy_fill) {} + +std::optional SweepFinalizer::finalize( + const std::vector& xs, + const std::vector& ys, + uint32_t ch_mask, + int32_t ch_primary) { + if (xs.empty() || ys.empty() || sweep_width_ == 0) { + return std::nullopt; + } + + const int max_x = *std::max_element(xs.begin(), xs.end()); + if (max_x < 0) { + return std::nullopt; + } + + const uint32_t width = static_cast(max_x + 1); + max_width_seen_ = std::max(max_width_seen_, width); + const uint32_t target_width = std::min(fancy_fill_ ? max_width_seen_ : width, sweep_width_); + if (target_width == 0) { + return std::nullopt; + } + + const float nan = std::numeric_limits::quiet_NaN(); + std::vector sweep(target_width, nan); + for (size_t i = 0; i < xs.size() && i < ys.size(); ++i) { + const int x = xs[i]; + if (x < 0 || static_cast(x) >= target_width) { + continue; + } + sweep[static_cast(x)] = static_cast(ys[i]); + } + + int n_valid_cur = 0; + for (float v : sweep) { + if (std::isfinite(v)) { + ++n_valid_cur; + } + } + + if (fancy_fill_) { + std::vector known_idx; + known_idx.reserve(sweep.size()); + for (size_t i = 0; i < sweep.size(); ++i) { + if (std::isfinite(sweep[i])) { + known_idx.push_back(i); + } + } + if (!known_idx.empty()) { + for (size_t k = 0; k + 1 < known_idx.size(); ++k) { + const size_t i0 = known_idx[k]; + const size_t i1 = known_idx[k + 1]; + if (i1 > i0 + 1) { + const float avg = 0.5F * (sweep[i0] + sweep[i1]); + for (size_t i = i0 + 1; i < i1; ++i) { + sweep[i] = avg; + } + } + } + const size_t first = known_idx.front(); + const size_t last = known_idx.back(); + for (size_t i = 0; i < first; ++i) { + sweep[i] = sweep[first]; + } + for (size_t i = last + 1; i < sweep.size(); ++i) { + sweep[i] = sweep[last]; + } + } + } + + double sum = 0.0; + int cnt = 0; + for (float v : sweep) { + if (!std::isfinite(v)) { + continue; + } + sum += static_cast(v); + ++cnt; + } + const double mean_pre = (cnt > 0) ? (sum / static_cast(cnt)) : std::numeric_limits::quiet_NaN(); + if (std::isfinite(mean_pre) && mean_pre < static_cast(invert_threshold_)) { + for (float& v : sweep) { + if (std::isfinite(v)) { + v = -v; + } + } + } + + float min_v = nan; + float max_v = nan; + float mean_v = nan; + float std_v = nan; + + double s1 = 0.0; + double s2 = 0.0; + int n = 0; + for (float v : sweep) { + if (!std::isfinite(v)) { + continue; + } + if (n == 0) { + min_v = max_v = v; + } else { + min_v = std::min(min_v, v); + max_v = std::max(max_v, v); + } + s1 += static_cast(v); + s2 += static_cast(v) * static_cast(v); + ++n; + } + if (n > 0) { + mean_v = static_cast(s1 / static_cast(n)); + const double var = std::max(0.0, (s2 / static_cast(n)) - (static_cast(mean_v) * static_cast(mean_v))); + std_v = static_cast(std::sqrt(var)); + } + + const auto now = std::chrono::steady_clock::now(); + float dt_ms = std::numeric_limits::quiet_NaN(); + if (last_sweep_ts_.has_value()) { + const auto dt = std::chrono::duration_cast(now - *last_sweep_ts_).count(); + dt_ms = static_cast(static_cast(dt) / 1000.0); + } + last_sweep_ts_ = now; + + n_valid_hist_.emplace_back(now, n_valid_cur); + while (!n_valid_hist_.empty()) { + const auto age = std::chrono::duration_cast(now - n_valid_hist_.front().first).count(); + if (age <= 1000) { + break; + } + n_valid_hist_.pop_front(); + } + + double valid_avg = 0.0; + if (!n_valid_hist_.empty()) { + int acc = 0; + for (const auto& item : n_valid_hist_) { + acc += item.second; + } + valid_avg = static_cast(acc) / static_cast(n_valid_hist_.size()); + } else { + valid_avg = static_cast(n_valid_cur); + } + + SweepResult out; + out.sweep = std::move(sweep); + out.meta.sweep_idx = ++sweep_idx_; + out.meta.ch_primary = ch_primary; + out.meta.ch_mask = ch_mask; + out.meta.n_valid = static_cast(valid_avg); + out.meta.min = min_v; + out.meta.max = max_v; + out.meta.mean = mean_v; + out.meta.std = std_v; + out.meta.dt_ms = dt_ms; + out.meta.ts_mono_ns = static_cast( + std::chrono::duration_cast(now.time_since_epoch()).count()); + return out; +} + +void BinarySweepParser::emit_current(const SweepCallback& on_sweep) { + if (!xs_.empty()) { + const int32_t ch_primary = has_cur_channel_ ? cur_channel_ : 0; + const uint32_t ch_mask = (ch_mask_ == 0 && !has_cur_channel_) ? 0U : ch_mask_; + on_sweep(xs_, ys_, ch_mask, ch_primary); + } +} + +void BinarySweepParser::feed(const uint8_t* data, size_t size, const SweepCallback& on_sweep) { + size_t i = 0; + if (odd_byte_.has_value() && size > 0) { + const uint16_t w = static_cast(*odd_byte_) | (static_cast(data[0]) << 8U); + words_.push_back(w); + odd_byte_.reset(); + i = 1; + } + + for (; i + 1 < size; i += 2) { + const uint16_t w = static_cast(data[i]) | (static_cast(data[i + 1]) << 8U); + words_.push_back(w); + } + if (i < size) { + odd_byte_ = data[i]; + } + + while (words_.size() >= 4) { + const uint16_t w0 = words_[0]; + const uint16_t w1 = words_[1]; + const uint16_t w2 = words_[2]; + const uint16_t w3 = words_[3]; + + if (w0 == 0xFFFFU && w1 == 0xFFFFU && w2 == 0xFFFFU && (w3 & 0x00FFU) == 0x000AU) { + emit_current(on_sweep); + xs_.clear(); + ys_.clear(); + ch_mask_ = 0; + has_cur_channel_ = true; + cur_channel_ = static_cast((w3 >> 8U) & 0x00FFU); + ch_mask_ |= (cur_channel_ >= 0 && cur_channel_ < 32) ? (1U << static_cast(cur_channel_)) : 0U; + for (int j = 0; j < 4; ++j) { + words_.pop_front(); + } + continue; + } + + if (w0 == 0xFFFFU && w1 == 0xFFFFU && w3 == 0x0A0AU) { + emit_current(on_sweep); + xs_.clear(); + ys_.clear(); + ch_mask_ = 0; + has_cur_channel_ = true; + cur_channel_ = static_cast(w2); + ch_mask_ |= (cur_channel_ >= 0 && cur_channel_ < 32) ? (1U << static_cast(cur_channel_)) : 0U; + for (int j = 0; j < 4; ++j) { + words_.pop_front(); + } + continue; + } + + if (w3 == 0x000AU) { + if (has_cur_channel_ && cur_channel_ >= 0 && cur_channel_ < 32) { + ch_mask_ |= (1U << static_cast(cur_channel_)); + } + xs_.push_back(static_cast(w0)); + const uint32_t value_u32 = (static_cast(w1) << 16U) | static_cast(w2); + ys_.push_back(u32_to_i32(value_u32)); + for (int j = 0; j < 4; ++j) { + words_.pop_front(); + } + continue; + } + + words_.pop_front(); + } +} + +void BinarySweepParser::flush(const SweepCallback& on_sweep) { + emit_current(on_sweep); + xs_.clear(); + ys_.clear(); + ch_mask_ = 0; + has_cur_channel_ = false; + cur_channel_ = 0; +} + +} // namespace adc_sweep diff --git a/tests/test_sweep_core.cpp b/tests/test_sweep_core.cpp new file mode 100644 index 0000000..34f1c82 --- /dev/null +++ b/tests/test_sweep_core.cpp @@ -0,0 +1,237 @@ +#include "adc_sweep/shm_stack.hpp" +#include "adc_sweep/sweep_core.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using adc_sweep::BinarySweepParser; +using adc_sweep::ShmHeader; +using adc_sweep::ShmSweepStackWriter; +using adc_sweep::SweepFinalizer; + +namespace { + +int g_fail = 0; + +void expect(bool cond, const std::string& msg) { + if (!cond) { + ++g_fail; + std::cerr << "[FAIL] " << msg << "\n"; + } +} + +void test_u32_to_i32() { + expect(adc_sweep::u32_to_i32(0x00000000U) == 0, "u32_to_i32 zero"); + expect(adc_sweep::u32_to_i32(0x7FFFFFFFU) == 2147483647, "u32_to_i32 max pos"); + expect(adc_sweep::u32_to_i32(0x80000000U) == static_cast(0x80000000U), "u32_to_i32 min neg"); + expect(adc_sweep::u32_to_i32(0xFFFFFFFFU) == -1, "u32_to_i32 -1"); +} + +void append_word(std::vector& out, uint16_t w) { + out.push_back(static_cast(w & 0xFFU)); + out.push_back(static_cast((w >> 8U) & 0xFFU)); +} + +void test_parser_modes_and_resync() { + BinarySweepParser p; + int emitted = 0; + + std::vector bytes; + append_word(bytes, 0x1234); // garbage + append_word(bytes, 0x5678); // garbage + append_word(bytes, 0xFFFF); + append_word(bytes, 0xFFFF); + append_word(bytes, 0xFFFF); + append_word(bytes, static_cast((2U << 8U) | 0x0AU)); + + append_word(bytes, 10); // step + append_word(bytes, 0x0000); // hi + append_word(bytes, 0x0001); // lo => 1 + append_word(bytes, 0x000A); + + append_word(bytes, 11); + append_word(bytes, 0xFFFF); + append_word(bytes, 0xFFFE); // -2 + append_word(bytes, 0x000A); + + // legacy start emits previous sweep + append_word(bytes, 0xFFFF); + append_word(bytes, 0xFFFF); + append_word(bytes, 3); + append_word(bytes, 0x0A0A); + + p.feed(bytes.data(), bytes.size(), [&](const std::vector& xs, + const std::vector& ys, + uint32_t ch_mask, + int32_t ch_primary) { + ++emitted; + expect(xs.size() == 2, "parser emitted 2 points"); + expect(ys.size() == 2, "parser emitted 2 values"); + expect(xs[0] == 10 && ys[0] == 1, "parser point0"); + expect(xs[1] == 11 && ys[1] == -2, "parser point1"); + expect(ch_primary == 2, "parser primary ch"); + expect((ch_mask & (1U << 2U)) != 0, "parser ch mask"); + }); + + p.flush([&](const std::vector& xs, + const std::vector&, + uint32_t, + int32_t ch_primary) { + ++emitted; + expect(xs.empty(), "legacy sweep after start has no points in this stream"); + expect(ch_primary == 3, "legacy primary ch"); + }); + + // flush doesn't emit empty sweep by implementation, so emitted should be 1 + expect(emitted == 1, "parser emitted exactly one completed sweep"); +} + +void test_finalize_stats_and_fill() { + SweepFinalizer f(8, 10.0F, false); + std::vector xs{0, 2, 4}; + std::vector ys{1, 3, 5}; + auto out = f.finalize(xs, ys, (1U << 2U), 2); + expect(out.has_value(), "finalize returns sweep"); + expect(out->sweep.size() == 5, "width=max_x+1 when no fancy"); + expect(std::isnan(out->sweep[1]), "gap is NaN without fancy"); + expect(out->meta.ch_primary == 2, "meta ch primary"); + + SweepFinalizer ff(8, 10.0F, true); + auto out2 = ff.finalize(xs, ys, (1U << 2U), 2); + expect(out2.has_value(), "fancy finalize returns sweep"); + expect(!std::isnan(out2->sweep[1]), "gap filled in fancy"); + + SweepFinalizer fi(8, 10.0F, false); + std::vector x2{0, 1}; + std::vector y2{1, 2}; + auto out3 = fi.finalize(x2, y2, 0, 0); + expect(out3.has_value(), "invert finalize"); + expect(out3->sweep[0] < 0 && out3->sweep[1] < 0, "inversion applied when mean < threshold"); +} + +void test_replay_acm9_integration() { + int fd = -1; + const char* candidates[] = {"acm_9", "../acm_9", "../../acm_9"}; + for (const char* p : candidates) { + fd = ::open(p, O_RDONLY); + if (fd >= 0) { + break; + } + } + expect(fd >= 0, "acm_9 exists for integration test"); + if (fd < 0) { + return; + } + std::vector data(1 << 16); + BinarySweepParser p; + SweepFinalizer f(1000, 10.0F, false); + int n_sweeps = 0; + + while (true) { + const ssize_t n = ::read(fd, data.data(), data.size()); + if (n <= 0) { + break; + } + p.feed(data.data(), static_cast(n), [&](const std::vector& xs, + const std::vector& ys, + uint32_t ch_mask, + int32_t ch_primary) { + auto out = f.finalize(xs, ys, ch_mask, ch_primary); + if (out.has_value()) { + ++n_sweeps; + expect(!out->sweep.empty(), "replay sweep non-empty"); + } + }); + } + p.flush([&](const std::vector& xs, + const std::vector& ys, + uint32_t ch_mask, + int32_t ch_primary) { + auto out = f.finalize(xs, ys, ch_mask, ch_primary); + if (out.has_value()) { + ++n_sweeps; + } + }); + ::close(fd); + + expect(n_sweeps > 0, "replay acm_9 produced sweeps"); +} + +void test_shm_publish_and_read() { + const std::string shm_name = "/adc_sweep_test_stack"; + { + ShmSweepStackWriter w(shm_name, 16, 32); + std::string err; + if (!w.open_or_create(err)) { + if (err.find("Permission denied") != std::string::npos) { + std::cerr << "[SKIP] shm_open is not permitted in this environment\n"; + return; + } + expect(false, "shm open/create"); + return; + } + + for (int i = 0; i < 128; ++i) { + adc_sweep::SweepResult s; + s.sweep.assign(32, static_cast(i)); + s.meta.sweep_idx = static_cast(i + 1); + s.meta.ch_primary = 2; + s.meta.ch_mask = (1U << 2U); + s.meta.n_valid = 32.0F; + s.meta.min = static_cast(i); + s.meta.max = static_cast(i); + s.meta.mean = static_cast(i); + s.meta.std = 0.0F; + s.meta.dt_ms = 1.0F; + s.meta.ts_mono_ns = static_cast(i); + expect(w.publish(s, err), "shm publish"); + } + } + + const int fd = ::shm_open(shm_name.c_str(), O_RDONLY, 0); + expect(fd >= 0, "shm open readonly"); + if (fd < 0) { + return; + } + const size_t map_size = sizeof(ShmHeader) + 16 * ((sizeof(adc_sweep::SweepSlotHeader) + 32 * sizeof(float) + 63) & ~63U); + void* mapped = ::mmap(nullptr, map_size, PROT_READ, MAP_SHARED, fd, 0); + expect(mapped != MAP_FAILED, "mmap readonly"); + if (mapped != MAP_FAILED) { + auto snap = adc_sweep::read_latest_snapshot(mapped, 16, 32, + static_cast((sizeof(adc_sweep::SweepSlotHeader) + 32 * sizeof(float) + 63) & ~63U)); + expect(snap.has_value(), "snapshot exists"); + if (snap.has_value()) { + expect(snap->seq >= 128, "snapshot seq latest"); + expect(!snap->sweep.empty(), "snapshot sweep present"); + } + ::munmap(mapped, map_size); + } + ::close(fd); + ::shm_unlink(shm_name.c_str()); +} + +} // namespace + +int main() { + test_u32_to_i32(); + test_parser_modes_and_resync(); + test_finalize_stats_and_fill(); + test_replay_acm9_integration(); + test_shm_publish_and_read(); + + if (g_fail == 0) { + std::cout << "All tests passed\n"; + return 0; + } + std::cerr << g_fail << " test(s) failed\n"; + return 1; +}