如何从 GStreamer 管道读取数据中的特定字段?

How do I read a specific field in data from a GStreamer Pipeline?

目前,我在 Linux 环境中通过 UDP 将摄像头的视频流式传输到一台 Windows 笔记本电脑。

我正在尝试从 rtph264pay 元素的输出中读取序列号。

我的管道工作正常,并且我已经添加了一个 pad 探针(pad probe)。但我不知道如何从数据中读取特定字段,尤其是缓冲区中的序列号字段。我该怎么做?

我尝试查看缓冲区的元数据中是否包含所需内容,但找不到与缓冲区关联的任何元数据(我之所以这样判断,是因为下面的 while 循环体一次也没有被执行)。

#include <glib-object.h>

#include <stdio.h>
#include <gst/gst.h>

#include <iostream> // test include

// Compile-time switch; nothing in the visible part of this file tests it.
#define DUMMY_TEST_CASE

// Each element is described by a pair: the factory name (*_TYPE) and the
// instance name (*_NAME) that main() passes to gst_element_factory_make().

// Video source.
static const gchar *SRC_TYPE = "v4l2src";
static const gchar *SRC_NAME = "video_for_linux_2_source";

// H.264 encoder.
static const gchar *H264_ENC_TYPE = "omxh264enc";
static const gchar *H264_ENC_NAME = "H264 Encoder";

// RTP payloader.
static const gchar *RTP_TYPE = "rtph264pay";
static const gchar *RTP_NAME = "RTP payloader for ts";

// UDP sink.
static const gchar *UDP_SINK_TYPE = "udpsink";
static const gchar *UDP_SINK_NAME = "UDP Sink sender";

// Main loop that main() creates, runs and releases.
static GMainLoop *gLoop;

// Scope tracer: writes "<label> entering.." to std::cerr when constructed and
// "<label> exiting.." when destroyed.  Only the pointer is stored, so the
// label string has to outlive the object.
class TmpEnterExit
{
   public:

      TmpEnterExit(const char * const apnText) : mpnText(apnText)
      {
         announce(" entering..");
      }

      ~TmpEnterExit()
      {
         announce(" exiting..");
      }

   private:

      // Print the label followed by apnSuffix and end (and flush) the line.
      void announce(const char * apnSuffix) const
      {
         std::cerr << mpnText << apnSuffix << std::endl;
      }

      const char * const mpnText;
};

///
/// \brief  Pad-probe callback that lists the metadata attached to a buffer.
///
/// \details For every GstMeta found on the probed buffer the API type, its
///      quark and the quark's name are written to std::cerr.  When the buffer
///      carries no metadata at all, that fact is reported explicitly instead
///      of printing nothing.
///
/// \param  apsInfo  probe info; the buffer is taken from it
///
/// \return always GST_PAD_PROBE_OK, i.e. the probe stays installed and the
///      data is passed on
///
static GstPadProbeReturn cbHaveData(
   GstPad * ,//apsPad,
   GstPadProbeInfo * apsInfo,
   gpointer //apsUsrData
   )
{
   TmpEnterExit lcTmpEe("cb");

   GstBuffer * lpsBuffer = GST_PAD_PROBE_INFO_BUFFER(apsInfo);

   if(!lpsBuffer)
   {
      std::cerr << "buffer does not exist" << std::endl;
      return GST_PAD_PROBE_OK;
   }

   // std::endl already flushes, so no separate flush() calls are needed.
   std::cerr
      << "buffer " << lpsBuffer
      << " about to try to get meta data" << std::endl;

   gpointer lpvState = NULL;   // iteration cursor owned by gst_buffer_iterate_meta
   GstMeta * lpsMeta = NULL;
   bool lbFound = false;

   while(NULL != (lpsMeta = gst_buffer_iterate_meta(lpsBuffer, &lpvState)))
   {
      lbFound = true;

      const GstMetaInfo * lpsMetaInfo = lpsMeta->info;
      if(!lpsMetaInfo)
      {
         // Skip this entry only; later metas may still be usable.
         std::cerr << "there is no info in the meta..." << std::endl;
         continue;
      }

      std::cerr
         << "lpsMetaInfo: '" << lpsMetaInfo
         << "' getting api..." << std::endl;

      const GType lnApi = lpsMetaInfo->api;

      std::cerr
         << "lnApi: " << lnApi
         << " getting quark..." << std::endl;

      const GQuark lnTypeQname = g_type_qname(lnApi);
      const gchar * lpnQuarkString = g_quark_to_string(lnTypeQname);

      if(!lpnQuarkString)
      {
         std::cerr
            << "quark: " << lnTypeQname << " has no name" << std::endl;
         continue;
      }

      std::cerr
         << "quark: " << lnTypeQname
         << " name: " << lpnQuarkString
         << std::endl;
   }

   if(!lbFound)
   {
      std::cerr
         << "buffer " << lpsBuffer
         << " has no meta data attached" << std::endl;
   }

   return GST_PAD_PROBE_OK;
}

// Handles for the pipeline and the elements that main() creates for it.
struct Pipeline
{
   GstElement *backbone;        // the pipeline itself (gst_pipeline_new)
   GstElement *src_element;     // video source (SRC_TYPE)
   GstElement *vid_enc;         // H.264 encoder (H264_ENC_TYPE)
   GstElement *filter_element;  // NOTE(review): no assignment visible in this file
   GstElement *muxer_element;   // NOTE(review): no assignment visible in this file
   GstElement *rtp_element;     // RTP payloader (RTP_TYPE)
   GstElement *sink_element;    // UDP sink (UDP_SINK_TYPE)
};

int main(int argc, char* argv[])
{
   const gchar* nano_str;
   guint major, minor, micro, nano;
   Pipeline pipeline;

   // initialize gstreamer
   //  using NULL elements instead of command line args
   //  in examples this is usually: gst_init(&argc, &argv);
   gst_init(NULL, NULL);
   gLoop = g_main_loop_new(NULL, FALSE);

   // get the version we are using and output it
   gst_version(&major, &minor, &micro, &nano);

   if(nano == 1)
   {
      nano_str = "(CVS)";
   }

   else if(nano == 2)
   {
      nano_str = "(Prerelease)";
   }

   else
   {
      nano_str = "";
   }

   printf("This program is linked against GStreamer %d.%d.%d %s\n",
      major, minor, micro, nano_str);

   // create the pipeline (core bin)
   pipeline.backbone = gst_pipeline_new("mpeg-ts pipeline");

   //
   // Pipeline
   // gst-launch-1.0 -v v4l2src device=/dev/video0 ! video/x-raw,width=640,height=480,framerate=30/1 ! omxh264enc ! rtph264pay config-interval=1 pt=96 ! udpsink host=argv[0] port=12345
   //
   //    NOTE: the other side is expected to be:
   //       udpsrc port=12345 ! application/x-rtp, payload=96"
   //          ! rtph264depay ! decodebin ! videoconvert ! autovideosink
   //

   //
   // create the source of the pipeline
   //

   // for now, use a test video source (videotestsrc)
   // this portion needs to be replaced by appdata source
   pipeline.src_element = gst_element_factory_make(SRC_TYPE, SRC_NAME);

   if(!pipeline.src_element)
   {
      printf("Failed to create element '%s'\n", SRC_NAME);
      return -1;
   }

   else
   {
      printf("Created element '%s'\n", SRC_NAME);
   }

   //
   // Get the v4l2src pad and update it's caps
   //

   GstCaps *lpsCaps = gst_caps_new_simple(
      "video/x-raw",
      "width", G_TYPE_INT, 640,
      "height", G_TYPE_INT, 480,
      "framerate", GST_TYPE_FRACTION, 30, 1,
      NULL );

   GstElement * lpsFilter = gst_element_factory_make("capsfilter","filter");

   if(!lpsFilter)
   {
      std::cout << "error in creating caps filter" << std::endl;
      return -1;
   }

   g_object_set(lpsFilter,"caps", lpsCaps, NULL);
   gst_caps_unref(lpsCaps);

   //
   // set up the video encoding
   //

   pipeline.vid_enc = gst_element_factory_make(
      H264_ENC_TYPE, H264_ENC_NAME);

   if(!pipeline.vid_enc)
   {
      printf("Failed to create element '%s'\n", H264_ENC_NAME);
      return -1;
   }
   else
   {
      printf("Created element '%s'\n", H264_ENC_NAME);
   }


   //
   // set up the RTP
   //

   pipeline.rtp_element = gst_element_factory_make(RTP_TYPE, RTP_NAME);
   if(!pipeline.rtp_element)
   {
      printf("Failed to create element '%s'\n", RTP_NAME);
      return -1;
   }
   else
   {
      printf("Created element '%s'\n", RTP_NAME);
   }

   g_object_set(G_OBJECT(pipeline.rtp_element), "pt", 96, NULL);
   g_object_set(G_OBJECT(pipeline.rtp_element), "config-interval", 1, NULL);

   //
   // Set up the UDP sink aka destination
   //

   pipeline.sink_element =
      gst_element_factory_make(UDP_SINK_TYPE, UDP_SINK_NAME);

   if(!pipeline.src_element)
   {
      printf("Failed to create element '%s'\n", UDP_SINK_NAME);
      return -1;
   }
   else
   {
      printf("Created element '%s'\n", UDP_SINK_NAME);
   }

   if(argc < 2)
   {
      std::cout << "no host given" << std::endl;
      return -1;
   }
   else
   {
      g_object_set(G_OBJECT(pipeline.sink_element), "host", argv[1], NULL);
   }

   g_object_set(G_OBJECT(pipeline.sink_element), "port", 12345, NULL);
   g_object_set(G_OBJECT(pipeline.sink_element), "sync", TRUE, NULL);

   if(!gst_bin_add(GST_BIN(pipeline.backbone), pipeline.src_element) )     {std::cout << "error adding source" << std::endl;}
   if(!gst_bin_add(GST_BIN(pipeline.backbone), lpsFilter) )                {std::cout << "error adding source" << std::endl;}
   if(!gst_bin_add(GST_BIN(pipeline.backbone), pipeline.vid_enc) )         {std::cout << "error adding encoder" << std::endl;}
   if(!gst_bin_add(GST_BIN(pipeline.backbone), pipeline.rtp_element) )     {std::cout << "error adding rtp" << std::endl;}
   if(!gst_bin_add(GST_BIN(pipeline.backbone), pipeline.sink_element) )    {std::cout << "error adding sink" << std::endl;}

   // try linking the elements
   if(!gst_element_link_many(
      pipeline.src_element,
      lpsFilter,
      pipeline.vid_enc,
      pipeline.rtp_element,
      pipeline.sink_element,
      NULL))
   {
      // TODO cleanup
      printf("Pipeline linking failed!");
      return -1;
   }

   else
   {
      printf("Pipeline link successful");
   }

   GstPad * tmpPad =
      gst_element_get_static_pad(pipeline.rtp_element, "src");
   gst_pad_add_probe(
      tmpPad,
      GST_PAD_PROBE_TYPE_BUFFER,
      (GstPadProbeCallback)cbHaveData,
      NULL,
      NULL);
   gst_object_unref(tmpPad);

   // Set the pipeline to "playing" state
   printf("Setting Pipeline to playing state\n");
   gst_element_set_state(pipeline.backbone, GST_STATE_PLAYING);

   // Iterate
   printf("Running...\n");
   g_main_loop_run(gLoop);

   // Out of the main loop, clean up nicely
   // TODO really need to clean this up properly
   // Also need to figure out how/when to terminate the main loop before we clean up
   g_print("Returned, stopping playback\n");
   gst_element_set_state(pipeline.backbone, GST_STATE_NULL);

   g_print("Deleting pipeline\n");
   gst_object_unref(GST_OBJECT(pipeline.backbone));
   //g_source_remove(bus_watch_id);
   g_main_loop_unref(gLoop);

   return 0;
}

我没能找到一种优雅的方式来获取这些数据,最后只好查清缓冲区中数据的格式,然后像下面这样手动提取:

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
///
/// \brief  Pad-probe callback that prints the RTP sequence number of a buffer.
///
/// \note   Meant for a pad that carries RTP packets, e.g. the src pad (output)
///         of the RTP h264 payloader.  There should be multiple RTP outputs
///         and one h264 input per image, so expect several calls per frame.
///
/// \details The expected content format is as follows:
///      from https://tools.ietf.org/html/rfc3984#section-5.1
///       0                   1                   2                   3
///       0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
///      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
///      |V=2|P|X|  CC   |M|     PT      |       sequence number         |
///      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
///      |                           timestamp                           |
///      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
///      |           synchronization source (SSRC) identifier            |
///      +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
///      |            contributing source (CSRC) identifiers             |
///      |                             ....                              |
///      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
///
/// \param  apsInfo  probe info; the buffer is taken from it
///
/// \return always GST_PAD_PROBE_OK, i.e. the probe stays installed and the
///         data is passed on
///
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

static GstPadProbeReturn cbHaveData(
   GstPad * ,
   GstPadProbeInfo * apsInfo,
   gpointer //apsUsrData
   )
{
   GstBuffer * lpsBuffer = GST_PAD_PROBE_INFO_BUFFER(apsInfo);

   if(!lpsBuffer)
   {
      return GST_PAD_PROBE_OK;
   }

   //
   // Get the sequence number from the RTP header
   //

   // The sequence number occupies bits 16-31, i.e. bytes [2] and [3].
   // gst_buffer_extract() copies the bytes out itself, so the buffer does
   // not have to be mapped here.
   const gsize lnSeqOffset = 2;
   guint8 lanSeqBytes[2] = { 0, 0 };

   const gsize lnCopied = gst_buffer_extract(
      lpsBuffer, lnSeqOffset, lanSeqBytes, sizeof(lanSeqBytes));

   // A short copy means the buffer is too small to hold the field.
   if(lnCopied == sizeof(lanSeqBytes))
   {
      // The field is big-endian on the wire.  Assembling it from the two
      // bytes gives the right value on any host, unlike an unconditional
      // byte swap of a native 16-bit integer.
      const unsigned int lnSeqNum =
         (static_cast<unsigned int>(lanSeqBytes[0]) << 8) | lanSeqBytes[1];

      printf("sequence number: %u\n", lnSeqNum);
   }

   // NOTE(review): gst_rtp_buffer_map() / gst_rtp_buffer_get_seq() from
   // <gst/rtp/gstrtpbuffer.h> should give the same value without manual
   // parsing, but need the separate GStreamer RTP library - confirm before
   // switching.

   return GST_PAD_PROBE_OK;
}

我仍然愿意接受更好的数据提取方法。