#include "GStreamerPipeline.h"

// NOTE(review): the three standard includes in the original lost their
// targets; the set below covers every name this file uses. The GStreamer
// headers are presumably also pulled in by GStreamerPipeline.h - confirm.
#include <gst/app/gstappsrc.h>
#include <gst/gst.h>

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <utility>

namespace {

/// Drops one reference on a GStreamer object (if any) and nulls the caller's
/// pointer so it cannot be used or released a second time.
template <typename T>
void unrefAndClear(T*& object) {
    if (object != nullptr) {
        gst_object_unref(object);
        object = nullptr;
    }
}

/// Builds the caps description for a frame layout.
///
/// YUY2/UYVY/BGR/RGB map to `video/x-raw` with an explicit format, MJPG maps
/// to `image/jpeg`, and anything else falls back to `video/x-raw` without a
/// format field. The frame rate is hard-coded to 30/1.
std::string buildCapsString(const std::string& format, const int width, const int height) {
    const std::string geometry = ",width=" + std::to_string(width) +
                                 ",height=" + std::to_string(height) +
                                 ",framerate=30/1";

    if (format == "YUY2" || format == "UYVY" || format == "BGR" || format == "RGB") {
        return "video/x-raw,format=" + format + geometry;
    }
    if (format == "MJPG") {
        return "image/jpeg" + geometry;
    }
    return "video/x-raw" + geometry;
}

}  // namespace

/// Stores the pipeline description and initializes the GStreamer library.
///
/// @param pipelineDescription Launch-syntax description of everything that
///        follows the appsrc element; may be empty and set later.
GStreamerPipeline::GStreamerPipeline(std::string pipelineDescription)
    : pipeline_(nullptr),
      appsrc_(nullptr),
      bus_(nullptr),
      running_(false),
      pipelineDescription_(std::move(pipelineDescription)),
      width_(0),
      height_(0) {
    std::cout << "[DEBUG] Initializing GStreamer..." << std::endl;
    gst_init(nullptr, nullptr);
    std::cout << "[DEBUG] GStreamer initialized successfully" << std::endl;

    if (!pipelineDescription_.empty()) {
        std::cout << "[DEBUG] Pipeline description: " << pipelineDescription_ << std::endl;
    }
}

/// Stops the pipeline (if running) so all GStreamer references are released.
GStreamerPipeline::~GStreamerPipeline() {
    std::cout << "[DEBUG] Destroying GStreamerPipeline..." << std::endl;
    stop();
    std::cout << "[DEBUG] GStreamerPipeline destroyed" << std::endl;
}

/// Replaces the pipeline description; ignored (with a log line) while the
/// pipeline is running.
void GStreamerPipeline::setPipelineDescription(const std::string& description) {
    if (running_) {
        std::cerr << "[DEBUG] Cannot set pipeline description while running" << std::endl;
        return;
    }

    std::cout << "[DEBUG] Setting pipeline description: " << description << std::endl;
    pipelineDescription_ = description;
}

/// Builds `appsrc name=source ! <description>`, configures the appsrc and sets
/// the pipeline to PLAYING.
///
/// On every failure path the partially built pipeline is released and the
/// member pointers are reset to nullptr, so a later start() begins clean.
///
/// @return true when the pipeline was created and the state change did not
///         fail; false if already running, the description is empty, parsing
///         failed, the appsrc could not be found, or the state change failed.
bool GStreamerPipeline::start() {
    std::cout << "[DEBUG] Starting GStreamer pipeline..." << std::endl;

    if (running_) {
        std::cerr << "[ERROR] GStreamer pipeline already running" << std::endl;
        return false;
    }

    if (pipelineDescription_.empty()) {
        std::cerr << "[ERROR] Pipeline description is empty" << std::endl;
        return false;
    }

    const std::string fullPipeline = "appsrc name=source ! " + pipelineDescription_;
    std::cout << "[DEBUG] Full pipeline string: " << fullPipeline << std::endl;

    std::cout << "[DEBUG] Parsing pipeline..." << std::endl;
    GError* error = nullptr;
    pipeline_ = gst_parse_launch(fullPipeline.c_str(), &error);
    if (error != nullptr || pipeline_ == nullptr) {
        std::cerr << "[ERROR] Failed to create pipeline: "
                  << (error != nullptr ? error->message : "unknown error") << std::endl;
        g_clear_error(&error);
        // gst_parse_launch may hand back a pipeline even when it reports an
        // error; release it so it is neither leaked nor left dangling.
        unrefAndClear(pipeline_);
        return false;
    }
    std::cout << "[DEBUG] Pipeline parsed successfully" << std::endl;

    std::cout << "[DEBUG] Getting appsrc element..." << std::endl;
    appsrc_ = gst_bin_get_by_name(GST_BIN(pipeline_), "source");
    if (!appsrc_) {
        std::cerr << "[ERROR] Failed to get appsrc element" << std::endl;
        unrefAndClear(pipeline_);
        return false;
    }
    std::cout << "[DEBUG] Appsrc element retrieved successfully" << std::endl;

    // Configure appsrc
    std::cout << "[DEBUG] Configuring appsrc properties..." << std::endl;
    g_object_set(G_OBJECT(appsrc_),
                 "stream-type", GST_APP_STREAM_TYPE_STREAM,
                 "format", GST_FORMAT_TIME,
                 "is-live", TRUE,
                 nullptr);

    // Set callbacks. Value-initialize the struct so the reserved padding and
    // any callback not assigned below are null rather than indeterminate.
    std::cout << "[DEBUG] Setting appsrc callbacks..." << std::endl;
    GstAppSrcCallbacks callbacks = {};
    callbacks.need_data = onNeedData;
    callbacks.enough_data = onEnoughData;
    callbacks.seek_data = nullptr;
    gst_app_src_set_callbacks(GST_APP_SRC(appsrc_), &callbacks, this, nullptr);

    // Start the pipeline
    std::cout << "[DEBUG] Setting pipeline state to PLAYING..." << std::endl;
    const GstStateChangeReturn ret = gst_element_set_state(pipeline_, GST_STATE_PLAYING);
    if (ret == GST_STATE_CHANGE_FAILURE) {
        std::cerr << "[ERROR] Failed to set pipeline state to PLAYING" << std::endl;
        // Return the pipeline to NULL before dropping the references; elements
        // are expected to be in the NULL state when they are disposed.
        gst_element_set_state(pipeline_, GST_STATE_NULL);
        unrefAndClear(appsrc_);
        unrefAndClear(pipeline_);
        return false;
    }

    std::cout << "[DEBUG] Pipeline state change return: ";
    switch (ret) {
        case GST_STATE_CHANGE_SUCCESS:
            std::cout << "SUCCESS" << std::endl;
            break;
        case GST_STATE_CHANGE_ASYNC:
            std::cout << "ASYNC" << std::endl;
            break;
        case GST_STATE_CHANGE_NO_PREROLL:
            std::cout << "NO_PREROLL" << std::endl;
            break;
        default:
            std::cout << "UNKNOWN" << std::endl;
    }

    bus_ = gst_element_get_bus(pipeline_);
    running_ = true;

    std::cout << "[SUCCESS] GStreamer pipeline started successfully" << std::endl;
    std::cout << "[INFO] Full pipeline: " << fullPipeline << std::endl;
    return true;
}

/// Sends end-of-stream to the appsrc, sets the pipeline to NULL and releases
/// the pipeline, appsrc and bus references. Does nothing when not running.
void GStreamerPipeline::stop() {
    if (!running_) {
        std::cout << "[DEBUG] Pipeline already stopped, nothing to do" << std::endl;
        return;
    }

    std::cout << "[DEBUG] Stopping GStreamer pipeline..." << std::endl;
    running_ = false;

    // NOTE(review): EOS is sent but not awaited before the state drops to
    // NULL, so downstream elements may not see it - confirm this is intended
    // for sinks that need to finalize their output.
    if (appsrc_) {
        std::cout << "[DEBUG] Sending EOS to appsrc..." << std::endl;
        gst_app_src_end_of_stream(GST_APP_SRC(appsrc_));
    }

    if (pipeline_) {
        std::cout << "[DEBUG] Setting pipeline state to NULL..." << std::endl;
        gst_element_set_state(pipeline_, GST_STATE_NULL);
        std::cout << "[DEBUG] Unreferencing pipeline..." << std::endl;
        unrefAndClear(pipeline_);
    }

    if (appsrc_) {
        std::cout << "[DEBUG] Unreferencing appsrc..." << std::endl;
        unrefAndClear(appsrc_);
    }

    if (bus_) {
        std::cout << "[DEBUG] Unreferencing bus..." << std::endl;
        unrefAndClear(bus_);
    }

    std::cout << "[SUCCESS] GStreamer pipeline stopped" << std::endl;
}

/// Copies one frame into a new GstBuffer and pushes it into the appsrc,
/// updating the appsrc caps first when width, height or format differ from
/// the previously pushed frame.
///
/// @param data   Frame bytes; must be non-null.
/// @param size   Number of bytes at @p data; must be non-zero.
/// @param width  Frame width written into the caps.
/// @param height Frame height written into the caps.
/// @param format "YUY2", "UYVY", "BGR", "RGB" or "MJPG"; any other value
///               yields `video/x-raw` caps without a format field.
/// @return true if the buffer was accepted (GST_FLOW_OK); false if the
///         pipeline is not running, the input is empty, caps/buffer creation
///         failed, or the push was rejected.
bool GStreamerPipeline::pushBuffer(const uint8_t* data, const size_t size, const int width,
                                   const int height, const std::string& format) {
    if (!running_ || !appsrc_) {
        std::cerr << "[DEBUG] Cannot push buffer: pipeline not running or appsrc is null" << std::endl;
        return false;
    }

    if (data == nullptr || size == 0) {
        std::cerr << "[ERROR] Cannot push buffer: no frame data supplied" << std::endl;
        return false;
    }

    // Update format if changed
    if (width != width_ || height != height_ || format != format_) {
        std::cout << "[DEBUG] Stream format changed - updating caps" << std::endl;
        std::cout << "[DEBUG] Old: " << width_ << "x" << height_ << " (" << format_ << ")" << std::endl;
        std::cout << "[DEBUG] New: " << width << "x" << height << " (" << format << ")" << std::endl;

        const std::string capsStr = buildCapsString(format, width, height);
        std::cout << "[DEBUG] Setting caps: " << capsStr << std::endl;

        GstCaps* caps = gst_caps_from_string(capsStr.c_str());
        if (caps == nullptr) {
            std::cerr << "[ERROR] Failed to parse caps: " << capsStr << std::endl;
            return false;
        }
        gst_app_src_set_caps(GST_APP_SRC(appsrc_), caps);
        gst_caps_unref(caps);

        // Remember the layout only once the caps were applied, so a failed
        // update is retried on the next frame.
        width_ = width;
        height_ = height;
        format_ = format;
        std::cout << "[DEBUG] Caps set successfully" << std::endl;
    }

    // Create buffer and copy data
    GstBuffer* buffer = gst_buffer_new_allocate(nullptr, size, nullptr);
    if (buffer == nullptr) {
        std::cerr << "[ERROR] Failed to allocate buffer of " << size << " bytes" << std::endl;
        return false;
    }

    GstMapInfo map;
    if (!gst_buffer_map(buffer, &map, GST_MAP_WRITE)) {
        std::cerr << "[ERROR] Failed to map buffer for writing" << std::endl;
        gst_buffer_unref(buffer);
        return false;
    }
    std::memcpy(map.data, data, size);
    gst_buffer_unmap(buffer, &map);

    // Push buffer to pipeline. gst_app_src_push_buffer takes ownership of the
    // buffer, so it is not unreffed here even when the push is rejected.
    const GstFlowReturn ret = gst_app_src_push_buffer(GST_APP_SRC(appsrc_), buffer);
    if (ret != GST_FLOW_OK) {
        std::cerr << "[ERROR] Failed to push buffer to pipeline, flow return: " << ret << std::endl;
        return false;
    }

    return true;
}

/// appsrc "need-data" callback; currently a no-op.
void GStreamerPipeline::onNeedData(GstAppSrc* /*appsrc*/, guint /*unused*/, gpointer /*user_data*/) {
    // Called when pipeline needs more data
    // std::cout << "[DEBUG] Pipeline callback: need_data - pipeline ready for more data" << std::endl;
}

/// appsrc "enough-data" callback; currently a no-op.
void GStreamerPipeline::onEnoughData(GstAppSrc* /*appsrc*/, gpointer /*user_data*/) {
    // Called when pipeline has enough data buffered
    // std::cout << "[DEBUG] Pipeline callback: enough_data - pipeline buffer full" << std::endl;
}