stream optimizations

This commit is contained in:
Maik Jurischka
2025-12-12 14:37:57 +01:00
parent 43a1abdd31
commit 9c9f822f35
10 changed files with 168 additions and 24 deletions

View File

@@ -1,10 +1,11 @@
#include "GStreamerPipeline.h" #include "GStreamerPipeline.h"
#include <iostream> #include <iostream>
#include <cstring> #include <cstring>
#include <utility>
GStreamerPipeline::GStreamerPipeline(const std::string& pipelineDescription) GStreamerPipeline::GStreamerPipeline(std::string pipelineDescription)
: pipeline_(nullptr), appsrc_(nullptr), bus_(nullptr), running_(false), : pipeline_(nullptr), appsrc_(nullptr), bus_(nullptr), running_(false),
pipelineDescription_(pipelineDescription), width_(0), height_(0) { pipelineDescription_(std::move(pipelineDescription)), width_(0), height_(0) {
gst_init(nullptr, nullptr); gst_init(nullptr, nullptr);
} }
@@ -30,7 +31,7 @@ bool GStreamerPipeline::start() {
} }
GError* error = nullptr; GError* error = nullptr;
std::string fullPipeline = "appsrc name=source ! " + pipelineDescription_; const std::string fullPipeline = "appsrc name=source ! " + pipelineDescription_;
pipeline_ = gst_parse_launch(fullPipeline.c_str(), &error); pipeline_ = gst_parse_launch(fullPipeline.c_str(), &error);
if (error) { if (error) {
@@ -106,7 +107,7 @@ void GStreamerPipeline::stop() {
std::cout << "GStreamer pipeline stopped" << std::endl; std::cout << "GStreamer pipeline stopped" << std::endl;
} }
bool GStreamerPipeline::pushBuffer(uint8_t* data, size_t size, int width, int height, const std::string& format) { bool GStreamerPipeline::pushBuffer(const uint8_t* data, const size_t size, const int width, const int height, const std::string& format) {
if (!running_ || !appsrc_) { if (!running_ || !appsrc_) {
return false; return false;
} }
@@ -119,16 +120,13 @@ bool GStreamerPipeline::pushBuffer(uint8_t* data, size_t size, int width, int he
// Set caps based on format // Set caps based on format
std::string capsStr; std::string capsStr;
if (format == "YUY2" || format == "UYVY") { if (format == "YUY2" || format == "UYVY" || format == "BGR" || format == "RGB") {
capsStr = "video/x-raw,format=" + format + ",width=" + std::to_string(width) + capsStr = "video/x-raw,format=" + format + ",width=" + std::to_string(width) +
",height=" + std::to_string(height) + ",framerate=30/1"; ",height=" + std::to_string(height) + ",framerate=30/1";
} else if (format == "MJPG") { } else if (format == "MJPG") {
capsStr = "image/jpeg,width=" + std::to_string(width) + capsStr = "image/jpeg,width=" + std::to_string(width) +
",height=" + std::to_string(height) + ",framerate=30/1"; ",height=" + std::to_string(height) + ",framerate=30/1";
} else if (format == "BGR" || format == "RGB") { } else {
capsStr = "video/x-raw,format=" + format + ",width=" + std::to_string(width) +
",height=" + std::to_string(height) + ",framerate=30/1";
} else {
capsStr = "video/x-raw,width=" + std::to_string(width) + capsStr = "video/x-raw,width=" + std::to_string(width) +
",height=" + std::to_string(height) + ",framerate=30/1"; ",height=" + std::to_string(height) + ",framerate=30/1";
} }

View File

@@ -7,12 +7,12 @@
class GStreamerPipeline { class GStreamerPipeline {
public: public:
explicit GStreamerPipeline(const std::string& pipelineDescription); explicit GStreamerPipeline(std::string pipelineDescription);
~GStreamerPipeline(); ~GStreamerPipeline();
bool start(); bool start();
void stop(); void stop();
bool pushBuffer(uint8_t* data, size_t size, int width, int height, const std::string& format); bool pushBuffer(const uint8_t* data, size_t size, int width, int height, const std::string& format);
bool isRunning() const { return running_; } bool isRunning() const { return running_; }
void setPipelineDescription(const std::string& description); void setPipelineDescription(const std::string& description);

View File

@@ -17,7 +17,7 @@ bool SocketServer::start(CommandCallback callback) {
return false; return false;
} }
commandCallback_ = callback; commandCallback_ = std::move(callback);
// Remove existing socket file if it exists // Remove existing socket file if it exists
unlink(socketPath_.c_str()); unlink(socketPath_.c_str());
@@ -30,12 +30,11 @@ bool SocketServer::start(CommandCallback callback) {
} }
// Bind socket // Bind socket
struct sockaddr_un addr; struct sockaddr_un addr = {};
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX; addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, socketPath_.c_str(), sizeof(addr.sun_path) - 1); strncpy(addr.sun_path, socketPath_.c_str(), sizeof(addr.sun_path) - 1);
if (bind(serverFd_, (struct sockaddr*)&addr, sizeof(addr)) < 0) { if (bind(serverFd_, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr)) < 0) {
std::cerr << "Failed to bind socket: " << strerror(errno) << std::endl; std::cerr << "Failed to bind socket: " << strerror(errno) << std::endl;
close(serverFd_); close(serverFd_);
return false; return false;
@@ -94,16 +93,16 @@ void SocketServer::serverLoop() {
} }
} }
void SocketServer::handleClient(int clientFd) { void SocketServer::handleClient(const int clientFd) {
char buffer[4096]; char buffer[4096];
ssize_t bytesRead = recv(clientFd, buffer, sizeof(buffer) - 1, 0); ssize_t bytesRead = recv(clientFd, buffer, sizeof(buffer) - 1, 0);
if (bytesRead > 0) { if (bytesRead > 0) {
buffer[bytesRead] = '\0'; buffer[bytesRead] = '\0';
std::string command(buffer); const std::string command(buffer);
// Call the command callback // Call the command callback
std::string response = commandCallback_(command); const std::string response = commandCallback_(command);
// Send response back to client // Send response back to client
send(clientFd, response.c_str(), response.length(), 0); send(clientFd, response.c_str(), response.length(), 0);

View File

@@ -1,9 +1,10 @@
#include "StreamingEngine.h" #include "StreamingEngine.h"
#include <iostream> #include <iostream>
#include <chrono> #include <chrono>
#include <utility>
StreamingEngine::StreamingEngine(std::shared_ptr<VxCamera> camera) StreamingEngine::StreamingEngine(std::shared_ptr<VxCamera> camera)
: camera_(camera), running_(false), bufferSize_(0) { : camera_(std::move(camera)), running_(false), bufferSize_(0) {
gstPipeline_ = std::make_unique<GStreamerPipeline>(""); gstPipeline_ = std::make_unique<GStreamerPipeline>("");
} }

View File

@@ -14,10 +14,10 @@ public:
bool start(const std::string& gstPipeline); bool start(const std::string& gstPipeline);
void stop(); void stop();
bool isRunning() const { return running_; } [[nodiscard]] bool isRunning() const { return running_; }
void setFormat(const VxFormat& format); void setFormat(const VxFormat& format);
VxFormat getCurrentFormat() const { return currentFormat_; } [[nodiscard]] VxFormat getCurrentFormat() const { return currentFormat_; }
void setPipelineDescription(const std::string& pipeline); void setPipelineDescription(const std::string& pipeline);

View File

@@ -73,13 +73,14 @@ int main() {
std::cout << "\n========================================" << std::endl; std::cout << "\n========================================" << std::endl;
std::cout << "VizionStreamer Ready" << std::endl; std::cout << "VizionStreamer Ready" << std::endl;
std::cout << "Author: Maik Jurischka <m.jurischka@scidre.de" << std::endl;
std::cout << "========================================" << std::endl; std::cout << "========================================" << std::endl;
std::cout << "Control socket: " << socketPath << std::endl; std::cout << "Control socket: " << socketPath << std::endl;
std::cout << "Default pipeline: videoconvert ! autovideosink" << std::endl; std::cout << "Default pipeline: videoconvert ! autovideosink" << std::endl;
std::cout << "\nQuick start:" << std::endl; std::cout << "\nQuick start:" << std::endl;
std::cout << " echo '{\"command\":\"start_stream\"}' | socat - UNIX-CONNECT:" << socketPath << std::endl; std::cout << R"( echo '{"command":"start_stream"}' | socat - UNIX-CONNECT:)" << socketPath << std::endl;
std::cout << "\nTo change pipeline before starting:" << std::endl; std::cout << "\nTo change pipeline before starting:" << std::endl;
std::cout << " echo '{\"command\":\"set_pipeline\",\"params\":{\"pipeline\":\"YOUR_PIPELINE\"}}' | socat - UNIX-CONNECT:" << socketPath << std::endl; std::cout << R"( echo '{"command":"set_pipeline","params":{"pipeline":"YOUR_PIPELINE"}}' | socat - UNIX-CONNECT:)" << socketPath << std::endl;
std::cout << "\nPress Ctrl+C to exit.\n" << std::endl; std::cout << "\nPress Ctrl+C to exit.\n" << std::endl;
// Main loop - keep running until signaled to stop // Main loop - keep running until signaled to stop

24
scripts/mjpeg_http_proxy.sh Executable file
View File

@@ -0,0 +1,24 @@
#!/bin/bash
# HTTP proxy wrapper for MJPEG stream from tcpserversink
# This adds proper HTTP headers for browser compatibility
TCP_PORT="${1:-8081}"
HTTP_PORT="${2:-8080}"
echo "Starting MJPEG HTTP proxy..."
echo "Listening for TCP stream on port $TCP_PORT"
echo "Serving HTTP on port $HTTP_PORT"
echo ""
echo "Configure vizionStreamer to use tcpserversink on port $TCP_PORT"
echo "Then access the stream at: http://localhost:$HTTP_PORT"
echo ""
# Start socat to wrap TCP stream with HTTP headers
socat -v TCP-LISTEN:$HTTP_PORT,reuseaddr,fork \
SYSTEM:"echo 'HTTP/1.1 200 OK'; \
echo 'Content-Type: multipart/x-mixed-replace; boundary=--videoboundary'; \
echo 'Cache-Control: no-cache'; \
echo 'Pragma: no-cache'; \
echo 'Connection: close'; \
echo ''; \
socat - TCP:localhost:$TCP_PORT"

79
scripts/mjpeg_http_server.py Executable file
View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
"""
Simple HTTP server that proxies MJPEG stream from tcpserversink
Adds proper HTTP headers for browser compatibility
"""
import socket
import sys
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
class MJPEGProxyHandler(BaseHTTPRequestHandler):
tcp_host = 'localhost'
tcp_port = 9000
def do_GET(self):
"""Handle GET requests and proxy the MJPEG stream"""
try:
# Connect to the GStreamer tcpserversink
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.tcp_host, self.tcp_port))
# Send HTTP headers
self.send_response(200)
self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=--videoboundary')
self.send_header('Cache-Control', 'no-cache')
self.send_header('Pragma', 'no-cache')
self.send_header('Connection', 'close')
self.end_headers()
# Stream data from TCP socket to HTTP client
while True:
data = sock.recv(4096)
if not data:
break
self.wfile.write(data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
finally:
try:
sock.close()
except:
pass
def log_message(self, format, *args):
"""Custom logging"""
print(f"[{self.client_address[0]}] {format % args}")
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
"""Handle requests in separate threads"""
pass
if __name__ == '__main__':
tcp_port = int(sys.argv[1]) if len(sys.argv) > 1 else 8081
http_port = int(sys.argv[2]) if len(sys.argv) > 2 else 8080
MJPEGProxyHandler.tcp_port = tcp_port
server = ThreadedHTTPServer(('0.0.0.0', http_port), MJPEGProxyHandler)
print(f"MJPEG HTTP Proxy Server")
print(f"========================")
print(f"Proxying TCP stream from localhost:{tcp_port}")
print(f"HTTP server listening on port {http_port}")
print(f"")
print(f"Configure vizionStreamer with:")
print(f' ./scripts/set_pipeline_mjpeg.sh {tcp_port}')
print(f"")
print(f"Access stream at: http://localhost:{http_port}")
print(f"")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nShutting down...")
server.shutdown()

View File

@@ -4,8 +4,18 @@
SOCKET="/tmp/vizion_control.sock" SOCKET="/tmp/vizion_control.sock"
PORT="${1:-8080}" PORT="${1:-8080}"
# Check if gst-plugins-good with souphttpsink is available
if gst-inspect-1.0 souphttpsink &>/dev/null; then
echo "Using souphttpsink for HTTP server..."
PIPELINE="videoconvert ! jpegenc quality=85 ! multipartmux boundary=\"--videoboundary\" ! souphttpsink port=$PORT"
else
echo "WARNING: souphttpsink not found. Using tcpserversink (may not work in browsers)."
echo "Install gst-plugins-good: sudo apt install gstreamer1.0-plugins-good"
PIPELINE="videoconvert ! jpegenc quality=85 ! multipartmux ! tcpserversink host=0.0.0.0 port=$PORT"
fi
echo "Setting MJPEG HTTP streaming pipeline on port $PORT..." echo "Setting MJPEG HTTP streaming pipeline on port $PORT..."
echo "{\"command\":\"set_pipeline\",\"params\":{\"pipeline\":\"videoconvert ! jpegenc ! multipartmux ! tcpserversink host=0.0.0.0 port=$PORT\"}}" | socat - UNIX-CONNECT:$SOCKET echo "{\"command\":\"set_pipeline\",\"params\":{\"pipeline\":\"$PIPELINE\"}}" | socat - UNIX-CONNECT:$SOCKET
echo "" echo ""
echo "Pipeline set. Start streaming with start_stream.sh" echo "Pipeline set. Start streaming with start_stream.sh"

View File

@@ -0,0 +1,32 @@
#!/bin/bash
# Optimized pipeline for cameras that already output MJPEG format
# Passes through MJPEG data without re-encoding
SOCKET="/tmp/vizion_control.sock"
PORT="${1:-8080}"
# Check if gst-plugins-good with souphttpsink is available
if gst-inspect-1.0 souphttpsink &>/dev/null; then
echo "Using souphttpsink for HTTP server (no re-encoding)..."
PIPELINE="multipartmux boundary=\"--videoboundary\" ! souphttpsink port=$PORT"
else
echo "WARNING: souphttpsink not found. Using tcpserversink (requires HTTP proxy)."
echo "Install gst-plugins-good: sudo apt install gstreamer1.0-plugins-good"
echo "Or use: ./scripts/mjpeg_http_server.py $PORT 8080"
PIPELINE="multipartmux ! tcpserversink host=0.0.0.0 port=$PORT"
fi
echo "Setting optimized MJPEG passthrough pipeline on port $PORT..."
echo "NOTE: This pipeline is optimized for cameras with native MJPEG output."
echo ""
echo "{\"command\":\"set_pipeline\",\"params\":{\"pipeline\":\"$PIPELINE\"}}" | socat - UNIX-CONNECT:$SOCKET
echo ""
echo "Pipeline set. Start streaming with start_stream.sh"
if gst-inspect-1.0 souphttpsink &>/dev/null; then
echo "View stream in browser: http://localhost:$PORT"
else
echo "Start HTTP proxy first: ./scripts/mjpeg_http_server.py $PORT 8080"
echo "Then view stream: http://localhost:8080"
fi