Files
vizionStreamer/GStreamerPipeline.cpp
2025-12-19 09:49:11 +01:00

223 lines
8.1 KiB
C++

/*
* VizionStreamer - GStreamer Pipeline Implementation
* Copyright (c) 2025 Maik Jurischka
*
* Licensed under CC BY-NC-SA 4.0
* https://creativecommons.org/licenses/by-nc-sa/4.0/
*/
#include "GStreamerPipeline.h"
#include <iostream>
#include <cstring>
#include <utility>
/**
 * Creates an idle pipeline wrapper and initialises the GStreamer library.
 *
 * No pipeline is built here; the object only stores the description and
 * starts out in the "not running" state with an unknown (0x0) frame size.
 *
 * @param pipelineDescription Pipeline description text, taken by value and
 *                            moved into the object. May be empty.
 */
GStreamerPipeline::GStreamerPipeline(std::string pipelineDescription)
    : pipeline_(nullptr),
      appsrc_(nullptr),
      bus_(nullptr),
      running_(false),
      pipelineDescription_(std::move(pipelineDescription)),
      width_(0),
      height_(0) {
    std::cout << "[DEBUG] Initializing GStreamer..." << std::endl;
    // No command-line arguments are forwarded to GStreamer.
    gst_init(nullptr, nullptr);
    std::cout << "[DEBUG] GStreamer initialized successfully" << std::endl;

    // Only echo the description when one was actually supplied.
    if (pipelineDescription_.empty()) {
        return;
    }
    std::cout << "[DEBUG] Pipeline description: " << pipelineDescription_ << std::endl;
}
/**
 * Shuts the pipeline down via stop() before the object goes away, logging
 * a debug line before and after.
 */
GStreamerPipeline::~GStreamerPipeline() {
    const auto trace = [](const char* message) {
        std::cout << message << std::endl;
    };

    trace("[DEBUG] Destroying GStreamerPipeline...");
    stop();  // returns early when the pipeline is not running
    trace("[DEBUG] GStreamerPipeline destroyed");
}
/**
 * Replaces the stored pipeline description.
 *
 * The change is refused (and only logged) while the pipeline is running;
 * the stored description is left untouched in that case.
 *
 * @param description New pipeline description text.
 */
void GStreamerPipeline::setPipelineDescription(const std::string& description) {
    // Guard: the description may only change while stopped.
    if (running_) {
        std::cerr << "[DEBUG] Cannot set pipeline description while running" << std::endl;
        return;
    }

    std::cout << "[DEBUG] Setting pipeline description: " << description << std::endl;
    pipelineDescription_ = description;
}
bool GStreamerPipeline::start() {
std::cout << "[DEBUG] Starting GStreamer pipeline..." << std::endl;
if (running_) {
std::cerr << "[ERROR] GStreamer pipeline already running" << std::endl;
return false;
}
if (pipelineDescription_.empty()) {
std::cerr << "[ERROR] Pipeline description is empty" << std::endl;
return false;
}
GError* error = nullptr;
const std::string fullPipeline = "appsrc name=source ! " + pipelineDescription_;
std::cout << "[DEBUG] Full pipeline string: " << fullPipeline << std::endl;
std::cout << "[DEBUG] Parsing pipeline..." << std::endl;
pipeline_ = gst_parse_launch(fullPipeline.c_str(), &error);
if (error) {
std::cerr << "[ERROR] Failed to create pipeline: " << error->message << std::endl;
g_error_free(error);
return false;
}
std::cout << "[DEBUG] Pipeline parsed successfully" << std::endl;
std::cout << "[DEBUG] Getting appsrc element..." << std::endl;
appsrc_ = gst_bin_get_by_name(GST_BIN(pipeline_), "source");
if (!appsrc_) {
std::cerr << "[ERROR] Failed to get appsrc element" << std::endl;
gst_object_unref(pipeline_);
return false;
}
std::cout << "[DEBUG] Appsrc element retrieved successfully" << std::endl;
// Configure appsrc
std::cout << "[DEBUG] Configuring appsrc properties..." << std::endl;
g_object_set(G_OBJECT(appsrc_),
"stream-type", GST_APP_STREAM_TYPE_STREAM,
"format", GST_FORMAT_TIME,
"is-live", TRUE,
nullptr);
// Set callbacks
std::cout << "[DEBUG] Setting appsrc callbacks..." << std::endl;
GstAppSrcCallbacks callbacks = {};
callbacks.need_data = onNeedData;
callbacks.enough_data = onEnoughData;
callbacks.seek_data = nullptr;
gst_app_src_set_callbacks(GST_APP_SRC(appsrc_), &callbacks, this, nullptr);
// Start the pipeline
std::cout << "[DEBUG] Setting pipeline state to PLAYING..." << std::endl;
const GstStateChangeReturn ret = gst_element_set_state(pipeline_, GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE) {
std::cerr << "[ERROR] Failed to set pipeline state to PLAYING" << std::endl;
gst_object_unref(appsrc_);
gst_object_unref(pipeline_);
return false;
}
std::cout << "[DEBUG] Pipeline state change return: ";
switch(ret) {
case GST_STATE_CHANGE_SUCCESS:
std::cout << "SUCCESS" << std::endl;
break;
case GST_STATE_CHANGE_ASYNC:
std::cout << "ASYNC" << std::endl;
break;
case GST_STATE_CHANGE_NO_PREROLL:
std::cout << "NO_PREROLL" << std::endl;
break;
default:
std::cout << "UNKNOWN" << std::endl;
}
bus_ = gst_element_get_bus(pipeline_);
running_ = true;
std::cout << "[SUCCESS] GStreamer pipeline started successfully" << std::endl;
std::cout << "[INFO] Full pipeline: " << fullPipeline << std::endl;
return true;
}
void GStreamerPipeline::stop() {
if (!running_) {
std::cout << "[DEBUG] Pipeline already stopped, nothing to do" << std::endl;
return;
}
std::cout << "[DEBUG] Stopping GStreamer pipeline..." << std::endl;
running_ = false;
if (appsrc_) {
std::cout << "[DEBUG] Sending EOS to appsrc..." << std::endl;
gst_app_src_end_of_stream(GST_APP_SRC(appsrc_));
}
if (pipeline_) {
std::cout << "[DEBUG] Setting pipeline state to NULL..." << std::endl;
gst_element_set_state(pipeline_, GST_STATE_NULL);
std::cout << "[DEBUG] Unreferencing pipeline..." << std::endl;
gst_object_unref(pipeline_);
pipeline_ = nullptr;
}
if (appsrc_) {
std::cout << "[DEBUG] Unreferencing appsrc..." << std::endl;
gst_object_unref(appsrc_);
appsrc_ = nullptr;
}
if (bus_) {
std::cout << "[DEBUG] Unreferencing bus..." << std::endl;
gst_object_unref(bus_);
bus_ = nullptr;
}
std::cout << "[SUCCESS] GStreamer pipeline stopped" << std::endl;
}
bool GStreamerPipeline::pushBuffer(const uint8_t* data, const size_t size, const int width, const int height, const std::string& format) {
if (!running_ || !appsrc_) {
std::cerr << "[DEBUG] Cannot push buffer: pipeline not running or appsrc is null" << std::endl;
return false;
}
// Update format if changed
if (width != width_ || height != height_ || format != format_) {
std::cout << "[DEBUG] Stream format changed - updating caps" << std::endl;
std::cout << "[DEBUG] Old: " << width_ << "x" << height_ << " (" << format_ << ")" << std::endl;
std::cout << "[DEBUG] New: " << width << "x" << height << " (" << format << ")" << std::endl;
width_ = width;
height_ = height;
format_ = format;
// Set caps based on format
std::string capsStr;
if (format == "YUY2" || format == "UYVY" || format == "BGR" || format == "RGB") {
capsStr = "video/x-raw,format=" + format + ",width=" + std::to_string(width) +
",height=" + std::to_string(height) + ",framerate=30/1";
} else if (format == "MJPG") {
capsStr = "image/jpeg,width=" + std::to_string(width) +
",height=" + std::to_string(height) + ",framerate=30/1";
} else {
capsStr = "video/x-raw,width=" + std::to_string(width) +
",height=" + std::to_string(height) + ",framerate=30/1";
}
std::cout << "[DEBUG] Setting caps: " << capsStr << std::endl;
GstCaps* caps = gst_caps_from_string(capsStr.c_str());
gst_app_src_set_caps(GST_APP_SRC(appsrc_), caps);
gst_caps_unref(caps);
std::cout << "[DEBUG] Caps set successfully" << std::endl;
}
// Create buffer and copy data
GstBuffer* buffer = gst_buffer_new_allocate(nullptr, size, nullptr);
GstMapInfo map;
gst_buffer_map(buffer, &map, GST_MAP_WRITE);
memcpy(map.data, data, size);
gst_buffer_unmap(buffer, &map);
// Push buffer to pipeline
const GstFlowReturn ret = gst_app_src_push_buffer(GST_APP_SRC(appsrc_), buffer);
if (ret != GST_FLOW_OK) {
std::cerr << "[ERROR] Failed to push buffer to pipeline, flow return: " << ret << std::endl;
return false;
}
return true;
}
void GStreamerPipeline::onNeedData(GstAppSrc* appsrc, guint unused, gpointer user_data) {
// Called when pipeline needs more data
//std::cout << "[DEBUG] Pipeline callback: need_data - pipeline ready for more data" << std::endl;
}
void GStreamerPipeline::onEnoughData(GstAppSrc* appsrc, gpointer user_data) {
// Called when pipeline has enough data buffered
// std::cout << "[DEBUG] Pipeline callback: enough_data - pipeline buffer full" << std::endl;
}