A recent interest of mine has been multimedia & streaming. I've always believed Elixir/Erlang would be an amazing fit for this niche. It took a few years, but Software Mansion finally released their new open source framework, Membrane, at ElixirConf 2019. I didn't have an immediate use for the technology at the time, but I've since had the pleasure of working with it both professionally and on a side project. For that side project, I wanted to see how difficult it would be to build a clone of an existing project called OctoPrint, a snappy web interface for your 3D printer.
Specifically, I wanted to clone the webcam streamer plugin with the added twist of using my own server instead of leveraging an existing streaming service (in this case twitch.tv).
Membrane ❤️ Phoenix
While a useful tool by itself, Membrane really shines once it is connected to a web stack. How you do that will depend on the project you're working on, but for simplicity, let's assume the "streaming" portion of the application lives in the same codebase as our web stack.
I start out with the ever-familiar mix phx.new:
mix phx.new noven
Our app is going to live out on the internet somewhere, and our Nerves device will be able to stream to it. I'm going to skip over the (relatively) boring parts of building device authn/z, but the short version is to leverage phx_gen_auth to create a User schema:
defmodule Noven.Accounts.User do
  use Ecto.Schema
  import Ecto.Changeset
  @derive {Inspect, except: [:password]}
  schema "users" do
    field :email, :string
    field :password, :string, virtual: true
    field :hashed_password, :string
    field :confirmed_at, :naive_datetime
    # yet to be created....
    has_many :devices, Noven.Devices.Device
    timestamps()
  end
end
and a Device schema:
defmodule Noven.Devices.Device do
  use Ecto.Schema
  import Ecto.Changeset
  schema "devices" do
    field :last_connected, :utc_datetime
    field :name, :string
    field :serial, :string
    belongs_to :user, Noven.Accounts.User
    # tokens are how a device is authorized.
    has_one :device_token, Noven.Devices.DeviceToken
    timestamps()
  end
end
Again, for simplicity, I'm omitting the implementation of Noven.Devices.DeviceToken, as I don't find it particularly useful in the context of this post. See the source code for the full implementation.
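Since the token code is omitted, here is a minimal sketch of a token scheme that would satisfy the socket's connect/3 further down, which URL-safe-Base64-decodes the token and looks the device up by its SHA-256 hash. It assumes tokens are built the same way phx_gen_auth builds user tokens (32 random bytes, with only the hash stored). The module name NovenSketch.DeviceToken is hypothetical and not taken from the project's source.

```elixir
defmodule NovenSketch.DeviceToken do
  @moduledoc """
  Hypothetical helper: builds a device token in the format that a socket
  connect/3 decoding URL-safe Base64 (no padding) and hashing with SHA-256
  would accept.
  """

  @rand_size 32

  # Returns {encoded_token_for_the_device, sha256_hash_for_the_database}.
  def build do
    raw = :crypto.strong_rand_bytes(@rand_size)
    {Base.url_encode64(raw, padding: false), :crypto.hash(:sha256, raw)}
  end

  # Mirrors the decode-then-hash steps done during socket connect.
  # Returns :error when the token is not valid URL-safe Base64.
  def hash_encoded(encoded) do
    with {:ok, raw} <- Base.url_decode64(encoded, padding: false) do
      {:ok, :crypto.hash(:sha256, raw)}
    end
  end
end
```

The encoded value is what gets written to the device. Only the hash is persisted, so a leaked database row is not a working credential.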
Now that we have a Device and a User, we'll need to start building the interesting part: streaming. The first goal will be what is referred to as "signaling." Basically this is the part that controls the stream details - if it is or isn't streaming currently, where the device should send the stream, quality, etc. There are existing protocols for this such as RTSP/RTCP, but I opted to use Phoenix Channels for a few reasons.
- I'm already familiar with server and client implementations and APIs.
- It's simple.
- No extra web server ports on top of the normal web stack.
- It plugs easily into Phoenix Presence.
First I needed a socket for devices to connect to:
defmodule NovenWeb.DeviceSocket do
  use Phoenix.Socket
  require Logger
  ## Channels
  channel "device:*", NovenWeb.DeviceChannel
  @impl true
  def connect(%{"token" => token}, socket, _connect_info) do
    with {:ok, token} <- Base.url_decode64(token, padding: false),
         token <- :crypto.hash(:sha256, token),
         %Noven.Devices.Device{} = device <- Noven.Devices.get_device_by_token(token) do
      {:ok,
       socket
       |> assign(:device, device)}
    else
      error ->
        Logger.error("Could not authenticate device: #{inspect(error)}")
        :error
    end
  end
  # reject connections that don't carry a token at all
  def connect(_params, _socket, _connect_info), do: :error
  # Socket id's are topics that allow you to identify all sockets for a given user:
  #
  #     def id(socket), do: "user_socket:#{socket.assigns.user_id}"
  #
  # Would allow you to broadcast a "disconnect" event and terminate
  # all active sockets and channels for a given user:
  #
  #     NovenWeb.Endpoint.broadcast("user_socket:#{user.id}", "disconnect", %{})
  #
  # Returning `nil` makes this socket anonymous.
  @impl true
  def id(socket), do: "device_socket:#{socket.assigns.device.id}"
end
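The connect/3 function is responsible for authorization here: it decodes the token, hashes it, and looks up the matching device. Once authorized, an ID is assigned to the socket so we can identify the device later (for example, when wiring up Presence). Once connected, the device joins a channel: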
defmodule NovenWeb.DeviceChannel do
  use NovenWeb, :channel
  alias Phoenix.Socket.Broadcast
  def join(_topic, _params, socket) do
    # TODO IMPLEMENT ME!!!
  end
  def terminate(_, _), do: :ok
end
Now with the basic scaffolding out of the way, we can finally get to the fun stuff. We'll need to add the Membrane mix dependencies next:
# core services of membrane
{:membrane_core, "~> 0.5.2"},
# Our stream will be received via a UDP stream
{:membrane_element_udp, "~> 0.3.2"},
# The stream will be encoded via h264.
{:membrane_element_ffmpeg_h264, "~> 0.3.0"},
# the live stream will be packaged into MP4 (CMAF) segments
{:membrane_mp4_plugin, "~> 0.3.0"},
# and streamed to the end user via http adaptive streaming
{:membrane_http_adaptive_stream_plugin, "~> 0.1.0"},
# handles depayloading RTP data
{:membrane_rtp_plugin, "~> 0.4.0-alpha"},
# handles depayloading h264 out of RTP
{:membrane_rtp_h264_plugin, "~> 0.3.0-alpha"},
Let's hop right into building the Pipeline, and I'll explain where each of those is used:
defmodule NovenMedia.Pipeline do
  use Membrane.Pipeline
  alias Membrane.Time
  require Logger
  @default_presence %{
    pipeline: "stop",
    ssrc: nil
  }
  @doc false
  def child_spec([%Noven.Devices.Device{} = device, port]) do
    via = {:via, Registry, {NovenMedia.NameRegistry, {"pipeline", device.id}}}
    start_spec = {__MODULE__, :start_link, [[device, port], [name: via]]}
    %{
      id: {:pipeline, device.id},
      start: start_spec
    }
  end
  @impl true
  def handle_init([device, port]) do
    {:ok, _ref} = Noven.DevicePresence.track(self(), "devices", "#{device.id}", @default_presence)
    table = :ets.new(:"stream-#{device.id}", [:public, :named_table])
    children = %{
      app_source: %Membrane.Element.UDP.Source{
        local_port_no: port,
        recv_buffer_size: 500_000
      },
      rtp: %Membrane.RTP.Session.ReceiveBin{
        fmt_mapping: %{96 => :H264},
        custom_depayloaders: %{
          :H264 => Membrane.RTP.H264.Depayloader
        }
      }
    }
    links = [
      link(:app_source) |> via_in(:input, buffer: [fail_size: 300]) |> to(:rtp)
    ]
    spec = %ParentSpec{children: children, links: links}
    {{:ok, spec: spec}, %{device: device, table: table}}
  end
  @impl true
  def handle_shutdown(_reason, state) do
    if state.table, do: :ets.delete(state.table)
    :ok
  end
  @impl true
  def handle_notification({:new_rtp_stream, ssrc, :H264}, :rtp, state) do
    {:ok, _ref} =
      Noven.DevicePresence.update(self(), "devices", "#{state.device.id}", %{
        pipeline: "play",
        ssrc: to_string(ssrc)
      })
    video_timestamper = {:video_timestamper, make_ref()}
    video_nal_parser = {:video_nal_parser, make_ref()}
    decoder = {:decoder, make_ref()}
    thumbnailer = {:thumbnailer, make_ref()}
    tee = {:tee, make_ref()}
    scissors = {:scissors, make_ref()}
    video_payloader = {:video_payloader, make_ref()}
    video_cmaf_muxer = {:video_cmaf_muxer, make_ref()}
    hls_encoder = {:hls_encoder, make_ref()}
    children = %{
      video_timestamper => %Membrane.RTP.Timestamper{
        resolution: Ratio.new(Time.second(), 90_000)
      },
      video_nal_parser => %Membrane.Element.FFmpeg.H264.Parser{
        framerate: {30, 1},
        alignment: :au,
        attach_nalus?: true
      },
      decoder => Membrane.Element.FFmpeg.H264.Decoder,
      scissors => %Membrane.Scissors{
        intervals:
          Stream.iterate(0, &(&1 + Membrane.Time.milliseconds(10))) |> Stream.map(&{&1, 1}),
        interval_duration_unit: :buffers,
        buffer_duration: fn _buffer, _caps -> 100 end
      },
      thumbnailer => NovenMedia.Thumbnailer,
      tee => Membrane.Element.Tee.Parallel,
      video_payloader => Membrane.MP4.Payloader.H264,
      video_cmaf_muxer => Membrane.MP4.CMAF.Muxer,
      hls_encoder => %Membrane.HTTPAdaptiveStream.Sink{
        manifest_module: Membrane.HTTPAdaptiveStream.HLS,
        target_window_duration: 10 |> Membrane.Time.seconds(),
        storage: %NovenMedia.ETSStorage{table: state.table}
      }
    }
    links = [
      link(:rtp)
      |> via_out(Pad.ref(:output, ssrc))
      |> to(video_timestamper)
      |> to(video_nal_parser)
      |> to(video_payloader)
      |> to(video_cmaf_muxer)
      |> via_in(:input)
      |> to(hls_encoder)
    ]
    spec = %ParentSpec{children: children, links: links}
    {{:ok, spec: spec}, state}
  end
  def handle_notification(_notification, _element, state) do
    {:ok, state}
  end
end
That's a lot of code, but let's step through it:
use Membrane.Pipeline
Probably the most important line: it brings in everything needed to implement a Pipeline.
def child_spec([%Noven.Devices.Device{} = device, port]) do
  via = {:via, Registry, {NovenMedia.NameRegistry, {"pipeline", device.id}}}
  start_spec = {__MODULE__, :start_link, [[device, port], [name: via]]}
  %{
    id: {:pipeline, device.id},
    start: start_spec
  }
end
A simple child_spec implementation that tells OTP how to supervise this pipeline. Every Pipeline is just a GenServer under the hood, so calling start_link will wire everything up for us. Processes are named via a Registry, keyed by {"pipeline", device.id}, so each device has at most one running pipeline.
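To see how the Registry naming plays out, here is a self-contained sketch that uses a plain GenServer as a stand-in for the pipeline. NovenSketch.FakePipeline and its whereis/2 helper are hypothetical. The sketch assumes the registry is started with unique keys, which via-tuple names require, e.g. {Registry, keys: :unique, name: NovenMedia.NameRegistry} in the application's supervision tree.

```elixir
defmodule NovenSketch.FakePipeline do
  # Hypothetical stand-in for a pipeline process: a plain GenServer
  # registered under the same {:via, Registry, ...} naming scheme.
  use GenServer

  def start_link(device_id, registry) do
    name = {:via, Registry, {registry, {"pipeline", device_id}}}
    GenServer.start_link(__MODULE__, device_id, name: name)
  end

  @impl true
  def init(device_id), do: {:ok, device_id}

  # Looks up the pid registered for a device, or returns nil if none is running.
  def whereis(device_id, registry) do
    case Registry.lookup(registry, {"pipeline", device_id}) do
      [{pid, _value}] -> pid
      [] -> nil
    end
  end
end
```

Starting a second process under the same key returns {:error, {:already_started, pid}} instead of a duplicate. That is what makes "one pipeline per device" hold.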
def handle_init([device, port]) do
  {:ok, _ref} = Noven.DevicePresence.track(self(), "devices", "#{device.id}", @default_presence)
  table = :ets.new(:"stream-#{device.id}", [:public, :named_table])
  children = %{
    app_source: %Membrane.Element.UDP.Source{
      local_port_no: port,
      recv_buffer_size: 500_000
    },
    rtp: %Membrane.RTP.Session.ReceiveBin{
      fmt_mapping: %{96 => :H264},
      custom_depayloaders: %{
        :H264 => Membrane.RTP.H264.Depayloader
      }
    }
  }
  links = [
    link(:app_source) |> via_in(:input, buffer: [fail_size: 300]) |> to(:rtp)
  ]
  spec = %ParentSpec{children: children, links: links}
  {{:ok, spec: spec}, %{device: device, table: table}}
end
This is the Membrane Framework's analogue of GenServer.init/1.
Here we do a few things:
- Set up a presence to track devices via a frontend, telemetry, etc.
- Create an ETS table. This will be used as temporary storage for the HLS playlists and segments that carry the h264 video
- Define the children of the parent spec
- Define the links between those children
- Return the initial state
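app_source is a UDP source that listens on the given port for RTP data. rtp is Membrane's RTP session bin. RTP (the Real-time Transport Protocol) is a binary protocol for sending media in real time. The bin splits incoming packets into streams and depayloads them, here mapping payload type 96 to H.264.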
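For a feel of what the rtp bin consumes, here is an illustrative parser for the 12-byte fixed RTP header defined in RFC 3550. It is not part of Membrane or this project (NovenSketch.RTPHeader is a hypothetical name). It just shows where the payload type (96, mapped to :H264 above) and the SSRC live in each datagram. For brevity it ignores the padding and header-extension flags.

```elixir
defmodule NovenSketch.RTPHeader do
  @moduledoc """
  Illustrative parser for the fixed RTP header (RFC 3550).
  Padding and header extensions are not handled.
  """

  # Version must be 2; CSRC identifiers (4 bytes each) follow the fixed header.
  def parse(
        <<2::2, _padding::1, _extension::1, csrc_count::4, marker::1, payload_type::7,
          sequence_number::16, timestamp::32, ssrc::32, rest::binary>>
      ) do
    csrc_bytes = csrc_count * 4

    case rest do
      <<_csrcs::binary-size(csrc_bytes), payload::binary>> ->
        {:ok,
         %{
           marker: marker == 1,
           payload_type: payload_type,
           sequence_number: sequence_number,
           timestamp: timestamp,
           ssrc: ssrc,
           payload: payload
         }}

      _ ->
        {:error, :truncated}
    end
  end

  def parse(_packet), do: {:error, :not_rtp}
end
```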
def handle_shutdown(_reason, state) do
  if state.table, do: :ets.delete(state.table)
  :ok
end
Clean up the ETS table when the pipeline shuts down.
def handle_notification({:new_rtp_stream, ssrc, :H264}, :rtp, state) do
  {:ok, _ref} =
    Noven.DevicePresence.update(self(), "devices", "#{state.device.id}", %{
      pipeline: "play",
      ssrc: to_string(ssrc)
    })
  video_timestamper = {:video_timestamper, make_ref()}
  video_nal_parser = {:video_nal_parser, make_ref()}
  decoder = {:decoder, make_ref()}
  thumbnailer = {:thumbnailer, make_ref()}
  tee = {:tee, make_ref()}
  scissors = {:scissors, make_ref()}
  video_payloader = {:video_payloader, make_ref()}
  video_cmaf_muxer = {:video_cmaf_muxer, make_ref()}
  hls_encoder = {:hls_encoder, make_ref()}
  children = %{
    # TODO: remove when moved to the RTP bin
    video_timestamper => %Membrane.RTP.Timestamper{
      resolution: Ratio.new(Time.second(), 90_000)
    },
    video_nal_parser => %Membrane.Element.FFmpeg.H264.Parser{
      framerate: {30, 1},
      alignment: :au,
      attach_nalus?: true
    },
    decoder => Membrane.Element.FFmpeg.H264.Decoder,
    scissors => %Membrane.Scissors{
      intervals:
        Stream.iterate(0, &(&1 + Membrane.Time.milliseconds(10))) |> Stream.map(&{&1, 1}),
      interval_duration_unit: :buffers,
      buffer_duration: fn _buffer, _caps -> 100 end
    },
    thumbnailer => NovenMedia.Thumbnailer,
    tee => Membrane.Element.Tee.Parallel,
    video_payloader => Membrane.MP4.Payloader.H264,
    video_cmaf_muxer => Membrane.MP4.CMAF.Muxer,
    hls_encoder => %Membrane.HTTPAdaptiveStream.Sink{
      manifest_module: Membrane.HTTPAdaptiveStream.HLS,
      target_window_duration: 10 |> Membrane.Time.seconds(),
      storage: %NovenMedia.ETSStorage{table: state.table}
    }
  }
  links = [
    link(:rtp)
    |> via_out(Pad.ref(:output, ssrc))
    |> to(video_timestamper)
    |> to(video_nal_parser)
    |> to(video_payloader)
    |> to(video_cmaf_muxer)
    |> via_in(:input)
    |> to(hls_encoder)
  ]
  spec = %ParentSpec{children: children, links: links}
  {{:ok, spec: spec}, state}
end
This is the bulk of the pipeline. It looks like a lot, but in reality, it's not so complicated and is just a lot of wiring. Let's walk through it.
{:ok, _ref} =
  Noven.DevicePresence.update(self(), "devices", "#{state.device.id}", %{
    pipeline: "play",
    ssrc: to_string(ssrc)
  })
Update the presence. Nothing that interesting here, except to note that ssrc is an RTP concept: the SSRC (synchronization source) is a 32-bit identifier for a stream.
video_timestamper = {:video_timestamper, make_ref()}
video_nal_parser = {:video_nal_parser, make_ref()}
decoder = {:decoder, make_ref()}
video_payloader = {:video_payloader, make_ref()}
video_cmaf_muxer = {:video_cmaf_muxer, make_ref()}
hls_encoder = {:hls_encoder, make_ref()}
Create references to all the stages in the pipeline. These names can be anything. We will see how they get used below.
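The hand-written names above could also come from a small helper. This is a hypothetical sketch (NovenSketch.ChildNames is not in the project). What matters is that make_ref/0 gives every stream its own set of child names. Membrane requires child names to be unique within a pipeline, and as far as I can tell that is why the names are tagged with a reference: a second SSRC would otherwise collide with the children spawned for the first.

```elixir
defmodule NovenSketch.ChildNames do
  # Builds a map of stage => {stage, reference}. Each call produces fresh
  # references, so names generated for different streams never collide.
  def for_stages(stages) do
    Map.new(stages, fn stage -> {stage, {stage, make_ref()}} end)
  end
end
```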
children = %{
  video_timestamper => %Membrane.RTP.Timestamper{
    resolution: Ratio.new(Time.second(), 90_000)
  },
  video_nal_parser => %Membrane.Element.FFmpeg.H264.Parser{
    framerate: {30, 1},
    alignment: :au,
    attach_nalus?: true
  },
  decoder => Membrane.Element.FFmpeg.H264.Decoder,
  video_payloader => Membrane.MP4.Payloader.H264,
  video_cmaf_muxer => Membrane.MP4.CMAF.Muxer,
  hls_encoder => %Membrane.HTTPAdaptiveStream.Sink{
    manifest_module: Membrane.HTTPAdaptiveStream.HLS,
    target_window_duration: 10 |> Membrane.Time.seconds(),
    storage: %NovenMedia.ETSStorage{table: state.table}
  }
}
Similar to handle_init/1, handle_notification/3 can spawn more children using the same API. This portion lists implementations for each of the references we created earlier. There is a lot here, but the wiring portion will clear it up.
links = [
  link(:rtp)
  |> via_out(Pad.ref(:output, ssrc))
  |> to(video_timestamper)
  |> to(video_nal_parser)
  |> to(video_payloader)
  |> to(video_cmaf_muxer)
  |> via_in(:input)
  |> to(hls_encoder)
]
This essentially defines the route the data takes through the pipeline once the RTP stream is detected:
- New stream (connection opened)
- Stream is timestamped
- Stream is parsed into NAL units, grouped into access units
- The h264 access units are packaged into MP4 payloads
- Stream is MUX-ed into CMAF (fragmented MP4) segments
- Stream is written out as HLS playlists and segments via the ETS storage module. I didn't include the implementation of this, but it is essentially a read and a write function.
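Since the ETS storage module isn't shown, here is a hypothetical read/write pair over a named, public ETS table in the same spirit. It does not implement Membrane's HTTPAdaptiveStream storage behaviour (I won't guess at its exact callbacks here). It only illustrates the idea of keeping playlists and segments in memory, keyed by file name. NovenSketch.ETSStore is an illustrative name.

```elixir
defmodule NovenSketch.ETSStore do
  @moduledoc """
  Hypothetical in-memory store for HLS playlists and segments.
  """

  # Creates a named, public table so other processes (e.g. a controller)
  # can read what the pipeline writes.
  def new(name) when is_atom(name) do
    :ets.new(name, [:set, :public, :named_table, read_concurrency: true])
  end

  # Stores (or overwrites) a playlist or segment under its file name.
  def write(table, name, contents) do
    true = :ets.insert(table, {name, contents})
    :ok
  end

  def read(table, name) do
    case :ets.lookup(table, name) do
      [{^name, contents}] -> {:ok, contents}
      [] -> {:error, :not_found}
    end
  end

  # Drops a segment that has slid out of the playlist window.
  def remove(table, name) do
    true = :ets.delete(table, name)
    :ok
  end
end
```

Because the table is :public and :named_table, a Phoenix controller could serve index.m3u8 and its segments straight out of it.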
Finally, we can start processing the "first mile" of our stream. We just need to go back into the device_channel and add the final control functions:
@port 5000
def join(_topic, _params, socket) do
  host = socket.endpoint.host()
  socket.endpoint.subscribe("devices:#{socket.assigns.device.id}")
  case NovenMedia.Supervisor.start_pipeline(socket.assigns.device, @port) do
    {:ok, pid} ->
      :ok = NovenMedia.Pipeline.play(pid)
      {:ok, %{host: host, port: @port}, assign(socket, :pipeline_pid, pid)}
    {:error, {:already_started, pid}} ->
      :ok = NovenMedia.Pipeline.play(pid)
      {:ok, %{host: host, port: @port}, assign(socket, :pipeline_pid, pid)}
  end
end
def terminate(_, %{assigns: %{pipeline_pid: pid}}) do
  NovenMedia.Pipeline.stop_and_terminate(pid, [])
end
def handle_info(%Broadcast{event: "play", payload: payload}, socket) do
  push(socket, "play", payload)
  {:noreply, socket}
end
def handle_info(%Broadcast{event: "stop", payload: payload}, socket) do
  push(socket, "stop", payload)
  {:noreply, socket}
end
This essentially starts our pipeline (using that child_spec function, remember it?) when the device joins the channel. Returning %{host: host, port: @port} from join/3 tells the Nerves device where to send its data. Control is handled by Broadcast events that can come from anywhere. In our case, they come via the Presence and a LiveView connection.
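NovenMedia.Supervisor.start_pipeline/2 isn't shown either. One plausible shape is a DynamicSupervisor. Below is a sketch under that assumption, with a stub GenServer standing in for the Membrane pipeline so it runs on its own. Every NovenSketch.* name is hypothetical. It also shows where the {:error, {:already_started, pid}} branch in join/3 comes from: when a device reconnects while its pipeline is still alive, the Registry name is already taken.

```elixir
defmodule NovenSketch.StubPipeline do
  # Stand-in with the same child_spec([device, port]) shape as the pipeline.
  use GenServer

  def child_spec([device, port]) do
    %{id: {:pipeline, device.id}, start: {__MODULE__, :start_link, [[device, port]]}}
  end

  def start_link([device, port]) do
    name = {:via, Registry, {NovenSketch.PipelineRegistry, {"pipeline", device.id}}}
    GenServer.start_link(__MODULE__, {device, port}, name: name)
  end

  @impl true
  def init(state), do: {:ok, state}
end

defmodule NovenSketch.PipelineSupervisor do
  # Hypothetical supervisor: `worker` is the module whose child_spec/1
  # accepts [device, port] (the real code would pass NovenMedia.Pipeline).
  use DynamicSupervisor

  def start_link(opts) do
    DynamicSupervisor.start_link(__MODULE__, :ok, name: Keyword.get(opts, :name, __MODULE__))
  end

  @impl true
  def init(:ok), do: DynamicSupervisor.init(strategy: :one_for_one)

  # Returns {:ok, pid} or, if the device already has a pipeline,
  # {:error, {:already_started, pid}} with the existing pid.
  def start_pipeline(sup, worker, device, port) do
    DynamicSupervisor.start_child(sup, {worker, [device, port]})
  end
end
```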
Of course, I'm omitting the UI code here, but the bulk of the work is handled by a LiveView that lists connected devices as indexed by the Presence we created earlier.
Membrane ❤️ Nerves
For the device connection, I of course used Nerves to deploy Elixir code to a Raspberry Pi 3. I chose the RPi 3 because its SoC has a built-in hardware h264 encoder that is fairly easy to access via GStreamer.
mix nerves.new --target=rpi3 noven_link
The only dependencies we need here are:
{:phoenix_client, "~> 0.11"},
{:jason, "~> 1.0"},
for the Phoenix Channels connection.
Next, in application.ex
I added an implementation for connecting to the Socket with a token that is stored on the SD card.
def children(_target) do
  # contains the token as a query param
  url = Nerves.Runtime.KV.get("noven_url")
  [
    {PhoenixClient.Socket, {[url: url], [name: Socket]}},
    {NovenLink.DeviceChannel, [socket: Socket]}
  ]
end
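The comment above says the token travels as a query param. Here is a hypothetical helper for building that URL, e.g. when provisioning the noven_url key. The /device_socket/websocket path in the usage example is an assumption: it depends on where the endpoint mounts NovenWeb.DeviceSocket. NovenSketch.SocketURL is an illustrative name.

```elixir
defmodule NovenSketch.SocketURL do
  # Adds (or replaces) the `token` query param on a socket URL,
  # keeping any query params that are already present.
  def build(base_url, token) do
    uri = URI.parse(base_url)
    query = URI.encode_query(Map.merge(decode(uri.query), %{"token" => token}))
    URI.to_string(%URI{uri | query: query})
  end

  defp decode(nil), do: %{}
  defp decode(query), do: URI.decode_query(query)
end
```

For example, NovenSketch.SocketURL.build("wss://noven.example.com/device_socket/websocket", "abc123") yields "wss://noven.example.com/device_socket/websocket?token=abc123".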
The next thing to implement is the device_channel:
defmodule NovenLink.DeviceChannel do
  alias PhoenixClient.Channel
  require Logger
  defmodule State do
    defstruct params: %{},
              socket: NovenLink.Socket,
              topic: nil,
              connected?: false,
              channel: nil,
              rejoin_after: 5000,
              livestream: nil,
              host: nil,
              port: nil
  end
  use GenServer
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end
  def terminate(reason, state) do
    Logger.info("Stopping livestream")
    if state.livestream && Process.alive?(state.livestream),
      do: Process.exit(state.livestream, reason)
  end
  def init(opts) do
    send(self(), :join)
    rejoin_after = Application.get_env(:noven_link, :rejoin_after, 5_000)
    uuid = System.unique_integer([:positive])
    {:ok,
     %State{
       params: opts[:params],
       socket: opts[:socket],
       topic: "device:#{uuid}",
       connected?: false,
       rejoin_after: rejoin_after,
       host: nil,
       port: nil
     }}
  end
  def handle_info(:join, %{socket: socket, topic: topic, params: params} = state) do
    case Channel.join(socket, topic, params) do
      {:ok, %{"host" => host, "port" => port}, channel} ->
        Logger.info("Joined channel")
        Process.send_after(self(), :checkup, 5000)
        {:noreply, %{state | channel: channel, connected?: true, host: host, port: port}}
      _error ->
        Process.send_after(self(), :join, state.rejoin_after)
        {:noreply, %{state | connected?: false}}
    end
  end
  def handle_info(:checkup, state) do
    if PhoenixClient.Socket.connected?(state.socket) do
      Process.send_after(self(), :checkup, 5000)
      {:noreply, state}
    else
      {:stop, :socket_disconnect, state}
    end
  end
  def handle_info(
        %PhoenixClient.Message{
          event: "play",
          payload: _
        },
        state
      ) do
    Logger.info "Starting Livestream"
    {:ok, livestream} = NovenLink.LiveStream.start_link(state.host, state.port)
    {:noreply, %{state | livestream: livestream}}
  end
  def handle_info(
        %PhoenixClient.Message{
          event: "stop",
          payload: _
        },
        state
      ) do
    Logger.info "Stopping livestream"
    state.livestream && GenServer.stop(state.livestream, :normal)
    {:noreply, %{state | livestream: nil}}
  end
end
This is a simple GenServer that connects to the DeviceChannel on the web application. The important part for communication is:
Channel.join(socket, topic, params)
That call is what starts the Pipeline on the web app side. The important part for the actual stream is:
def handle_info(
      %PhoenixClient.Message{
        event: "play",
        payload: _
      },
      state
    ) do
  Logger.info "Starting Livestream"
  {:ok, livestream} = NovenLink.LiveStream.start_link(state.host, state.port)
  {:noreply, %{state | livestream: livestream}}
end
This starts a linked GenServer, NovenLink.LiveStream. We'll look at that implementation next:
defmodule NovenLink.LiveStream do
  use GenServer
  require Logger
  def start_link(host, port) do
    GenServer.start_link(__MODULE__, [host, port], name: __MODULE__)
  end
  @impl GenServer
  def terminate(_reason, state) do
    IO.puts "Livestream terminate: #{inspect(state.gst)}"
    if state.gst, do: Process.exit(state.gst, :shutdown)
  end
  @impl GenServer
  def init([host, port]) do
    send(self(), :start_stream)
    {:ok, %{host: to_charlist(host), port: port, gst: nil}}
  end
  @impl GenServer
  def handle_info(:start_stream, state) do
    Logger.info "Connecting to socket: #{state.host} #{state.port}"
    gst = start_gst(state.host, state.port)
    Process.link(gst)
    {:noreply, %{state | gst: gst}}
  end
  defp start_gst(host, port) do
    gst = System.find_executable("gst-launch-1.0")
    args = [
      "v4l2src",
      "!",
      "video/x-h264,",
      "stream-format=byte-stream,",
      "alignment=au,",
      "width=640,",
      "height=480,",
      "pixel-aspect-ratio=1/1,",
      "framerate=30/1",
      "!",
      "rtph264pay",
      "pt=96",
      "!",
      "udpsink",
      "host=#{host}",
      "port=#{port}"
    ]
    spawn(fn -> MuonTrap.cmd(gst, args, into: IO.stream(:stdio, :line)) end)
  end
end
This is essentially a light wrapper around MuonTrap, which is part of the standard Nerves tooling and is used to contain an external process. The external process we are wrapping here is GStreamer, a battle-tested tool for producing and receiving media streams. The GStreamer pipeline reads h264 from the camera via v4l2src. rtph264pay then packetizes it into RTP with payload type 96, matching the fmt_mapping in our Membrane pipeline. Finally, udpsink sends the packets as UDP datagrams to the host and port the server handed us. This is the magic that formats our UDP datagrams in a way that our Phoenix/Membrane stack will understand.
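If you want to tweak resolution or framerate without editing the list by hand, the arguments can be built from options. This is a hypothetical helper (NovenSketch.GstArgs), not part of the project. Its defaults reproduce the list in start_gst/2, with the caps joined into a single argument, which gst-launch-1.0 parses the same way. Keep payload_type in sync with the fmt_mapping: %{96 => :H264} entry on the server.

```elixir
defmodule NovenSketch.GstArgs do
  # Defaults mirror the hard-coded arguments in start_gst/2.
  @defaults [width: 640, height: 480, framerate: 30, payload_type: 96]

  # host may be a string or a charlist; interpolation handles both.
  def build(host, port, opts \\ []) do
    opts = Keyword.merge(@defaults, opts)

    caps =
      Enum.join(
        [
          "video/x-h264",
          "stream-format=byte-stream",
          "alignment=au",
          "width=#{opts[:width]}",
          "height=#{opts[:height]}",
          "pixel-aspect-ratio=1/1",
          "framerate=#{opts[:framerate]}/1"
        ],
        ","
      )

    [
      "v4l2src", "!", caps, "!",
      "rtph264pay", "pt=#{opts[:payload_type]}", "!",
      "udpsink", "host=#{host}", "port=#{port}"
    ]
  end
end
```

The result drops straight into MuonTrap.cmd(gst, NovenSketch.GstArgs.build(host, port), ...) in place of the literal list.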
Conclusion
And that's it! After a bit of CSS and some Docker magic to get it on the internet, everything worked!
I obviously left a lot out here, so be sure to check out the source code.
Connor Rigby is a software engineer with a decade of experience in software development. He specializes in Elixir and Ruby, and is a core contributor to the Nerves and NervesHub projects.
Founded in 2007, Binary Noggin is a team of software engineers who serve as a trusted extension of your team, helping your company succeed through collaboration. We forge customizable solutions using Agile methodologies and our mastery of Elixir, Ruby and other open source technologies. Share your ideas with us on Facebook and Twitter.