test HTTP stream server - no "disconnect" signal from SoupSocket



Hi,

I am writing an HTTP server that would serve a stream produced by a
GStreamer pipeline. I chose libsoup for this task as it's already used
as a HTTP backend in some Gst elements and it integrates nicely with
GMainLoop. To get buffers from the pipeline I use appsink element.

I have some logic that I have to comply with before I can actually play
the pipeline and some requirements like the stream should be served
continuously with a *single* GET request (after some initial
"negotiation and probing"). So no chunked delivery is acceptable at the
HTTP level (unless the client explicitly makes separate Ranges requests).

In order to acheive this I had to reach out for a pretty low level
SoupSocket API where I can write directly into the socket as Gst buffers
arrive. None of the higher level libsoup methods suited me as they tend
to break some of my requirements underneath, e. g. they break the stream
down into HTTP chunks if I use
soup_server_pause_message()/soup_server_unpause_message() no matter what
message encoding I choose.

The problem I have is that I don't receive SoupSocket "disconnected"
signal once the Gst pipeline is started. I think that's because I write
into the socket directly. I need the server to stop the pipeline once
the client closed the connection. I am checking the value returned from
soup_socket_write() but I'm not sure if this is the best approach.

Any ideas?

static void
server_callback (SoupServer        *server,
                 SoupMessage       *msg,
                 const char        *path,
                 GHashTable        *query,
                 SoupClientContext *context,
                 gpointer           pipeline)
{
  // ...


  socket = soup_client_context_get_socket (context);

  if (!pipeline_ready (GST_ELEMENT (pipeline), 0, socket)) {
    soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
    return;
  }

  g_object_ref (socket);
  g_signal_connect (msg,
                    "wrote-headers",
                    G_CALLBACK (wrote_headers),
                    pipeline);

  // set all headers
  // ...
}

static void
wrote_headers (SoupMessage *msg,
               gpointer     pipeline)
{
  g_debug ("Wrote headers...");

  // at least let's get a warning
  g_return_if_fail (
    GST_STATE_CHANGE_FAILURE !=
      gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING));
}

static GstFlowReturn
pipeline_new_sample (GstAppSink *appsink, gpointer data)
{
  // ...
  if (!socket_write (socket, map.data, map.size, &error)) {
    // ...
    ret = GST_FLOW_EOS; // this stops the pipeline through the main loop
  }

  // ...
  return ret;
}

static gboolean
socket_write (SoupSocket *socket,
              gpointer    buffer,
              gsize       len,
              GError    **error)
{
  gsize wrote_buffer, wrote_socket;
  SoupSocketIOStatus status;

  *error = NULL;
  for (wrote_buffer = 0; wrote_buffer < len; wrote_buffer += wrote_socket) {
    status = soup_socket_write (socket,
                                buffer + wrote_buffer,
                                len - wrote_buffer,
                                &wrote_socket,
                                NULL,
                                error);

    if (SOUP_SOCKET_OK != status) {
      return FALSE;
    }
  }

  return TRUE;
}


-------

Cheers,
Kris


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]