add SharedMemoryWriter support

This commit is contained in:
Maik Jurischka
2026-01-29 09:51:45 +01:00
parent 2b769bfe5e
commit 72f6bbe8df
7 changed files with 339 additions and 1 deletions

View File

@@ -44,6 +44,7 @@ add_executable(vizionStreamer
src/CameraController.cpp
src/GStreamerPipeline.cpp
src/StreamingEngine.cpp
src/SharedMemoryWriter.cpp
)
# Link libraries
@@ -51,6 +52,7 @@ target_link_libraries(vizionStreamer PRIVATE
${VIZIONSDK_LIBRARY}
${GSTREAMER_LIBRARIES}
${GSTREAMER_APP_LIBRARIES}
rt
)
# Set RPATH so the executable can find the SDK .so at runtime without needing LD_LIBRARY_PATH

View File

@@ -47,6 +47,9 @@ private:
std::string handleSetEHDRRatioMin(const std::string& value);
std::string handleSetEHDRRatioMax(const std::string& value);
std::string handleGetEHDRStatus();
std::string handleEnableSharedMemory(const std::string& name, const std::string& buffer_size);
std::string handleDisableSharedMemory();
std::string handleGetSharedMemoryStatus();
// Helper functions
static VX_IMAGE_FORMAT stringToFormat(const std::string& format);

View File

@@ -0,0 +1,62 @@
/*
* VizionStreamer - Shared Memory Writer Interface
* Copyright (c) 2025 Maik Jurischka
*
* Licensed under CC BY-NC-SA 4.0
* https://creativecommons.org/licenses/by-nc-sa/4.0/
*/
#pragma once
#include <string>
#include <cstdint>
#include <atomic>
// Shared memory header structure for lock-free synchronization
struct SharedMemoryHeader {
uint32_t magic; // Magic number for validation (0x56495A4E = "VIZN")
uint32_t width; // Frame width in pixels
uint32_t height; // Frame height in pixels
uint32_t format; // Format enum (matches VX_IMAGE_FORMAT)
uint32_t data_size; // Actual frame data size in bytes
uint64_t timestamp_ns; // Timestamp in nanoseconds
uint32_t frame_sequence; // Monotonic frame counter
std::atomic<uint32_t> write_sequence; // Lock-free synchronization counter
char format_str[16]; // Format string ("YUY2", "MJPG", etc.)
uint8_t reserved[72]; // Reserved for future expansion (padding to 128 bytes)
};
static_assert(sizeof(SharedMemoryHeader) == 128, "SharedMemoryHeader must be exactly 128 bytes");
class SharedMemoryWriter {
public:
explicit SharedMemoryWriter(std::string name);
~SharedMemoryWriter();
// Initialize shared memory region with given buffer size
bool create(size_t frame_buffer_size);
// Write frame to shared memory with lock-free protocol
bool writeFrame(const uint8_t* data, size_t size,
int width, int height,
const std::string& format,
uint64_t timestamp_ns = 0);
// Cleanup shared memory
void destroy();
// Status queries
bool isCreated() const { return shm_fd_ >= 0; }
std::string getName() const { return shm_name_; }
size_t getBufferSize() const { return buffer_size_; }
uint32_t getFrameCount() const { return frame_counter_; }
private:
SharedMemoryHeader* getHeader();
std::string shm_name_; // Shared memory object name
int shm_fd_; // File descriptor from shm_open
void* shm_ptr_; // Mapped memory pointer
size_t buffer_size_; // Total size (header + frame data)
uint32_t frame_counter_; // Frame sequence counter
};

View File

@@ -10,6 +10,7 @@
#include <vizionsdk/VizionSDK.h>
#include "vizionstreamer/GStreamerPipeline.h"
#include "vizionstreamer/SharedMemoryWriter.h"
#include <memory>
#include <thread>
#include <atomic>
@@ -29,11 +30,19 @@ public:
void setPipelineDescription(const std::string& pipeline);
// Shared memory output control
bool enableSharedMemory(const std::string& name, size_t buffer_size);
void disableSharedMemory();
[[nodiscard]] bool isSharedMemoryEnabled() const { return shmWriter_ != nullptr && shmWriter_->isCreated(); }
[[nodiscard]] std::string getSharedMemoryName() const { return shmWriter_ ? shmWriter_->getName() : ""; }
[[nodiscard]] size_t getSharedMemorySize() const { return shmWriter_ ? shmWriter_->getBufferSize() : 0; }
private:
void acquisitionLoop();
std::shared_ptr<VxCamera> camera_;
std::unique_ptr<GStreamerPipeline> gstPipeline_;
std::unique_ptr<SharedMemoryWriter> shmWriter_;
std::unique_ptr<std::thread> acquisitionThread_;
std::atomic<bool> running_;
std::mutex mutex_;

View File

@@ -94,6 +94,12 @@ std::string CameraController::processCommand(const std::string& jsonCommand) {
return handleSetEHDRRatioMax(getParam("value"));
} else if (command == "get_ehdr_status") {
return handleGetEHDRStatus();
} else if (command == "enable_shared_memory") {
return handleEnableSharedMemory(getParam("name"), getParam("buffer_size"));
} else if (command == "disable_shared_memory") {
return handleDisableSharedMemory();
} else if (command == "get_shared_memory_status") {
return handleGetSharedMemoryStatus();
} else {
return createErrorResponse("Unknown command: " + command);
}
@@ -410,3 +416,52 @@ std::string CameraController::createSuccessResponse(const std::string& message)
}
return R"({"status":"success","message":")" + message + "\"}";
}
std::string CameraController::handleEnableSharedMemory(const std::string& name, const std::string& buffer_size) {
if (streamingEngine_->isRunning()) {
return createErrorResponse("Cannot enable shared memory while streaming is active");
}
try {
// Use default name if not provided
std::string shm_name = name.empty() ? "/vizion_frame" : name;
// Use default buffer size if not provided (1080p uncompressed + header)
size_t buf_size;
if (buffer_size.empty()) {
buf_size = 1920 * 1080 * 4 + 128; // Default: 1080p RGBA + header
} else {
buf_size = std::stoull(buffer_size);
}
if (!streamingEngine_->enableSharedMemory(shm_name, buf_size)) {
return createErrorResponse("Failed to create shared memory");
}
std::ostringstream oss;
oss << R"({"status":"success","message":"Shared memory enabled","name":")"
<< shm_name << R"(","size":)" << buf_size << "}";
return oss.str();
} catch (const std::exception& e) {
return createErrorResponse(std::string("Invalid parameters: ") + e.what());
}
}
std::string CameraController::handleDisableSharedMemory() {
streamingEngine_->disableSharedMemory();
return createSuccessResponse("Shared memory disabled");
}
std::string CameraController::handleGetSharedMemoryStatus() {
std::ostringstream oss;
oss << R"({"status":"success","shared_memory_enabled":)"
<< (streamingEngine_->isSharedMemoryEnabled() ? "true" : "false");
if (streamingEngine_->isSharedMemoryEnabled()) {
oss << R"(,"name":")" << streamingEngine_->getSharedMemoryName() << "\""
<< R"(,"size":)" << streamingEngine_->getSharedMemorySize();
}
oss << "}";
return oss.str();
}

158
src/SharedMemoryWriter.cpp Normal file
View File

@@ -0,0 +1,158 @@
/*
* VizionStreamer - Shared Memory Writer Implementation
* Copyright (c) 2025 Maik Jurischka
*
* Licensed under CC BY-NC-SA 4.0
* https://creativecommons.org/licenses/by-nc-sa/4.0/
*/
#include "vizionstreamer/SharedMemoryWriter.h"
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>
#include <iostream>
#include <chrono>
#include <cerrno>
SharedMemoryWriter::SharedMemoryWriter(std::string name)
: shm_name_(std::move(name))
, shm_fd_(-1)
, shm_ptr_(nullptr)
, buffer_size_(0)
, frame_counter_(0) {
}
SharedMemoryWriter::~SharedMemoryWriter() {
destroy();
}
bool SharedMemoryWriter::create(const size_t frame_buffer_size) {
if (shm_fd_ >= 0) {
std::cerr << "Shared memory already created" << std::endl;
return false;
}
// Calculate total size (header + frame buffer)
buffer_size_ = sizeof(SharedMemoryHeader) + frame_buffer_size;
// Create shared memory object
shm_fd_ = shm_open(shm_name_.c_str(), O_CREAT | O_RDWR, 0666);
if (shm_fd_ < 0) {
std::cerr << "Failed to create shared memory '" << shm_name_
<< "': " << strerror(errno) << std::endl;
return false;
}
// Set size
if (ftruncate(shm_fd_, static_cast<off_t>(buffer_size_)) != 0) {
std::cerr << "Failed to set shared memory size: " << strerror(errno) << std::endl;
close(shm_fd_);
shm_fd_ = -1;
shm_unlink(shm_name_.c_str());
return false;
}
// Map memory
shm_ptr_ = mmap(nullptr, buffer_size_, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd_, 0);
if (shm_ptr_ == MAP_FAILED) {
std::cerr << "Failed to map shared memory: " << strerror(errno) << std::endl;
close(shm_fd_);
shm_fd_ = -1;
shm_unlink(shm_name_.c_str());
shm_ptr_ = nullptr;
return false;
}
// Initialize header
SharedMemoryHeader* header = getHeader();
header->magic = 0x56495A4E; // "VIZN"
header->width = 0;
header->height = 0;
header->format = 0;
header->data_size = 0;
header->timestamp_ns = 0;
header->frame_sequence = 0;
header->write_sequence.store(0, std::memory_order_release);
memset(header->format_str, 0, sizeof(header->format_str));
memset(header->reserved, 0, sizeof(header->reserved));
std::cout << "Shared memory created: " << shm_name_
<< " (" << buffer_size_ << " bytes)" << std::endl;
return true;
}
bool SharedMemoryWriter::writeFrame(const uint8_t* data, const size_t size,
const int width, const int height,
const std::string& format,
const uint64_t timestamp_ns) {
if (!shm_ptr_ || shm_fd_ < 0) {
return false;
}
// Check size
if (size + sizeof(SharedMemoryHeader) > buffer_size_) {
std::cerr << "Frame too large for shared memory buffer (frame: "
<< size << " bytes, available: "
<< (buffer_size_ - sizeof(SharedMemoryHeader)) << " bytes)" << std::endl;
return false;
}
SharedMemoryHeader* header = getHeader();
auto* frame_data = static_cast<uint8_t*>(shm_ptr_) + sizeof(SharedMemoryHeader);
// Lock-free write protocol:
// 1. Increment sequence (odd = writing in progress)
header->write_sequence.fetch_add(1, std::memory_order_acquire);
// 2. Update header metadata
header->width = width;
header->height = height;
header->data_size = size;
header->frame_sequence = ++frame_counter_;
// Set timestamp (use provided or generate current)
if (timestamp_ns > 0) {
header->timestamp_ns = timestamp_ns;
} else {
header->timestamp_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
}
// Copy format string
strncpy(header->format_str, format.c_str(), sizeof(header->format_str) - 1);
header->format_str[sizeof(header->format_str) - 1] = '\0';
// 3. Copy frame data
memcpy(frame_data, data, size);
// 4. Increment sequence (even = write complete)
header->write_sequence.fetch_add(1, std::memory_order_release);
return true;
}
void SharedMemoryWriter::destroy() {
// Unmap memory
if (shm_ptr_ != nullptr && shm_ptr_ != MAP_FAILED) {
munmap(shm_ptr_, buffer_size_);
shm_ptr_ = nullptr;
}
// Close and unlink shared memory
if (shm_fd_ >= 0) {
close(shm_fd_);
shm_unlink(shm_name_.c_str());
std::cout << "Shared memory destroyed: " << shm_name_ << std::endl;
shm_fd_ = -1;
}
buffer_size_ = 0;
frame_counter_ = 0;
}
SharedMemoryHeader* SharedMemoryWriter::getHeader() {
return static_cast<SharedMemoryHeader*>(shm_ptr_);
}

View File

@@ -116,7 +116,11 @@ void StreamingEngine::acquisitionLoop() {
const VX_CAPTURE_RESULT result = VxGetImage(camera_, buffer_.get(), &dataSize, 1000);
if (result == VX_CAPTURE_RESULT::VX_SUCCESS && dataSize > 0) {
// Push frame to GStreamer pipeline
// Get timestamp for frame
const uint64_t timestamp_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
// Determine format string
std::string formatStr;
switch (currentFormat_.format) {
case VX_IMAGE_FORMAT::YUY2: formatStr = "YUY2"; break;
@@ -127,12 +131,22 @@ void StreamingEngine::acquisitionLoop() {
default: formatStr = "UNKNOWN"; break;
}
// Push frame to GStreamer pipeline
if (!gstPipeline_->pushBuffer(buffer_.get(), dataSize,
currentFormat_.width, currentFormat_.height,
formatStr)) {
std::cerr << "Failed to push frame to GStreamer pipeline" << std::endl;
}
// Push frame to shared memory if enabled
if (shmWriter_ && shmWriter_->isCreated()) {
if (!shmWriter_->writeFrame(buffer_.get(), dataSize,
currentFormat_.width, currentFormat_.height,
formatStr, timestamp_ns)) {
std::cerr << "Failed to write frame to shared memory" << std::endl;
}
}
frameCount++;
framesInLastSecond++;
@@ -158,3 +172,38 @@ void StreamingEngine::acquisitionLoop() {
std::cout << "Acquisition loop stopped. Total frames captured: " << frameCount << std::endl;
}
bool StreamingEngine::enableSharedMemory(const std::string& name, const size_t buffer_size) {
std::lock_guard<std::mutex> lock(mutex_);
if (running_) {
std::cerr << "Cannot enable shared memory while streaming is active" << std::endl;
return false;
}
if (shmWriter_ && shmWriter_->isCreated()) {
std::cerr << "Shared memory already enabled" << std::endl;
return false;
}
// Create shared memory writer
shmWriter_ = std::make_unique<SharedMemoryWriter>(name);
if (!shmWriter_->create(buffer_size)) {
std::cerr << "Failed to create shared memory" << std::endl;
shmWriter_.reset();
return false;
}
std::cout << "Shared memory enabled: " << name << " (" << buffer_size << " bytes)" << std::endl;
return true;
}
void StreamingEngine::disableSharedMemory() {
std::lock_guard<std::mutex> lock(mutex_);
if (shmWriter_) {
shmWriter_->destroy();
shmWriter_.reset();
std::cout << "Shared memory disabled" << std::endl;
}
}