Default Media Injector

The Default Media Injector module provides two different types of injectors:

  • A passthrough injector designed for streaming applications

  • A paced injector designed for applications that inject media files

Both types of injectors accept audio frames and video frames, and both connect to the audio and video sinks that the media frames are passed on to.

The main difference between the two injectors is that the paced injector provides pacing logic: it queues audio and video frames before injecting them into a conference. The passthrough injector accepts raw audio and video frames and relays them directly to the CoreSDK. The application is therefore responsible for invoking the frame injection interface in a timely manner, at the rate the media requires.

class injector : public dolbyio::comms::video_frame_handler, public dolbyio::comms::video_source, public dolbyio::comms::audio_source, public std::enable_shared_from_this<injector>

The abstract base class for the Default Media Injectors.

Subclassed by dolbyio::comms::plugin::injector_paced, dolbyio::comms::plugin::injector_passthrough

Public Types

using media_injection_status_cb = std::function<void(const media_injection_status&)>
using has_video_sink_cb = std::function<void(bool)>

Public Functions

injector(media_injection_status_cb &&status_cb)

Constructor for the injector base abstract class.

Parameters:

status_cb – Status callback function object, which the application should provide. The injector invokes the function object whenever its status changes and provides the media_injection_status object as a parameter. The callback is invoked either on the injector’s media injection thread or on the injection source’s thread, so the application should ensure that the callback is thread-safe.

virtual ~injector()

The destructor of the injector.

void set_has_video_sink_cb(has_video_sink_cb &&cb)

Sets the callback notifying about the video sink presence.

The application may use this callback to start and stop injection. Injection can also be controlled manually, regardless of the injector’s video sink status.

Parameters:

cb – Callback function object invoked with a boolean that indicates whether a video sink is present.
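As an illustration of using the sink-presence callback to start and stop injection, the following is a minimal sketch. It does not use the SDK headers: fake_video_injector and follow_video_sink are hypothetical names introduced here, and the stand-in only mimics the start_video_injection and stop_video_injection methods documented for the Paced Injector.

```cpp
#include <atomic>
#include <cassert>
#include <functional>

// Alias with the same shape as the one documented for the injector class.
using has_video_sink_cb = std::function<void(bool)>;

// Hypothetical stand-in for the paced injector's video controls. It only
// records whether video injection is currently running.
struct fake_video_injector {
  std::atomic<bool> video_running{false};
  void start_video_injection() { video_running = true; }
  void stop_video_injection(bool /*force*/ = true) { video_running = false; }
};

// Builds a callback that follows the sink: start injecting when a video sink
// appears and stop when it disappears. The injector must outlive the
// returned callback because it is captured by reference.
has_video_sink_cb follow_video_sink(fake_video_injector& injector) {
  return [&injector](bool has_sink) {
    if (has_sink)
      injector.start_video_injection();
    else
      injector.stop_video_injection();
  };
}
```

With the real SDK, a callback of this shape would be handed to set_has_video_sink_cb. An application that prefers manual control can simply not register one.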

virtual bool inject_audio_frame(std::unique_ptr<audio_frame> &&frame) = 0

Inject an audio frame into the conference. The audio frame must contain 10ms of PCM data.

When this function returns, the audio_frame is deleted; by then the audio data has been memcpy’d in the WebRtcAudioSendStream::OnData function (which implements AudioSource::Sink::OnData).

Parameters:

frame – Audio frame containing raw PCM data.

Return values:
  • true – On successful push of frame.

  • false – On failure to push frame.

virtual bool inject_video_frame(const dolbyio::comms::video_frame &frame) = 0

Inject a raw video frame into the conference. The video frame must contain YUV pixels.

This function returns once the raw video frame has been placed onto the encoder_queue_ by VideoStreamEncoder::OnFrame. The video frame is only actually freed once the encoder is done with it.

Parameters:

frame – The video frame.

Return values:
  • true – On successful push of frame.

  • false – On failure to push frame.

Passthrough Injector

The Passthrough Injector acts as a proxy to the RTC Audio/Video sources. A Media Injection Source uses the audio and the video methods to pass frames to the Passthrough Injector, which in turn relays these frames to the CoreSDK.

class injector_passthrough : public dolbyio::comms::plugin::injector

The interface for the Default Passthrough Media Injector.

Public Functions

injector_passthrough(media_injection_status_cb &&status_cb)

Constructor for the passthrough injector.

Parameters:

status_cb – Status callback function object, which the application should provide. The injector invokes the function object whenever its status changes and provides the media_injection_status object as a parameter. The callback is invoked either on the injector’s media injection thread or on the injection source’s thread, so the application should ensure that the callback is thread-safe.

~injector_passthrough() override

Destructor for passthrough injector.

virtual bool inject_audio_frame(std::unique_ptr<audio_frame> &&frame) override

Inject an audio frame into the conference. The audio frame must contain 10ms of PCM data.

This method directly passes the frames to WebRTC. The application is responsible for providing frames in steady 10ms intervals.

Parameters:

frame – Audio frame containing raw PCM data.

Return values:
  • true – On successful push of frame.

  • false – On failure to push frame.

virtual bool inject_video_frame(const video_frame &frame) override

Inject a raw video frame into the conference. The video frame must contain YUV pixels.

This method directly passes the frames to WebRTC. The application is responsible for delivering frames at a proper rate.

Parameters:

frame – The video frame.

Return values:
  • true – On successful push of frame.

  • false – On failure to push frame.

Paced Injector

The Paced Injector ensures that media frames reach the CoreSDK at the desired intervals. This is useful for applications that read from media files and produce frames faster than real time. The Paced Injector provides audio to the CoreSDK in 10ms intervals and has a configurable video frame interval that specifies how often video frames are provided.

The injector achieves these intervals by temporarily storing audio and video frames in queues. The queues have a limited size: they can store up to 1 second of audio and 300-400ms of video. When a queue is full, the thread pushing to it is blocked until space becomes available.

The Paced Injector also provides methods for managing the pace, such as starting and stopping the pacing threads, clearing the queues, and setting the video frame interval. If the injection source pauses media file injection, silence should be injected into the conference to avoid AV sync issues when the injection resumes. For this purpose, the Paced Injector provides a method for injecting silent audio frames into the conference in 10ms chunks.
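The queueing behaviour described above (a producer that blocks when the queue is full, and a clear operation that releases it) can be modelled with a bounded queue. The following is a minimal sketch under that assumption, not the SDK’s implementation; bounded_frame_queue and all of its members are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical bounded queue illustrating the blocking behaviour of a paced
// injector's frame queues. Not part of the SDK.
template <typename Frame>
class bounded_frame_queue {
 public:
  explicit bounded_frame_queue(std::size_t capacity) : capacity_(capacity) {
    assert(capacity > 0);
  }

  // Called by the injection source. Blocks the caller while the queue is full.
  void push(Frame frame) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_full_.wait(lock, [this] { return frames_.size() < capacity_; });
    frames_.push_back(std::move(frame));
  }

  // Called by the pacing thread once per interval. Returns the oldest frame,
  // or an empty optional when nothing is queued.
  std::optional<Frame> try_pop() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (frames_.empty())
      return std::nullopt;
    Frame frame = std::move(frames_.front());
    frames_.pop_front();
    not_full_.notify_one();  // One slot became free for a blocked producer.
    return frame;
  }

  // Drops all queued frames and wakes every producer blocked in push().
  void clear() {
    std::lock_guard<std::mutex> lock(mutex_);
    frames_.clear();
    not_full_.notify_all();
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return frames_.size();
  }

 private:
  const std::size_t capacity_;
  mutable std::mutex mutex_;
  std::condition_variable not_full_;
  std::deque<Frame> frames_;
};
```

In this model, a queue created with a capacity of 100 audio frames holds one second of 10ms frames, and a pacing thread would call try_pop once per interval.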

class injector_paced : public dolbyio::comms::plugin::injector

The interface for the Default Paced Media Injector.

Public Functions

injector_paced(media_injection_status_cb &&status_cb)

Constructor for the paced injector.

Parameters:

status_cb – Status callback function object, which the application should provide. The injector invokes the function object whenever its status changes and provides the media_injection_status object as a parameter. The callback is invoked either on the injector’s media injection thread or on the injection source’s thread, so the application should ensure that the callback is thread-safe.

~injector_paced() override

Destructor of the paced injector.

virtual bool inject_audio_frame(std::unique_ptr<audio_frame> &&frame) override

Inject an audio frame into the conference. The audio frame must contain 10ms of PCM data.

This method can be called more often than every 10ms. The audio frames are queued and provided to WebRTC every 10ms. The queue can hold up to one second of audio; once the queue fills, this function blocks the calling thread until space on the queue is available.

Parameters:

frame – Audio frame containing raw PCM data.

Return values:
  • true – On successful push of frame.

  • false – On failure to push frame.

virtual bool inject_video_frame(const dolbyio::comms::video_frame &frame) override

Inject a raw video frame into the conference. The video frame must contain YUV pixels.

The desired frame interval for injecting frames into the WebRTC conference should be configured via the set_video_frame_rate method. The video frames are queued and provided to WebRTC once per frame interval. The queue can hold up to 10 video frames; once the queue fills, this function blocks the calling thread until space on the queue is available.

Attention

This method blocks the calling thread when the video queue is full.

Parameters:

frame – The video frame.

Return values:
  • true – On successful push of frame.

  • false – On failure to push frame.

void set_video_frame_rate(int numerator, int denominator)

Sets the video frame rate, in frames per second, at which video frames are injected into the conference. The rate is expressed as a numerator/denominator pair. The paced injector provides frames to WebRTC at the interval specified by this method, provided the frames are available on the queue.

Parameters:
  • numerator – The numerator of the frame rate value.

  • denominator – The denominator of the frame rate value.
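To make the relationship between the numerator/denominator pair and the pacing interval concrete, here is a small sketch. frame_interval is a hypothetical helper, not an SDK function; it assumes that the interval between frames is simply the reciprocal of the frame rate.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical helper: converts a frame rate given as numerator/denominator
// (frames per second) into the interval between two consecutive frames.
std::chrono::microseconds frame_interval(int numerator, int denominator) {
  assert(numerator > 0 && denominator > 0);
  // interval [s] = denominator / numerator. Scale to microseconds using
  // 64-bit integer math; the result is truncated towards zero.
  const std::int64_t us = (std::int64_t{1000000} * denominator) / numerator;
  return std::chrono::microseconds(us);
}
```

For example, a rate of 30/1 gives an interval of 33333 microseconds, and the NTSC rate 30000/1001 gives 33366 microseconds.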

void start_video_injection()

Enables the video injection pacing thread for taking video frames from the queue and injecting them into the conference.

void stop_video_injection(bool force = true)

Stops the video injection pacing thread from taking video frames from the queue and injecting them into the conference. There are two options for stopping injection. The default option, a forceful stop, stops the video pacing thread immediately; frames that are on the queue when a forceful stop is invoked remain there. The second option, a graceful stop, lets the caller wait until the video pacing thread has popped all the frames off the queue, and then stops the thread.

Parameters:

force – Boolean indicating whether to forcefully shut down the video injection or wait for the queue to become empty.

void start_audio_injection()

Enables the audio injection pacing thread for taking audio frames from the queue and injecting them into the conference.

void start_audio_silence_injection(int sample_rate, int channels)

Starts injecting audio silence into the conference. Use this when the injection source wants to pause the injection of media without removing the audio track. To preserve AV sync when the media injection is resumed, this function starts the audio injection pacing thread, which provides WebRTC with silent audio frames. Silence injection is stopped the same way as regular injection, via the stop_audio_injection method.

Parameters:
  • sample_rate – The sample rate of the audio stream which was being injected before being paused.

  • channels – The number of channels of the audio stream which was being injected before being paused.

void stop_audio_injection(bool force = true)

Stops the audio injection pacing thread from taking audio frames from the queue and injecting them into the conference. There are two options for stopping injection. The default option, a forceful stop, stops the audio pacing thread immediately; frames that are on the queue when a forceful stop is invoked remain there. The second option, a graceful stop, lets the caller wait until the audio pacing thread has popped all the frames off the queue, and then stops the thread.

Parameters:

force – Boolean indicating whether to forcefully shut down the audio injection or wait for the queue to become empty.

void clear_audio_queue()

Clears the audio queue that holds audio frames awaiting injection into the conference. This method unblocks any threads that are blocked in the inject_audio_frame method.

void clear_video_queue()

Clears the video queue that holds video frames awaiting injection into the conference. This method unblocks any threads that are blocked in the inject_video_frame method.

Media Injection Status

The Media Injection Status is a structure that depicts the status of the injector. It is passed to the application when the status changes.

struct media_injection_status

The current media injection status is expressed via this structure. This structure can be passed to the application by the injector to describe the current status.

Public Types

enum state

Enum describing the state of the media.

Values:

enumerator STOPPED
enumerator INJECTING
enumerator ERROR
enum type

The type of media this status is for.

Values:

enumerator AUDIO
enumerator VIDEO

Public Functions

media_injection_status(type type)

Constructor taking only the media type.

Parameters:

type – Enum value describing the type of media.

media_injection_status(type type, state state, const std::string &description = {})

Explicit constructor taking the type, state and extra description.

Parameters:
  • type – Enum value describing the type of media.

  • state – Enum value describing the state.

  • description – Extra description of current media state.

~media_injection_status()

Default destructor.

Public Members

type type_

Holds the value of the type of media.

state state_ = {STOPPED}

Holds the value of current state for single media.

std::string description_ = {}

Holds the value of any extra description of current state for single media.
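Because the status callback may be invoked from more than one thread, the application-side handler should guard any shared state. The sketch below shows one way to do that with a mutex. It is self-contained, so it declares a local stand-in that mirrors the documented media_injection_status structure; status_recorder is a hypothetical application class, and real code would include the SDK header instead of the stand-in.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <vector>

// Local stand-in mirroring the documented structure, so that the sketch
// compiles without the SDK headers.
struct media_injection_status {
  enum state { STOPPED, INJECTING, ERROR };
  enum type { AUDIO, VIDEO };

  media_injection_status(type t, state s, const std::string& description = {})
      : type_(t), state_(s), description_(description) {}

  type type_;
  state state_ = STOPPED;
  std::string description_{};
};

// Hypothetical application-side recorder. The mutex makes on_status safe to
// call concurrently from the injection thread and the source thread.
class status_recorder {
 public:
  void on_status(const media_injection_status& status) {
    std::lock_guard<std::mutex> lock(mutex_);
    history_.push_back(status);
  }

  // Returns a copy so that callers never touch history_ without the lock.
  std::vector<media_injection_status> snapshot() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return history_;
  }

  // Function object suitable for passing as a status callback. The recorder
  // must outlive it because the lambda captures this.
  std::function<void(const media_injection_status&)> callback() {
    return [this](const media_injection_status& s) { on_status(s); };
  }

 private:
  mutable std::mutex mutex_;
  std::vector<media_injection_status> history_;
};
```

The function object returned by callback() has the same shape as media_injection_status_cb, so it could be moved into an injector constructor.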

Media Source for Injector

Neither of the default injectors provides infrastructure for capturing and decoding media. This is the application’s responsibility; once it has decoded media frames, it can pass them to either of the injectors described above.

A sample library, called Media File Source Sample, showcases how to create a Media Injection Source. It reads media from a mov/mp4 file and uses the Paced Injector to inject the audio and video frames into the CoreSDK. The provided sample application uses this library to inject media files; the Sample application page describes how to get media injection running with the sample application. A brief overview of the library is provided below.

The audio provided to the injector, contained by the audio frame, should have the following specifications:

Codec    Size            Format
PCM      10ms of data    s16_Int

The CoreSDK expects these 10ms chunks of PCM audio to be provided every 10ms. The video frames provided to the injector must contain YUV pixels.
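The 10ms requirement translates directly into a frame size: one frame holds sample_rate / 100 samples per channel. The following sketch splits a buffer of interleaved 16-bit PCM into complete 10ms frames. Both functions are hypothetical helpers rather than SDK calls, and the sketch assumes a sample rate that is a multiple of 100 Hz.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Number of int16_t values in one 10ms frame of interleaved PCM.
std::size_t values_per_10ms(int sample_rate, int channels) {
  assert(sample_rate > 0 && sample_rate % 100 == 0 && channels > 0);
  return static_cast<std::size_t>(sample_rate / 100) *
         static_cast<std::size_t>(channels);
}

// Splits interleaved PCM into complete 10ms frames. Values that do not fill
// a whole frame are returned through remainder, so the caller can prepend
// them to the next buffer.
std::vector<std::vector<std::int16_t>> split_into_10ms_frames(
    const std::vector<std::int16_t>& pcm,
    int sample_rate,
    int channels,
    std::vector<std::int16_t>& remainder) {
  const std::size_t frame_len = values_per_10ms(sample_rate, channels);
  std::vector<std::vector<std::int16_t>> frames;
  std::size_t offset = 0;
  for (; offset + frame_len <= pcm.size(); offset += frame_len) {
    const auto first = pcm.begin() + static_cast<std::ptrdiff_t>(offset);
    frames.emplace_back(first, first + static_cast<std::ptrdiff_t>(frame_len));
  }
  remainder.assign(pcm.begin() + static_cast<std::ptrdiff_t>(offset),
                   pcm.end());
  return frames;
}
```

For 48 kHz stereo, each frame contains 960 values (480 samples for each of the 2 channels).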

Media File Source Sample

The Media File Source sample library is a simple library that uses LibAV to read, demux, and decode audio and video frames. The top-level entity of this library is dolbyio::comms::sample::file_source, which takes an instance of the Paced Injector and creates a capture thread that parses and decodes media. When a media frame is decoded, this entity pushes the frame to the Paced Injector.

The Media File Source sample library provides the ability to parse the following types of media files:

  • MP4

  • MOV

The Media File Source sample is able to decode the following video and audio codecs:

Codec    Type     Sample/Pixel Format
AAC      Audio    float, planar
H264     Video    YUV
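Decoded AAC audio arrives as planar float, while the injector expects interleaved 16-bit PCM, so a conversion step is needed between the decoder and the injector. The sketch below shows one possible conversion. planar_float_to_interleaved_s16 is a hypothetical helper; clamping the input to [-1, 1] before scaling is a choice made in this sketch to avoid integer overflow on out-of-range samples.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <vector>

// Converts planar float audio (one vector per channel, nominal range
// [-1, 1]) into interleaved signed 16-bit PCM.
std::vector<std::int16_t> planar_float_to_interleaved_s16(
    const std::vector<std::vector<float>>& planes) {
  assert(!planes.empty());
  const std::size_t samples = planes.front().size();
  constexpr float scale = std::numeric_limits<std::int16_t>::max();

  std::vector<std::int16_t> out;
  out.reserve(samples * planes.size());
  for (std::size_t i = 0; i < samples; ++i) {
    for (const auto& plane : planes) {
      assert(plane.size() == samples);  // All channels must have equal length.
      // Clamp first so that the scaled value always fits into int16_t.
      const float scaled = std::clamp(plane[i], -1.0f, 1.0f) * scale;
      out.push_back(static_cast<std::int16_t>(scaled));
    }
  }
  return out;
}
```

The output order is sample-major: for stereo input the result is L0, R0, L1, R1, and so on.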

The library is included, along with a CMake file, in the package under the share/dolbyio/comms/sample/media_source/ directory. The top-level object of the Media File Source library is defined in the following header and source files:

#pragma once

/***************************************************************************
 * This program is licensed by the accompanying "license" file. This file is
 * distributed "AS IS" AND WITHOUT WARRANTY OF ANY KIND WHATSOEVER, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 *                Copyright (C) 2022 by Dolby Laboratories.
 ***************************************************************************/

#include <dolbyio/comms/multimedia_streaming/injector.h>

#include "comms/sample/media_source/file/libav_wrapper/avcontext.h"
#include "comms/sample/media_source/file/source_context.h"
#include "comms/sample/media_source/file/utils/audio_buffer.h"
#include "comms/sample/media_source/file/utils/frame_pool.h"
#include "comms/sample/media_source/file/utils/media_frame.h"

#include <condition_variable>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

namespace dolbyio::comms::sample {

enum class source_state { STOPPED, RESTARTED, CONTINUE, RESUMED, PAUSED };

struct file_source_status {
  bool capturing_audio = false;
  bool capturing_video = false;
  source_state current_state = source_state::STOPPED;
};

using audio_pool_frame_ptr = std::unique_ptr<frame_from_pool<audio_buffer>>;
using video_pool_frame_ptr = std::unique_ptr<frame_from_pool<frame>>;

class file_source {
 public:
  static std::unique_ptr<file_source> create(
      std::vector<std::string>&& files,
      bool loop,
      dolbyio::comms::plugin::injector_paced& injection,
      std::function<void(const file_source_status& status)>&& status_cb);

  file_source(std::vector<std::string>&& files,
              bool loop,
              comms::plugin::injector_paced& injector,
              std::function<void(const file_source_status&)>&& status_cb);
  ~file_source();

  // Public Interface
  bool set_audio_capture(bool enable);
  bool set_video_capture(bool enable);
  bool seek(int64_t time);
  bool pause();
  bool resume();
  bool loop_current_file(bool enables);
  void play_new_file(const std::string& file);
  void add_file_playlist(const std::string& file);

 private:
  bool start_capture();
  bool stop_capture(std::unique_lock<std::mutex>& lock);
  bool restart_capture(bool init_context);
  bool initialize_av_context();
  bool playlist_not_finished();
  bool wait_thread_stopped(std::unique_lock<std::mutex>& lock);

  int queue_audio_frame(audio_pool_frame_ptr&& value);
  int queue_video_frame(video_pool_frame_ptr&& value);
  void allocate_audio_frame_pool();
  audio_pool_frame_ptr process_audio(audio_pool_frame_ptr&& curr_buff,
                                     frame& aframe);

  // Managing the capturing thread
  void start_thread();
  void thread_function();
  void capture_loop();
  void capture_loop_exited();

  dolbyio::comms::plugin::injector_paced& injector_;
  std::unique_ptr<frame_pool<audio_buffer>> audio_pool_{};
  std::unique_ptr<frame_pool<frame>> video_pool_{};
  std::unique_ptr<libav_context> libav_context_;

  file_state file_state_;
  std::vector<std::string> input_files_;
  std::vector<std::string>::iterator curr_file_;

  struct capture_state {
    bool capture_audio = false;
    bool capture_video = false;
    bool running = false;
    bool running_silence = false;
    bool looping = false;
  };
  capture_state capture_state_;

  struct thread_state {
    bool start_ = false;
    bool exit_ = false;
    bool waiting_ = false;
    bool stopped_ = false;
  };
  thread_state thread_state_;

  std::mutex capture_lock_;
  std::thread capture_thread_;
  std::condition_variable wait_to_start_;
  std::condition_variable wait_to_stop_;
  std::function<void()> capture_executor_;

  std::function<void(const file_source_status& status)> source_status_;
};

}  // namespace dolbyio::comms::sample

/***************************************************************************
 * This program is licensed by the accompanying "license" file. This file is
 * distributed "AS IS" AND WITHOUT WARRANTY OF ANY KIND WHATSOEVER, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 *                Copyright (C) 2022 by Dolby Laboratories.
 ***************************************************************************/

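#include "comms/sample/media_source/file/source_capture.h"

#include <algorithm>
#include <iostream>
#include <iterator>
#include <limits>
#include <stdexcept>

#if defined(__APPLE__) || defined(__linux__)
#include <pthread.h>
#endif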

namespace dolbyio::comms::sample {

std::unique_ptr<file_source> file_source::create(
    std::vector<std::string>&& files,
    bool loop,
    dolbyio::comms::plugin::injector_paced& injector,
    std::function<void(const file_source_status& status)>&& status_cb) {
  return std::make_unique<file_source>(std::move(files), loop, injector,
                                       std::move(status_cb));
}

file_source::file_source(
    std::vector<std::string>&& files,
    bool loop,
    dolbyio::comms::plugin::injector_paced& injector,
    std::function<void(const file_source_status&)>&& status_cb)
    : injector_(injector),
      video_pool_(std::make_unique<frame_pool<frame>>(
          20,
          []() { return (new frame()); },
          [](frame* f) { delete f; })),
      input_files_(std::move(files)),
      curr_file_(input_files_.begin()),
      capture_thread_([this]() {
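#if defined(__APPLE__)
        pthread_setname_np("injection_capture");
#elif defined(__linux__)
        // Name the thread via pthread_self(): capture_thread_ may not be fully
        // constructed yet when this lambda starts running. Linux limits thread
        // names to 15 characters plus the terminator, hence the shorter name.
        pthread_setname_np(pthread_self(), "inject_capture");
#endif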
        thread_function();
      }),
      capture_executor_([this]() { return capture_loop(); }),
      source_status_(std::move(status_cb)) {
  if (curr_file_ != input_files_.end())
    file_state_.new_file(*curr_file_);
  else
    std::cerr << "Warning: no media file provided, you must provide one before "
                 "starting injection!\n";

  capture_state_.looping = loop;
}

file_source::~file_source() {
  {
    std::unique_lock<std::mutex> lock(capture_lock_);
    if (capture_state_.running) {
      capture_state_.capture_audio = false;
      capture_state_.capture_video = false;
      stop_capture(lock);
    } else if (capture_state_.running_silence) {
      injector_.stop_audio_injection(true);
    }
    injector_.clear_audio_queue();
    injector_.clear_video_queue();
    thread_state_.exit_ = true;
  }
  wait_to_start_.notify_one();
  try {
    capture_thread_.join();
  } catch (const std::exception& e) {
    std::cerr << "Error: Joining capture_thread threw: " << e.what()
              << std::endl;
  }
  audio_pool_.reset();
  video_pool_.reset();
  libav_context_.reset();
}

bool file_source::set_audio_capture(bool enable) {
  std::unique_lock<std::mutex> lock(capture_lock_);
  if (capture_state_.capture_audio == enable) {
    std::cerr << "Audio state already set to "
              << (enable ? "Enable" : "Disable") << "!\n";
    return false;
  }
  capture_state_.capture_audio = enable;
  std::cerr << "Audio set to " << (enable ? "" : "Stop ") << "Capture!\n";

  if (enable) {
    injector_.start_audio_injection();
    return start_capture();
  } else
    return stop_capture(lock);
}

bool file_source::set_video_capture(bool enable) {
  std::unique_lock<std::mutex> lock(capture_lock_);
  if (capture_state_.capture_video == enable) {
    std::cerr << "Video state already set to "
              << (enable ? "Enable" : "Disable") << "!\n";
    return false;
  }
  capture_state_.capture_video = enable;
  std::cerr << "Video set to " << (enable ? "" : "Stop ") << "Capture!\n";

  if (enable) {
    injector_.start_video_injection();
    return start_capture();
  } else
    return stop_capture(lock);
}

bool file_source::seek(int64_t time) {
  std::lock_guard<std::mutex> lock(capture_lock_);
  if (!capture_state_.running || !libav_context_->set_next_seek_time(time)) {
    std::cerr << "Failed to Seek, Capture State "
              << (capture_state_.running ? "" : "Not") << " Running!\n";
    return false;
  }

  file_state_.seek();
  return true;
}

bool file_source::pause() {
  std::unique_lock<std::mutex> lock(capture_lock_);
  if (!capture_state_.running) {
    std::cerr << "Pause Failed, Capture is Not Running!\n";
    return false;
  }

  file_state_.pause();
  if (wait_thread_stopped(lock)) {
    capture_state_.running_silence = true;
    injector_.start_audio_silence_injection(file_state_.audio.sample_rate_,
                                            file_state_.audio.channels_);
    return true;
  }
  return false;
}

bool file_source::resume() {
  std::lock_guard<std::mutex> lock(capture_lock_);
  if (file_state_.state() != file_state::PAUSE) {
    std::cerr << "Resume Failed, Current state is not Paused!\n";
    return false;
  }

  // Stop injecting silence into the conference
  injector_.stop_audio_injection(true /*force stoppage*/);

  if (capture_state_.capture_audio)
    injector_.start_audio_injection();
  if (capture_state_.capture_video)
    injector_.start_video_injection();

  capture_state_.running_silence = false;
  capture_state_.running = true;

  start_thread();
  return true;
}

void file_source::play_new_file(const std::string& file) {
  std::lock_guard<std::mutex> lock(capture_lock_);
  auto iter = input_files_.begin();
  // Is our playlist empty
  if (iter == input_files_.end()) {
    input_files_.push_back(file);
    curr_file_ = input_files_.begin();
  } else {
    for (; iter != input_files_.end(); ++iter) {
      if (*iter == file)
        break;
    }
    // Check if the new file to play is already part of the playlist
    if (iter != input_files_.end())
      curr_file_ = iter;
    // If it is not, check whether the new file fits in the current capacity, so
    // that push_back does not reallocate and invalidate curr_file_
    else if (input_files_.size() < input_files_.capacity()) {
      input_files_.push_back(file);
      auto last_iter = std::prev(input_files_.end());
      std::iter_swap(curr_file_, last_iter);
    } else {
      std::string current = *curr_file_;
      input_files_.push_back(file);
      auto last_iter = std::prev(input_files_.end());
      auto curr_iter = input_files_.begin();
      for (; curr_iter != input_files_.end(); ++curr_iter) {
        if (*curr_iter == current)
          break;
      }
      std::iter_swap(curr_iter, last_iter);
      curr_file_ = curr_iter;
    }
  }
  file_state_.new_file(*curr_file_);
}

void file_source::add_file_playlist(const std::string& file) {
  std::lock_guard<std::mutex> lock(capture_lock_);
  auto iter = input_files_.begin();
  // Check if the playlist is empty
  if (iter == input_files_.end()) {
    input_files_.push_back(file);
    curr_file_ = input_files_.begin();
    return;
  }
  for (; iter != input_files_.end(); ++iter) {
    if (*iter == file)
      break;
  }
  // Check if the file is already part of the playlist
  if (iter != input_files_.end())
    return;
  else if (input_files_.size() < input_files_.capacity())
    input_files_.push_back(file);
  else {
    std::string current = *curr_file_;
    input_files_.push_back(file);
    for (iter = input_files_.begin(); iter != input_files_.end(); ++iter) {
      if (*iter == current) {
        curr_file_ = iter;
        break;
      }
    }
  }
}

bool file_source::start_capture() {
  if (capture_state_.running)
    return true;

  auto state = file_state_.state();
  if (state == file_state::STOP || state == file_state::NEW) {
    if (!initialize_av_context())
      return false;
  } else if (state != file_state::PAUSE)
    return false;

  capture_state_.running = true;

  start_thread();
  return true;
}

bool file_source::stop_capture(std::unique_lock<std::mutex>& lock) {
  if (!capture_state_.running) {
    std::cerr << "Capture from media file is already stopped!\n";
    return true;
  }
  if (capture_state_.capture_audio || capture_state_.capture_video) {
    std::cerr << (capture_state_.capture_audio ? "Still Capturing Audio! "
                                               : "Not Capturing Audio! ")
              << (capture_state_.capture_video ? "Still Capturing Video!"
                                               : "Not Capturing Video!")
              << std::endl;
    return false;
  }
  // Stop and wait for the thread to reach conditional in thread_function
  file_state_.stop();
  return wait_thread_stopped(lock);
}

bool file_source::restart_capture(bool init_context) {
  if (init_context && !initialize_av_context())
    return false;

  capture_state_.running = true;
  injector_.start_video_injection();
  injector_.start_audio_injection();

  return true;
}

bool file_source::playlist_not_finished() {
  return std::next(curr_file_) != input_files_.end();
}

bool file_source::wait_thread_stopped(std::unique_lock<std::mutex>& lock) {
  if (!lock.owns_lock())
    return false;

  thread_state_.waiting_ = true;
  wait_to_stop_.wait(lock, [this]() { return thread_state_.stopped_; });
  thread_state_.waiting_ = false;
  return true;
}

bool file_source::initialize_av_context() {
  try {
    if (file_state_.name().empty())
      throw std::runtime_error("There is no media file set to read from!");

    libav_context_ = std::make_unique<libav_context>(file_state_.name());
    libav_context_->create_decoder(AVMEDIA_TYPE_VIDEO);
    libav_context_->create_decoder(AVMEDIA_TYPE_AUDIO);
  } catch (const std::exception& ex) {
    libav_context_.reset();
    std::cerr << "Failed for reason: " << ex.what() << std::endl;
    return false;
  }

  // If the format of the audio in file is different we need a new audio frame
  // pool.
  if (!file_state_.audio.compare(libav_context_->sample_rate(),
                                 libav_context_->channels()))
    allocate_audio_frame_pool();

  file_state_.audio.settings(libav_context_->sample_rate(),
                             libav_context_->channels());
  auto rate = libav_context_->frame_rate();
  std::cerr << "Frame rate num: " << rate.num << " den: " << rate.den
            << std::endl;
  injector_.set_video_frame_rate(rate.num, rate.den);
  return true;
}

int file_source::queue_audio_frame(audio_pool_frame_ptr&& value) {
  return injector_.inject_audio_frame(
      std::make_unique<audio_frame_impl>(std::move(value)));
}

int file_source::queue_video_frame(video_pool_frame_ptr&& value) {
  return injector_.inject_video_frame(video_frame{
      std::make_shared<i420_video_frame_buffer>(std::move(value)), 0});
}

void file_source::allocate_audio_frame_pool() {
  audio_pool_ = std::make_unique<frame_pool<audio_buffer>>(
      100,
      [samples{libav_context_->sample_rate() / 100},
       channels{libav_context_->channels()},
       sample_rate{libav_context_->sample_rate()}]() -> audio_buffer* {
        return new audio_buffer(samples, sample_rate, channels);
      },
      std::default_delete<audio_buffer>());
}

// Currently only planar float input (AV_SAMPLE_FMT_FLTP) is handled; other
// audio input formats are not yet supported.
audio_pool_frame_ptr file_source::process_audio(
    audio_pool_frame_ptr&& curr_buff,
    frame& aframe) {
  if (!curr_buff) {
    std::cerr << "Current audio buffer is null!\n";
    return nullptr;
  }
  if (!aframe) {
    std::cerr << "No AVFrame provided!\n";
    return std::move(curr_buff);
  }
  if (aframe->format != AV_SAMPLE_FMT_FLTP) {
    std::cerr << "Currently only support AVFrame format Float Planar!\n";
    return std::move(curr_buff);
  }

  float* channel_buffer[AV_NUM_DATA_POINTERS] = {0};
  for (int i = 0; i < aframe->ch_layout.nb_channels; ++i)
    channel_buffer[i] = reinterpret_cast<float*>(aframe->data[i]);

  for (int i = 0; i < aframe->nb_samples; ++i) {
    if (curr_buff->val()->full()) {
      queue_audio_frame(std::move(curr_buff));
      curr_buff.reset(new frame_from_pool<audio_buffer>(
          audio_pool_->get_frame(), *audio_pool_,
          [](audio_buffer* buf) { buf->reset(); }));
    }
    for (int j = 0; j < aframe->ch_layout.nb_channels; ++j) {
      double val = channel_buffer[j][i] * std::numeric_limits<int16_t>::max();
      curr_buff->val()->push(val);
    }
  }
  if (curr_buff->val()->full()) {
    queue_audio_frame(std::move(curr_buff));
    curr_buff.reset(new frame_from_pool<audio_buffer>(
        audio_pool_->get_frame(), *audio_pool_,
        [](audio_buffer* buf) { buf->reset(); }));
  }
  return std::move(curr_buff);
}

void file_source::thread_function() {
  while (!thread_state_.exit_) {
    {
      std::unique_lock<std::mutex> lock(capture_lock_);

      // Check if someone is waiting to be sure the thread has returned
      // to the stopped state.
      thread_state_.stopped_ = true;
      if (thread_state_.waiting_)
        wait_to_stop_.notify_one();

      wait_to_start_.wait(lock, [this]() {
        return thread_state_.start_ || thread_state_.exit_;
      });

      thread_state_.stopped_ = false;
      if (thread_state_.exit_)
        break;
    }
    capture_executor_();
  }
}

void file_source::start_thread() {
  thread_state_.start_ = true;
  wait_to_start_.notify_one();
}

/* This is called on the capture_thread_ after it exits the loop. We check the
 * current status of the file and act accordingly.
 */
void file_source::capture_loop_exited() {
  std::unique_lock<std::mutex> lock(capture_lock_);
  capture_state_.running = false;
  bool restart = false;

  // If the capture loop exited on its own, check whether to loop the current
  // file, play the next file in the playlist, or stop.
  auto state = file_state_.state();
  if (state == file_state::PLAYING) {
    // The capture loop ended while the file was still in the playing state,
    // so frames may still be queued. Let all of them be injected before
    // continuing.
    injector_.stop_audio_injection(false /*wait for all queued frames*/);
    injector_.stop_video_injection(false /*wait for all queued frames*/);

    if (capture_state_.looping)
      restart = restart_capture(true);
    else if (playlist_not_finished()) {
      // Go to the next file and set it to start playing once we enter the
      // capture loop
      ++curr_file_;
      file_state_.new_file(*curr_file_);
      restart = restart_capture(true);
    } else {
      file_state_.stop();
      libav_context_.reset();
      capture_state_.capture_audio = false;
      capture_state_.capture_video = false;

      if (source_status_)
        source_status_(file_source_status{false, false, source_state::STOPPED});
    }
  } else {
    // If the file is no longer in the playing state, stop the injection
    // threads immediately and then act on the new file state.
    injector_.stop_audio_injection(true /*force stoppage*/);
    injector_.stop_video_injection(true /*force stoppage*/);

    if (state == file_state::SEEK) {
      libav_context_->seek_set_time();
      restart = restart_capture(false);
    } else if (state == file_state::NEW) {
      injector_.clear_audio_queue();
      injector_.clear_video_queue();
      restart = restart_capture(true);
    } else if (state == file_state::STOP) {
      injector_.clear_audio_queue();
      injector_.clear_video_queue();
      libav_context_.reset();
    }
  }
  thread_state_.start_ = restart;
}
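
/* Illustrative sketch, not part of the sample: the branching in
 * capture_loop_exited() above can be summarized as a pure decision function.
 * The enum and function names below are assumptions made for this example;
 * the sample works on file_state_ and capture_state_ directly, and whether a
 * restart really happens there also depends on what restart_capture()
 * returns.

```cpp
#include <cassert>

// Hypothetical mirror of the file states handled above.
enum class file_state { playing, seek, new_file, stop, pause };

// Hypothetical names for what the capture thread does next.
enum class exit_action {
  loop_same_file,     // restart the current file from the beginning
  next_file,          // advance the playlist and start the next file
  finish_playlist,    // nothing left to play: stop and report STOPPED
  resume_after_seek,  // seek to the requested time, keep queued frames
  start_new_file,     // clear the queues and start the new file
  tear_down,          // clear the queues and release the decoder context
  stay_parked         // no restart (for example, the file was paused)
};

struct exit_decision {
  exit_action action;
  bool force_stop_injection;  // false: let queued frames drain first
};

exit_decision decide_after_capture_loop(file_state state,
                                        bool looping,
                                        bool playlist_has_more) {
  if (state == file_state::playing) {
    // The loop ran out of packets on its own, so queued frames are drained.
    if (looping)
      return {exit_action::loop_same_file, false};
    if (playlist_has_more)
      return {exit_action::next_file, false};
    return {exit_action::finish_playlist, false};
  }
  // The state was changed externally, so injection is stopped immediately.
  switch (state) {
    case file_state::seek:
      return {exit_action::resume_after_seek, true};
    case file_state::new_file:
      return {exit_action::start_new_file, true};
    case file_state::stop:
      return {exit_action::tear_down, true};
    default:
      return {exit_action::stay_parked, true};
  }
}
```

 * Keeping the decision separate from its side effects makes each branch easy
 * to check without a decoder or an injector.
 */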

void file_source::capture_loop() {
  {
    audio_pool_frame_ptr reference_audio_frame(
        new frame_from_pool<audio_buffer>(
            audio_pool_->get_frame(), *audio_pool_,
            [](audio_buffer* buf) { buf->reset(); }));

    auto audio_read_frame = std::make_unique<frame>();
    file_state_.playing();
    int ret = 0;
    while (libav_context_->read_single_packet() >= 0) {
      {
        // Check if the state of the file has changed externally
        std::lock_guard<std::mutex> lock(capture_lock_);
        file_state::state_change state = file_state_.state();
        if (state != file_state::PLAYING) {
          if (state != file_state::PAUSE)
            libav_context_->packet_finished();
          break;
        }
        ret = libav_context_->packet_to_decoder(capture_state_.capture_video,
                                                capture_state_.capture_audio);
      }
      while (ret >= 0) {
        if (libav_context_->is_video()) {
          auto* vframe = video_pool_->get_frame();
          ret = libav_context_->frame_from_decoder<video>(vframe);
          if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
            video_pool_->return_frame(vframe);
            break;
          }
          if (vframe->raw()->format != AV_PIX_FMT_YUV420P) {
            video_pool_->return_frame(vframe);
            std::cerr << "Bad video frame format" << std::endl;
            break;
          }
          queue_video_frame(std::make_unique<frame_from_pool<frame>>(
              vframe, *video_pool_, [](frame* f) { f->unref(); }));
        } else if (libav_context_->is_audio()) {
          ret =
              libav_context_->frame_from_decoder<audio>(audio_read_frame.get());
          if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF)
            break;

          // process_audio() returns the current audio buffer, which holds the
          // samples left over after the last full buffer was queued.
          reference_audio_frame = process_audio(
              std::move(reference_audio_frame), *audio_read_frame);
          audio_read_frame->unref();
        }
      }
      libav_context_->packet_finished();
    }
  }
  // All capture_loop stack variables are destroyed before checking the reason
  // for the exit.
  capture_loop_exited();
}

}  // namespace dolbyio::comms::sample