Membrane, Phoenix, Nerves

A recent interest of mine has been multimedia and streaming. I've always believed Elixir/Erlang would be an amazing fit for this niche. It took a few years, but Software Mansion finally released their new open source framework, Membrane, at ElixirConf 2019. I didn't have an immediate use for the technology at the time, but I've since had the pleasure of working with it both professionally and on a side project. For my side project I wanted to see how difficult it would be to build a clone of an existing project called OctoPrint, which is a snappy web interface for your 3D printer.

Specifically, I wanted to clone the webcam streamer plugin, with the added twist of using my own server instead of leveraging an existing streaming service (in this case twitch.tv).


Membrane ❤️ Phoenix

While a useful tool by itself, to really get full use out of Membrane, you'll probably want to connect it to a web stack. This will of course depend on the project being worked on, but for simplicity, let's assume the "streaming" portion of the application lives in the same codebase as our web stack.

I start out with the ever-familiar mix phx.new:

mix phx.new noven

Our app is going to live out on the internet somewhere, and our Nerves device will be able to stream to it. I'm going to skip over the (relatively) boring portions of building device authn/z, but the short version is to leverage phx_gen_auth to create a User schema:

defmodule Noven.Accounts.User do
  use Ecto.Schema
  import Ecto.Changeset

  @derive {Inspect, except: [:password]}
  schema "users" do
    field :email, :string
    field :password, :string, virtual: true
    field :hashed_password, :string
    field :confirmed_at, :naive_datetime
    # yet to be created....
    has_many :devices, Noven.Devices.Device

    timestamps()
  end
end

and a Device schema:

defmodule Noven.Devices.Device do
  use Ecto.Schema
  import Ecto.Changeset

  schema "devices" do
    field :last_connected, :utc_datetime
    field :name, :string
    field :serial, :string
    belongs_to :user, Noven.Accounts.User
    # tokens are how a device is authorized.
    has_one :device_token, Noven.Devices.DeviceToken
    timestamps()
  end
end

Again, for simplicity, I'm omitting the implementation for Noven.Devices.DeviceToken as I don't find it particularly useful in the context of this module. See the source code for full implementations.

Now that we have a Device and a User, we'll need to start building out the interesting part - Streaming. The first goal will be what is referred to as "signaling." Basically this is the part that controls the stream details - if it is or isn't streaming currently, where the device should send the stream, quality, etc. There are existing protocols for this such as RTSP/RTCP, but I opted to use Phoenix Channels for a few reasons.

  1. I'm already familiar with server and client implementations and APIs.
  2. It's simple.
  3. No extra web server ports on top of the normal web stack.
  4. It plugs into Phoenix Presence with very little extra work.

First I needed a socket for devices to connect to:

defmodule NovenWeb.DeviceSocket do
  use Phoenix.Socket
  require Logger

  ## Channels
  channel "device:*", NovenWeb.DeviceChannel

  @impl true
  def connect(%{"token" => token}, socket, _connect_info) do
    with {:ok, token} <- Base.url_decode64(token, padding: false),
         token <- :crypto.hash(:sha256, token),
         %Noven.Devices.Device{} = device <- Noven.Devices.get_device_by_token(token) do
      {:ok,
       socket
       |> assign(:device, device)}
    else
      error ->
        Logger.error("Could not authenticate device: #{inspect(error)}")
        :error
    end
  end

  # Socket id's are topics that allow you to identify all sockets for a given user:
  #
  #     def id(socket), do: "user_socket:#{socket.assigns.user_id}"
  #
  # Would allow you to broadcast a "disconnect" event and terminate
  # all active sockets and channels for a given user:
  #
  #     NovenWeb.Endpoint.broadcast("user_socket:#{user.id}", "disconnect", %{})
  #
  # Returning `nil` makes this socket anonymous.
  @impl true
  def id(socket), do: "device_socket:#{socket.assigns.device.id}"
end
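The with chain in connect/3 decodes the token sent by the device and hashes it before the database lookup, which suggests only a hash is stored on the server. Below is a minimal sketch of that round trip, assuming the token is random bytes handed to the device as URL-safe Base64. TokenSketch and its functions are hypothetical and are not part of Noven:

```elixir
defmodule TokenSketch do
  # Hypothetical helper, not part of Noven. It mirrors the decode-then-hash
  # steps that connect/3 performs on the "token" param.

  # Returns {token_for_the_device, hash_for_the_database}.
  def generate do
    raw = :crypto.strong_rand_bytes(32)
    {Base.url_encode64(raw, padding: false), :crypto.hash(:sha256, raw)}
  end

  # Returns {:ok, hash} for a well-formed token, :error otherwise.
  def hash_for_lookup(encoded) do
    with {:ok, raw} <- Base.url_decode64(encoded, padding: false) do
      {:ok, :crypto.hash(:sha256, raw)}
    end
  end
end

{device_token, stored_hash} = TokenSketch.generate()
{:ok, lookup_hash} = TokenSketch.hash_for_lookup(device_token)
IO.inspect(lookup_hash == stored_hash, label: "hashes match")
```

Storing only the hash means a leaked database row cannot be replayed as a device token.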

The connect/3 function is responsible for auth here. Once authorized, an ID is assigned so we can wire up Presence later. Once connected, the device joins a channel:

defmodule NovenWeb.DeviceChannel do
  use NovenWeb, :channel
  alias Phoenix.Socket.Broadcast

  def join(_topic, _params, socket) do
    #TODO IMPLEMENT ME!!!
  end

  def terminate(_, _), do: :ok
end

Now with the basic scaffolding out of the way, we can finally get to the fun stuff. We'll need to add the Membrane mix dependencies next:

# core services of membrane
{:membrane_core, "~> 0.5.2"},
# Our stream will be received via a UDP stream
{:membrane_element_udp, "~> 0.3.2"},
# The stream will be encoded via h264.
{:membrane_element_ffmpeg_h264, "~> 0.3.0"},
# the live stream will be transcoded into mp4
{:membrane_mp4_plugin, "~> 0.3.0"},
# and streamed to the end user via http adaptive streaming
{:membrane_http_adaptive_stream_plugin, "~> 0.1.0"},
# handles depayloading RTP data
{:membrane_rtp_plugin, "~> 0.4.0-alpha"},
# handles depayloading h264 out of RTP
{:membrane_rtp_h264_plugin, "~> 0.3.0-alpha"},

Let's hop right into building the Pipeline and I'll explain where those will all be used:

defmodule NovenMedia.Pipeline do
  use Membrane.Pipeline
  alias Membrane.Time

  require Logger

  @default_presence %{
    pipeline: "stop",
    ssrc: nil
  }

  @doc false
  def child_spec([%Noven.Devices.Device{} = device, port]) do
    via = {:via, Registry, {NovenMedia.NameRegistry, {"pipeline", device.id}}}
    start_spec = {__MODULE__, :start_link, [[device, port], [name: via]]}

    %{
      id: {:pipeline, device.id},
      start: start_spec
    }
  end

  @impl true
  def handle_init([device, port]) do
    {:ok, _ref} = Noven.DevicePresence.track(self(), "devices", "#{device.id}", @default_presence)
    table = :ets.new(:"stream-#{device.id}", [:public, :named_table])

    children = %{
      app_source: %Membrane.Element.UDP.Source{
        local_port_no: port,
        recv_buffer_size: 500_000
      },
      rtp: %Membrane.RTP.Session.ReceiveBin{
        fmt_mapping: %{96 => :H264},
        custom_depayloaders: %{
          :H264 => Membrane.RTP.H264.Depayloader
        }
      }
    }

    links = [
      link(:app_source) |> via_in(:input, buffer: [fail_size: 300]) |> to(:rtp)
    ]

    spec = %ParentSpec{children: children, links: links}
    {{:ok, spec: spec}, %{device: device, table: table}}
  end

  @impl true
  def handle_shutdown(_reason, state) do
    if state.table, do: :ets.delete(state.table)
    :ok
  end

  @impl true
  def handle_notification({:new_rtp_stream, ssrc, :H264}, :rtp, state) do
    {:ok, _ref} =
      Noven.DevicePresence.update(self(), "devices", "#{state.device.id}", %{
        pipeline: "play",
        ssrc: to_string(ssrc)
      })

    video_timestamper = {:video_timestamper, make_ref()}
    video_nal_parser = {:video_nal_parser, make_ref()}
    decoder = {:decoder, make_ref()}
    thumbnailer = {:thumbnailer, make_ref()}
    tee = {:tee, make_ref()}
    scissors = {:scissors, make_ref()}
    video_payloader = {:video_payloader, make_ref()}
    video_cmaf_muxer = {:video_cmaf_muxer, make_ref()}
    hls_encoder = {:hls_encoder, make_ref()}

    children = %{
      video_timestamper => %Membrane.RTP.Timestamper{
        resolution: Ratio.new(Time.second(), 90_000)
      },
      video_nal_parser => %Membrane.Element.FFmpeg.H264.Parser{
        framerate: {30, 1},
        alignment: :au,
        attach_nalus?: true
      },
      decoder => Membrane.Element.FFmpeg.H264.Decoder,
      scissors => %Membrane.Scissors{
        intervals:
          Stream.iterate(0, &(&1 + Membrane.Time.milliseconds(10))) |> Stream.map(&{&1, 1}),
        interval_duration_unit: :buffers,
        buffer_duration: fn _buffer, _caps -> 100 end
      },
      thumbnailer => NovenMedia.Thumbnailer,
      tee => Membrane.Element.Tee.Parallel,
      video_payloader => Membrane.MP4.Payloader.H264,
      video_cmaf_muxer => Membrane.MP4.CMAF.Muxer,
      hls_encoder => %Membrane.HTTPAdaptiveStream.Sink{
        manifest_module: Membrane.HTTPAdaptiveStream.HLS,
        target_window_duration: 10 |> Membrane.Time.seconds(),
        storage: %NovenMedia.ETSStorage{table: state.table}
      }
    }

    links = [
      link(:rtp)
      |> via_out(Pad.ref(:output, ssrc))
      |> to(video_timestamper)
      |> to(video_nal_parser)
      |> to(video_payloader)
      |> to(video_cmaf_muxer)
      |> via_in(:input)
      |> to(hls_encoder)
    ]

    spec = %ParentSpec{children: children, links: links}
    {{:ok, spec: spec}, state}
  end

  def handle_notification(_notification, _element, state) do
    {:ok, state}
  end
end

That's a lot of code, but let's step through it:

use Membrane.Pipeline

Probably the most important part: this line pulls in everything needed to implement a Pipeline.

def child_spec([%Noven.Devices.Device{} = device, port]) do
  via = {:via, Registry, {NovenMedia.NameRegistry, {"pipeline", device.id}}}
  start_spec = {__MODULE__, :start_link, [[device, port], [name: via]]}

  %{
    id: {:pipeline, device.id},
    start: start_spec
  }
end

Simple child_spec implementation that tells OTP how to supervise this pipeline. Every Pipeline is just a GenServer under the hood, so calling start_link will wire up everything for us.

Processes are named via a Registry.
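To see what naming through a Registry buys us, here is a stand-alone sketch. An Agent stands in for the pipeline process, and SketchRegistry is a made-up name playing the role of NovenMedia.NameRegistry:

```elixir
# SketchRegistry is a hypothetical stand-in for NovenMedia.NameRegistry.
{:ok, _} = Registry.start_link(keys: :unique, name: SketchRegistry)

device_id = 42
via = {:via, Registry, {SketchRegistry, {"pipeline", device_id}}}

# An Agent plays the part of the pipeline process here.
{:ok, pid} = Agent.start_link(fn -> :stopped end, name: via)

# Any process can now reach it by key instead of by pid.
:stopped = Agent.get(via, & &1)
[{^pid, _value}] = Registry.lookup(SketchRegistry, {"pipeline", device_id})

# Starting a second process under the same key is rejected.
{:error, {:already_started, ^pid}} = Agent.start_link(fn -> :stopped end, name: via)
IO.puts("one process per device id")
```

Keying on the device id gives at most one pipeline per device, and the already_started tuple is something any caller that starts pipelines has to be ready for.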

def handle_init([device, port]) do
  {:ok, _ref} = Noven.DevicePresence.track(self(), "devices", "#{device.id}", @default_presence)
  table = :ets.new(:"stream-#{device.id}", [:public, :named_table])

  children = %{
    app_source: %Membrane.Element.UDP.Source{
      local_port_no: port,
      recv_buffer_size: 500_000
    },
    rtp: %Membrane.RTP.Session.ReceiveBin{
      fmt_mapping: %{96 => :H264},
      custom_depayloaders: %{
        :H264 => Membrane.RTP.H264.Depayloader
      }
    }
  }

  links = [
    link(:app_source) |> via_in(:input, buffer: [fail_size: 300]) |> to(:rtp)
  ]

  spec = %ParentSpec{children: children, links: links}
  {{:ok, spec: spec}, %{device: device, table: table}}
end

This is Membrane's analogue of GenServer.init/1.

Here we do a few things:

  1. set up a presence to track devices via a frontend, telemetry, etc.
  2. create an ETS table. This will be used as a temporary buffer to store h264 frames
  3. define the children of the parent spec.
  4. define the links between those children
  5. return the initial state

app_source is the UDP source: it listens on the port where the application receives RTP data. rtp is the receiving end of RTP (Real-time Transport Protocol), a binary protocol for sending media in real time.
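To make "binary protocol" a little more concrete, here is a rough sketch of reading the fixed 12-byte RTP header defined in RFC 3550 with Elixir binary pattern matching. RtpHeaderSketch is a made-up module for illustration. The Membrane RTP plugin does this work (and much more) for us, so none of this code is needed in the pipeline:

```elixir
defmodule RtpHeaderSketch do
  # Illustrative only: reads the 12-byte fixed RTP header (RFC 3550).
  # CSRC entries and header extensions are not handled here.
  def parse(
        <<version::2, _padding::1, _extension::1, _csrc_count::4, marker::1,
          payload_type::7, sequence_number::16, timestamp::32, ssrc::32,
          payload::binary>>
      ) do
    {:ok,
     %{
       version: version,
       marker: marker == 1,
       payload_type: payload_type,
       sequence_number: sequence_number,
       timestamp: timestamp,
       ssrc: ssrc,
       payload: payload
     }}
  end

  def parse(_other), do: :error
end

# A hand-built packet: version 2, payload type 96 (the value mapped to
# :H264 in fmt_mapping), sequence number 1, and a made-up SSRC.
packet =
  <<2::2, 0::1, 0::1, 0::4, 0::1, 96::7, 1::16, 90_000::32, 0xDEADBEEF::32, "nal data">>

{:ok, header} = RtpHeaderSketch.parse(packet)
IO.inspect(header.ssrc, label: "ssrc")
```

The payload type and the SSRC are the two fields that show up again in the pipeline: the first selects the depayloader, the second identifies the stream.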

def handle_shutdown(_reason, state) do
  if state.table, do: :ets.delete(state.table)
  :ok
end

Cleans up the ETS table when the pipeline shuts down.

def handle_notification({:new_rtp_stream, ssrc, :H264}, :rtp, state) do
  {:ok, _ref} =
    Noven.DevicePresence.update(self(), "devices", "#{state.device.id}", %{
      pipeline: "play",
      ssrc: to_string(ssrc)
    })

  video_timestamper = {:video_timestamper, make_ref()}
  video_nal_parser = {:video_nal_parser, make_ref()}
  decoder = {:decoder, make_ref()}
  thumbnailer = {:thumbnailer, make_ref()}
  tee = {:tee, make_ref()}
  scissors = {:scissors, make_ref()}
  video_payloader = {:video_payloader, make_ref()}
  video_cmaf_muxer = {:video_cmaf_muxer, make_ref()}
  hls_encoder = {:hls_encoder, make_ref()}

  children = %{
    # TODO: remove when moved to the RTP bin
    video_timestamper => %Membrane.RTP.Timestamper{
      resolution: Ratio.new(Time.second(), 90_000)
    },
    video_nal_parser => %Membrane.Element.FFmpeg.H264.Parser{
      framerate: {30, 1},
      alignment: :au,
      attach_nalus?: true
    },
    decoder => Membrane.Element.FFmpeg.H264.Decoder,
    scissors => %Membrane.Scissors{
      intervals:
        Stream.iterate(0, &(&1 + Membrane.Time.milliseconds(10))) |> Stream.map(&{&1, 1}),
      interval_duration_unit: :buffers,
      buffer_duration: fn _buffer, _caps -> 100 end
    },
    thumbnailer => NovenMedia.Thumbnailer,
    tee => Membrane.Element.Tee.Parallel,
    video_payloader => Membrane.MP4.Payloader.H264,
    video_cmaf_muxer => Membrane.MP4.CMAF.Muxer,
    hls_encoder => %Membrane.HTTPAdaptiveStream.Sink{
      manifest_module: Membrane.HTTPAdaptiveStream.HLS,
      target_window_duration: 10 |> Membrane.Time.seconds(),
      storage: %NovenMedia.ETSStorage{table: state.table}
    }
  }

  links = [
    link(:rtp)
    |> via_out(Pad.ref(:output, ssrc))
    |> to(video_timestamper)
    |> to(video_nal_parser)
    |> to(video_payloader)
    |> to(video_cmaf_muxer)
    |> via_in(:input)
    |> to(hls_encoder)
  ]

  spec = %ParentSpec{children: children, links: links}
  {{:ok, spec: spec}, state}
end

This is the bulk of the pipeline. It looks like a lot, but in reality it's not so complicated. Just a lot of wiring. Let's walk through it.

{:ok, _ref} =
  Noven.DevicePresence.update(self(), "devices", "#{state.device.id}", %{
    pipeline: "play",
    ssrc: to_string(ssrc)
  })

Update the presence. Nothing that interesting here, except to note that ssrc (synchronization source) comes from RTP. It's basically a 32-bit identifier for a stream.

video_timestamper = {:video_timestamper, make_ref()}
video_nal_parser = {:video_nal_parser, make_ref()}
decoder = {:decoder, make_ref()}
video_payloader = {:video_payloader, make_ref()}
video_cmaf_muxer = {:video_cmaf_muxer, make_ref()}
hls_encoder = {:hls_encoder, make_ref()}

Create references to all the stages in the pipeline. These names can be anything. We will see how they get used below.

children = %{
  video_timestamper => %Membrane.RTP.Timestamper{
    resolution: Ratio.new(Time.second(), 90_000)
  },
  video_nal_parser => %Membrane.Element.FFmpeg.H264.Parser{
    framerate: {30, 1},
    alignment: :au,
    attach_nalus?: true
  },
  decoder => Membrane.Element.FFmpeg.H264.Decoder,
  video_payloader => Membrane.MP4.Payloader.H264,
  video_cmaf_muxer => Membrane.MP4.CMAF.Muxer,
  hls_encoder => %Membrane.HTTPAdaptiveStream.Sink{
    manifest_module: Membrane.HTTPAdaptiveStream.HLS,
    target_window_duration: 10 |> Membrane.Time.seconds(),
    storage: %NovenMedia.ETSStorage{table: state.table}
  }
}

Similar to handle_init/1, handle_notification/3 can spawn more children using the same API. This portion lists implementations for each of the references we created earlier. There is a lot here but the wiring portion will clear it up.
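One detail in the children map worth unpacking is the timestamper's resolution. H264 over RTP uses a 90 kHz clock, so one RTP tick is one second divided by 90_000. The sketch below does that arithmetic with plain integers, assuming time is counted in nanoseconds. RtpClockSketch is a hypothetical module and does not use Membrane.Time or Ratio:

```elixir
defmodule RtpClockSketch do
  # Assumption: time is expressed in whole nanoseconds.
  @clock_rate 90_000
  @ns_per_second 1_000_000_000

  # Converts a number of RTP clock ticks to nanoseconds, rounding down.
  def ticks_to_ns(ticks) when is_integer(ticks) do
    div(ticks * @ns_per_second, @clock_rate)
  end
end

# At 30 frames per second, consecutive frames are 3_000 ticks apart.
IO.inspect(RtpClockSketch.ticks_to_ns(3_000), label: "ns between frames")
```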

links = [
  link(:rtp)
  |> via_out(Pad.ref(:output, ssrc))
  |> to(video_timestamper)
  |> to(video_nal_parser)
  |> to(video_payloader)
  |> to(video_cmaf_muxer)
  |> via_in(:input)
  |> to(hls_encoder)
]

This essentially defines the route the data will flow through the pipeline after the RTP connection is created.

  • new stream (connection opened)
  • stream is timestamped
  • stream is parsed into NAL units
  • stream is payloaded as h264 for MP4
  • stream is muxed into MP4 (CMAF) format
  • stream is written out as HLS via the ETS storage module. I didn't include
    the implementation of this, but it is essentially a read and a write function.
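For a feel of what "a read and a write function" over ETS can look like, here is a minimal sketch. EtsStorageSketch is hypothetical. The real NovenMedia.ETSStorage is handed to the HLS sink as its storage option and has to satisfy whatever interface the sink expects, which this sketch does not attempt:

```elixir
defmodule EtsStorageSketch do
  # Hypothetical stand-in for NovenMedia.ETSStorage; it only shows the
  # ETS side of things.

  def new(name), do: :ets.new(name, [:public, :named_table])

  # Inserts or overwrites one named chunk (a playlist or a media segment).
  def write(table, name, contents) when is_binary(contents) do
    true = :ets.insert(table, {name, contents})
    :ok
  end

  # Fetches a chunk by name, for example from a controller serving the player.
  def read(table, name) do
    case :ets.lookup(table, name) do
      [{^name, contents}] -> {:ok, contents}
      [] -> {:error, :not_found}
    end
  end
end

table = EtsStorageSketch.new(:"stream-sketch")
:ok = EtsStorageSketch.write(table, "index.m3u8", "#EXTM3U\n")
{:ok, playlist} = EtsStorageSketch.read(table, "index.m3u8")
IO.write(playlist)
```

Because the table is public and named, the web side can read segments straight from ETS without sending a message to the pipeline process.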

Finally, we can start processing the "first mile" of our stream. We just need to go back into the device_channel and add the final control functions:

@port 5000
def join(_topic, _params, socket) do
  host = socket.endpoint.host()
  socket.endpoint.subscribe("devices:#{socket.assigns.device.id}")

  case NovenMedia.Supervisor.start_pipeline(socket.assigns.device, @port) do
    {:ok, pid} ->
      :ok = NovenMedia.Pipeline.play(pid)
      {:ok, %{host: host, port: @port}, assign(socket, :pipeline_pid, pid)}

    {:error, {:already_started, pid}} ->
      :ok = NovenMedia.Pipeline.play(pid)
      {:ok, %{host: host, port: @port}, assign(socket, :pipeline_pid, pid)}
  end
end

def terminate(_, %{assigns: %{pipeline_pid: pid}}) do
  NovenMedia.Pipeline.stop_and_terminate(pid, [])
end

def handle_info(%Broadcast{event: "play", payload: payload}, socket) do
  push(socket, "play", payload)
  {:noreply, socket}
end

def handle_info(%Broadcast{event: "stop", payload: payload}, socket) do
  push(socket, "stop", payload)
  {:noreply, socket}
end

This essentially starts our pipeline (using that child_spec function - remember it?) when the device connects to the channel. Returning %{host: host, port: @port} tells the Nerves device where to send its data. Control is handled by Broadcast events that can come from anywhere. In our case, they come via the Presence and a LiveView connection.

Of course, I'm omitting the UI code here, but the bulk of the work is handled by a LiveView that lists devices connected as indexed by the Presence we created earlier.


Membrane ❤️ Nerves

For the device connection, I of course used Nerves to deploy Elixir code to a Raspberry Pi 3. I chose the RPi 3 because it has a built-in hardware h264 encoder that is fairly easy to access via a program called GStreamer.

mix nerves.new --target=rpi3 noven_link

The only dependencies we need here are:

{:phoenix_client, "~> 0.11"},
{:jason, "~> 1.0"},

for the Phoenix Channels connection.

Next, in application.ex I added an implementation for connecting to the Socket with a token that is stored on the SD card.

def children(_target) do
  # contains the token as a query param
  url = Nerves.Runtime.KV.get("noven_url")

  [
    {PhoenixClient.Socket, {[url: url], [name: Socket]}},
    {DeviceChannel, [socket: Socket]}
  ]
end

The next thing to implement is of course the device_channel:

defmodule NovenLink.DeviceChannel do
  alias PhoenixClient.Channel
  require Logger

  defmodule State do
    defstruct params: %{},
              socket: NovenLink.Socket,
              topic: nil,
              connected?: false,
              channel: nil,
              rejoin_after: 5000,
              livestream: nil,
              host: nil,
              port: nil
  end

  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def terminate(reason, state) do
    Logger.info("Stopping livestream")

    if state.livestream && Process.alive?(state.livestream),
      do: Process.exit(state.livestream, reason)
  end

  def init(opts) do
    send(self(), :join)
    rejoin_after = Application.get_env(:noven_link, :rejoin_after, 5_000)
    uuid = System.unique_integer([:positive])

    {:ok,
     %State{
       params: opts[:params],
       socket: opts[:socket],
       topic: "device:#{uuid}",
       connected?: false,
       rejoin_after: rejoin_after,
       host: nil,
       port: nil
     }}
  end

  def handle_info(:join, %{socket: socket, topic: topic, params: params} = state) do
    case Channel.join(socket, topic, params) do
      {:ok, %{"host" => host, "port" => port}, channel} ->
        Logger.info("Joined channel")
        Process.send_after(self(), :checkup, 5000)
        {:noreply, %{state | channel: channel, connected?: true, host: host, port: port}}

      _error ->
        Process.send_after(self(), :join, state.rejoin_after)
        {:noreply, %{state | connected?: false}}
    end
  end

  def handle_info(:checkup, state) do
    if PhoenixClient.Socket.connected?(state.socket) do
      Process.send_after(self(), :checkup, 5000)
      {:noreply, state}
    else
      {:stop, :socket_disconnect, state}
    end
  end

  def handle_info(
        %PhoenixClient.Message{
          event: "play",
          payload: _
        },
        state
      ) do
    Logger.info "Starting Livestream"
    {:ok, livestream} = NovenLink.LiveStream.start_link(state.host, state.port)
    {:noreply, %{state | livestream: livestream}}
  end

  def handle_info(
        %PhoenixClient.Message{
          event: "stop",
          payload: _
        },
        state
      ) do
    Logger.info "Stopping livestream"
    state.livestream && GenServer.stop(state.livestream, :normal)
    {:noreply, %{state | livestream: nil}}
  end
end

This is a simple GenServer that connects to the DeviceChannel on the Web application. The important part for communication will be:

Channel.join(socket, topic, params)

Basically, that is what will start the Pipeline on the Web app side. And the important part for the actual stream is:

def handle_info(
      %PhoenixClient.Message{
        event: "play",
        payload: _
      },
      state
    ) do
  Logger.info "Starting Livestream"
  {:ok, livestream} = NovenLink.LiveStream.start_link(state.host, state.port)
  {:noreply, %{state | livestream: livestream}}
end

This starts a linked GenServer: NovenLink.LiveStream. We'll look at that implementation next:

defmodule NovenLink.LiveStream do
  use GenServer
  require Logger

  def start_link(host, port) do
    GenServer.start_link(__MODULE__, [host, port], name: __MODULE__)
  end

  @impl GenServer
  def terminate(_reason, state) do
    IO.puts "Livestream terminate: #{inspect(state.gst)}"
    if state.gst, do: Process.exit(state.gst, :shutdown)
  end

  @impl GenServer
  def init([host, port]) do
    send(self(), :start_stream)
    {:ok, %{host: to_charlist(host), port: port, gst: nil}}
  end

  @impl GenServer
  def handle_info(:start_stream, state) do
    Logger.info "Connecting to socket: #{state.host} #{state.port}"
    gst = start_gst(state.host, state.port)
    Process.link(gst)
    {:noreply, %{state | gst: gst}}
  end

  defp start_gst(host, port) do
    gst = System.find_executable("gst-launch-1.0")

    args = [
      "v4l2src",
      "!",
      "video/x-h264,",
      "stream-format=byte-stream,",
      "alignment=au,",
      "width=640,",
      "height=480,",
      "pixel-aspect-ratio=1/1,",
      "framerate=30/1",
      "!",
      "rtph264pay",
      "pt=96",
      "!",
      "udpsink",
      "host=#{host}",
      "port=#{port}"
    ]
    spawn(fn -> MuonTrap.cmd(gst, args, into: IO.stream(:stdio, :line)) end)
  end
end

This is essentially a light wrapper around MuonTrap, which is part of the standard Nerves tooling. MuonTrap runs an external OS process and keeps it contained, so it is cleaned up when the Elixir side goes away. The external process we are wrapping here is GStreamer, a battle-tested tool for sending and receiving media streams. The pipeline takes h264 data from v4l2src, packs it into RTP with rtph264pay, and sends it out with the udpsink element. This is basically the magic that formats our UDP datagrams in a way that our Phoenix/Membrane stack will understand.
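The argument list in start_gst/2 is easier to read as the single command line it amounts to. The sketch below rebuilds the same list and joins it for display. GstArgsSketch is a hypothetical module and example.com is a placeholder host:

```elixir
defmodule GstArgsSketch do
  # Same argument list as start_gst/2, pulled out so it can be inspected.
  def args(host, port) do
    [
      "v4l2src", "!",
      "video/x-h264,", "stream-format=byte-stream,", "alignment=au,",
      "width=640,", "height=480,", "pixel-aspect-ratio=1/1,", "framerate=30/1",
      "!", "rtph264pay", "pt=96",
      "!", "udpsink", "host=#{host}", "port=#{port}"
    ]
  end

  # Joins the arguments with spaces, purely for display.
  def command_line(host, port) do
    Enum.join(["gst-launch-1.0" | args(host, port)], " ")
  end
end

# example.com is a placeholder, not the real server.
IO.puts(GstArgsSketch.command_line("example.com", 5000))
```

Read left to right, the three stages are: capture h264 from the camera at 640x480 and 30 fps, wrap it in RTP with payload type 96 (the same 96 that fmt_mapping maps to :H264 on the server), and send the packets over UDP to the host and port returned by the channel join.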


Conclusion

And that's basically it. After a bit of CSS and some Docker magic to get it out there on the internet, everything worked!

screenshot

I obviously left a lot out here, so be sure to check out the source code.

