/*
 * VizionStreamer - Streaming Engine Implementation
 * Copyright (c) 2025 Maik Jurischka
 *
 * Licensed under CC BY-NC-SA 4.0
 * https://creativecommons.org/licenses/by-nc-sa/4.0/
 */

#include "StreamingEngine.h"

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

namespace {

/// Bytes reserved per pixel when sizing the capture buffer (worst case: uncompressed).
constexpr std::size_t kWorstCaseBytesPerPixel = 4;

/// Timeout value handed to VxGetImage for each capture attempt.
/// Presumably milliseconds - confirm against the SDK documentation.
constexpr int kCaptureTimeout = 1000;

/// Returns the format name handed to the GStreamer pipeline for a camera pixel format.
/// Formats without an explicit mapping yield "UNKNOWN".
const char* imageFormatName(const VX_IMAGE_FORMAT format) {
    switch (format) {
        case VX_IMAGE_FORMAT::YUY2:
            return "YUY2";
        case VX_IMAGE_FORMAT::UYVY:
            return "UYVY";
        case VX_IMAGE_FORMAT::MJPG:
            return "MJPG";
        case VX_IMAGE_FORMAT::BGR:
            return "BGR";
        case VX_IMAGE_FORMAT::RGB:
            return "RGB";
        default:
            return "UNKNOWN";
    }
}

}  // namespace

/// Creates a stopped engine for the given camera with an empty pipeline description.
///
/// @param camera Camera handle that is passed to every Vx* SDK call made by this engine.
StreamingEngine::StreamingEngine(std::shared_ptr<VxCamera> camera)
    : camera_(std::move(camera)),
      running_(false),
      currentFormat_(),
      bufferSize_(0) {
    // The pipeline class is declared in the header; derive it from the member so this
    // file does not have to repeat (or guess) the concrete type name.
    using Pipeline = decltype(gstPipeline_)::element_type;
    gstPipeline_ = std::make_unique<Pipeline>("");
}

/// Stops streaming (if it is running) before the engine is destroyed.
StreamingEngine::~StreamingEngine() {
    stop();
}

/// Stores a new pipeline description. Ignored while the engine is running.
///
/// @param pipeline Pipeline description forwarded to the pipeline object.
void StreamingEngine::setPipelineDescription(const std::string& pipeline) {
    std::lock_guard<decltype(mutex_)> lock(mutex_);
    if (running_) {
        return;
    }
    gstPipeline_->setPipelineDescription(pipeline);
}

/// Starts camera streaming, the GStreamer pipeline and the acquisition thread.
///
/// On any failure after the camera has been started, camera streaming is stopped again
/// before returning.
///
/// @param gstPipeline Pipeline description to use for this run.
/// @return true when everything was started; false if the engine is already running or
///         any of the start-up steps failed.
bool StreamingEngine::start(const std::string& gstPipeline) {
    std::lock_guard<decltype(mutex_)> lock(mutex_);

    if (running_) {
        std::cerr << "Streaming engine already running" << std::endl;
        return false;
    }

    // Set pipeline description
    gstPipeline_->setPipelineDescription(gstPipeline);

    // Start camera streaming
    if (VxStartStreaming(camera_) != 0) {
        std::cerr << "Failed to start camera streaming" << std::endl;
        return false;
    }

    // Get current format to allocate buffer
    std::vector<VxFormat> fmtList;
    if (VxGetFormatList(camera_, fmtList) != 0 || fmtList.empty()) {
        std::cerr << "Failed to get format list" << std::endl;
        VxStopStreaming(camera_);
        return false;
    }
    // NOTE(review): this replaces any format stored by setFormat() with the first list
    // entry. Confirm that the first entry really is the format the camera is currently
    // delivering; otherwise the pipeline is told the wrong width/height/format.
    currentFormat_ = fmtList.front();

    // Allocate buffer (assume worst case: uncompressed). Widen before multiplying so the
    // product is computed in size_t rather than in the (narrower) dimension type.
    const std::size_t calculatedBufferSize = static_cast<std::size_t>(currentFormat_.width) *
                                             static_cast<std::size_t>(currentFormat_.height) *
                                             kWorstCaseBytesPerPixel;
    if (calculatedBufferSize == 0) {
        // A zero-sized buffer would be written past its end by the first captured frame.
        std::cerr << "Invalid frame dimensions reported by camera" << std::endl;
        VxStopStreaming(camera_);
        return false;
    }
    bufferSize_ = calculatedBufferSize;
    buffer_ = std::make_unique<std::uint8_t[]>(bufferSize_);

    // Start GStreamer pipeline
    if (!gstPipeline_->start()) {
        std::cerr << "Failed to start GStreamer pipeline" << std::endl;
        VxStopStreaming(camera_);
        return false;
    }

    // Start acquisition thread. The flag must be set first: the loop exits as soon as it
    // observes running_ == false.
    running_ = true;
    acquisitionThread_ = std::make_unique<std::thread>(&StreamingEngine::acquisitionLoop, this);

    std::cout << "Streaming engine started" << std::endl;
    return true;
}

/// Stops the acquisition thread, the GStreamer pipeline and camera streaming.
///
/// Does nothing when the engine is not running. Takes the same mutex as start(),
/// setFormat() and setPipelineDescription(), so a stop cannot interleave with a
/// concurrent start or with a second stop. The acquisition loop never takes this mutex,
/// so joining the thread while holding it cannot deadlock.
void StreamingEngine::stop() {
    std::lock_guard<decltype(mutex_)> lock(mutex_);

    if (!running_) {
        return;
    }
    running_ = false;

    // Wait for acquisition thread to finish
    if (acquisitionThread_ && acquisitionThread_->joinable()) {
        acquisitionThread_->join();
    }

    // Stop GStreamer pipeline
    gstPipeline_->stop();

    // Stop camera streaming
    VxStopStreaming(camera_);

    std::cout << "Streaming engine stopped" << std::endl;
}

/// Applies a new camera format. Ignored while the engine is running.
///
/// The stored format is only updated when the camera accepts it, so currentFormat_ never
/// records a format that VxSetFormat rejected.
///
/// @param format Format to apply to the camera.
void StreamingEngine::setFormat(const VxFormat& format) {
    std::lock_guard<decltype(mutex_)> lock(mutex_);
    if (running_) {
        return;
    }
    if (VxSetFormat(camera_, format) != 0) {
        std::cerr << "Failed to set format" << std::endl;
        return;
    }
    currentFormat_ = format;
}

/// Body of the acquisition thread: captures frames and pushes them into the pipeline
/// until running_ becomes false.
///
/// Capture timeouts are retried silently, other capture errors are logged and retried
/// after a short sleep. A frame counter and a once-per-second FPS line are printed to
/// stdout.
void StreamingEngine::acquisitionLoop() {
    uint64_t frameCount = 0;
    uint64_t framesInLastSecond = 0;
    auto lastStatsTime = std::chrono::steady_clock::now();

    // currentFormat_ is only written by start() (before this thread is created) and by
    // setFormat() (which refuses to run while running_ is set), so the format name is
    // invariant for the lifetime of this loop and is built once.
    const std::string formatStr = imageFormatName(currentFormat_.format);

    std::cout << "Acquisition loop started" << std::endl;

    while (running_) {
        int dataSize = 0;
        const VX_CAPTURE_RESULT result =
            VxGetImage(camera_, buffer_.get(), &dataSize, kCaptureTimeout);

        if (result == VX_CAPTURE_RESULT::VX_SUCCESS && dataSize > 0) {
            // Push frame to GStreamer pipeline
            if (!gstPipeline_->pushBuffer(buffer_.get(), dataSize, currentFormat_.width,
                                          currentFormat_.height, formatStr)) {
                std::cerr << "Failed to push frame to GStreamer pipeline" << std::endl;
            }

            ++frameCount;
            ++framesInLastSecond;

            // Print statistics every second
            const auto now = std::chrono::steady_clock::now();
            const auto elapsed =
                std::chrono::duration_cast<std::chrono::seconds>(now - lastStatsTime);
            if (elapsed.count() >= 1) {
                std::cout << "FPS: " << framesInLastSecond
                          << " | Total frames: " << frameCount
                          << " | Frame size: " << dataSize << " bytes" << std::endl;
                framesInLastSecond = 0;
                lastStatsTime = now;
            }
        } else if (result == VX_CAPTURE_RESULT::VX_TIMEOUT) {
            // Timeout is normal, just continue
            continue;
        } else {
            std::cerr << "Failed to capture frame: " << static_cast<int>(result) << std::endl;
            // Don't break on error, just continue trying
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }

    std::cout << "Acquisition loop stopped. Total frames captured: " << frameCount << std::endl;
}