// Canned RTP test data defined below: two 12-byte headers (sequence numbers 1
// and 2, the first with the marker bit set) and the payload bytes that
// accompany each header.
// NOTE(review): none of the gRTP* tables is referenced by the code visible in
// this file - confirm they are still needed.
// SYSTEM INCLUDES #include #include #include // APPLICATION INCLUDES #include "RtpWorker.h" #include "IODeviceSource.h" #include "IODeviceSink.h" // EXTERNAL FUNCTIONS // EXTERNAL VARIABLES // CONSTANTS constexpr uint8_t gRTPFirstHeader[] = { // sequence #1 - marker on 0x80, 0x88, 0x00, 0x01, 0x00, 0x00, 0x00, 0xa0, 0xd2, 0xbd, 0x4e, 0x3e }; constexpr uint8_t gRTPFirstPayload[] = { 0xdc, 0xde, 0xc4, 0xc5, 0xdc, 0xd0, 0xd5, 0x51, 0x53, 0x5d, 0x5f, 0x5b, 0x46, 0x46, 0x46, 0x5b, 0x44, 0x41, 0x42, 0x4f, 0x42, 0x47, 0x42, 0x43, 0x59, 0x58, 0x59, 0x5f, 0x5f, 0x52, 0x59, 0x44, 0x44, 0x5f, 0x51, 0x54, 0x55, 0x55, 0x51, 0x56, 0x50, 0x52, 0x5e, 0x58, 0x5d, 0x52, 0x52, 0x50, 0x57, 0x54, 0xd4, 0xd6, 0xd5, 0x51, 0x53, 0x57, 0xd6, 0xd6, 0xd0, 0xd7, 0x57, 0x56, 0x57, 0xd0, 0xd3, 0xd6, 0xd5, 0x55, 0x51, 0x50, 0xd6, 0xdf, 0xd2, 0xd1, 0xd4, 0xd6, 0xdc, 0xdb, 0xda, 0xdd, 0xd6, 0x55, 0xdc, 0xd0, 0xd4, 0x5d, 0x44, 0x5c, 0x56, 0xd6, 0xd5, 0xd4, 0xd5, 0xd7, 0x50, 0xd4, 0x51, 0xd0, 0x61, 0x6f, 0x76, 0xfe, 0xef, 0xf7, 0x77, 0x66, 0x50, 0xff, 0xe5, 0xd7, 0x74, 0x4a, 0xc9, 0xf9, 0xf7, 0x5c, 0x76, 0x5f, 0xf5, 0xf3, 0xdd, 0x4e, 0x42, 0xd8, 0xf7, 0xc9, 0x50, 0x44, 0x50, 0xcd, 0xc9, 0xd4, 0x4d, 0x41, 0x57, 0xd1, 0x51, 0x58, 0x44, 0x52, 0xd3, 0xd1, 0x50, 0x58, 0x5b, 0x55, 0xd4, 0x53, 0x59, 0x43, 0x47, 0x5f, 0x51, 0x5d, 0x56, 0xd2, 0xde, 0xd7, 0x52, 0xd5 }; constexpr uint8_t gRTPSecondHeader[] = { // sequence #2 - marker off 0x80, 0x08, 0x00, 0x02, 0x00, 0x00, 0x01, 0x40, 0xd2, 0xbd, 0x4e, 0x3e }; constexpr uint8_t gRTPSecondPayload[] = { 0xd2, 0xc7, 0xc5, 0xd3, 0xd1, 0xd6, 0xd5, 0x55, 0xd2, 0xd2, 0xd3, 0xdf, 0xc4, 0xd9, 0xdb, 0xc6, 0xc6, 0xc9, 0xc1, 0xf5, 0xcf, 0xf0, 0xcb, 0xe4, 0x7d, 0x14, 0x7a, 0xfd, 0xfd, 0x62, 0x63, 0x41, 0xee, 0xe7, 0x40, 0x6c, 0x46, 0xf8, 0xf3, 0x74, 0x7b, 0x58, 0xf3, 0xf1, 0xd6, 0x5d, 0xc7, 0xfd, 0xff, 0xf5, 0x55, 0xd9, 0xcb, 0xcb, 0xde, 0xd1, 0xda, 0xca, 0xcd, 0xc5, 0xc4, 0x41, 0x72, 0x41, 0x55, 0xda, 0x54, 0x5c, 0x40, 0x59, 0xd1, 0x5f, 0x4b, 0x40, 0x51, 0x52, 0x45, 
0x4b, 0x77, 0x49, 0x4e, 0x4e, 0x48, 0x4a, 0x75, 0x47, 0x75, 0x7b, 0x72, 0x42, 0x58, 0x4c, 0x46, 0xd5, 0xc2, 0xd1, 0x5d, 0x44, 0x50, 0xd0, 0x53, 0x43, 0x41, 0x52, 0x53, 0x5a, 0x44, 0x50, 0xdd, 0x45, 0x65, 0x7d, 0x74, 0x4c, 0x70, 0x43, 0x53, 0xc4, 0xc5, 0x5b, 0x5a, 0x5c, 0x51, 0x4f, 0x70, 0x4a, 0x58, 0x5e, 0x75, 0x76, 0x40, 0x57, 0x51, 0x46, 0x5e, 0x5c, 0xd5, 0x5c, 0x5c, 0x5e, 0x5e, 0x4d, 0x41, 0x44, 0xd4, 0xd6, 0xd5, 0x52, 0x54, 0xdd, 0xd7, 0xd3, 0x56, 0x51, 0x55, 0xdf, 0x54, 0xd1, 0xd2, 0xda }; // NAMESPACE USAGE // STATIC VARIABLE INITIALIZATIONS //#define RESAMPLER_BUG_FIXED 1 //! Constructor RtpWorker::RtpWorker() : mpThread{ std::make_unique() } //!< must not pass parent , mpTimer{ std::make_unique() } , mpWatchdog{ std::make_unique(this) } //!< 'this' parent for thread affinity , mpSocket{ std::make_unique(this) } //!< 'this' parent for thread affinity #if defined (RESAMPLER_BUG_FIXED) // "qt.multimedia.audiooutput: Failed to setup resampler" on console , mpAudioSource{ std::make_unique( QMediaDevices::defaultAudioInput(), QMediaDevices::defaultAudioInput(). preferredFormat(), this) } , mpIODeviceSource{ std::make_unique( QMediaDevices::defaultAudioInput(). preferredFormat(), this) } #else // cannot use either of these objects as instantiating them prevents the resampler from working , mpAudioSource{} , mpIODeviceSource{} #endif #if defined (QAUDIOSINK_BUG_FIXED) , mpAudioSink{ std::make_unique( QMediaDevices::defaultAudioOutput(), QMediaDevices::defaultAudioOutput(). 
preferredFormat(), this) } #else , mpAudioSink{} #endif , mpIODeviceSink{ std::make_unique( QByteArrayList{}, this) } , mTXBufferList{} , mTargetIP{} , mTargetPort{} , mTSIncrement{} , mTXCounter{0} { connect(this, &RtpWorker::start, this, &RtpWorker::handleStart); connect(this, &RtpWorker::stop, this, &RtpWorker::handleStop); connect(this, &RtpWorker::updateAudio, this, &RtpWorker::handleAudioUpdated); connect(mpSocket.get(), &QUdpSocket::readyRead, this, &RtpWorker::readyRead); connect(mpSocket.get(), &QUdpSocket::bytesWritten, this, &RtpWorker::handleBytesWritten); connect(mpWatchdog.get(), &QTimer::timeout, [&]() { Q_EMIT addLogEntry(spdlog::level::warn, "RTP network disconnected"); Q_EMIT watchdogStatusChanged(true, "RTP network disconnected"); }); #if defined (RESAMPLER_BUG_FIXED) connect(mpIODeviceSource.get(), &IODeviceSource::audioCaptured, this, &RtpWorker::handleAudioCaptured); #endif #if defined (QAUDIOSINK_BUG_FIXED) connect(mpAudioSink.get(), &QAudioSink::stateChanged, [&](QAudio::State state) { // update log tab Q_EMIT addLogEntry(spdlog::level::info, "QAudioSink::stateChanged"); }); #endif // thread affinity magic moveToThread(mpThread.get()); mpThread->start(QThread::TimeCriticalPriority); } /** * Start signal handler. * *

This slot runs in the worker thread context. Creates a * buffered list of transmit UDP datagrams. Each datagram is prefixed * with an RTP header (including a sequence# & timestamp). * *

The first datagram will always have the marker bit set. * * @param aTSIncrement [in] Timestamp increment per datagram. * @param rTargetIP [in] Target IP address. * @param aTargetPort [in] Target UDP port. * @param aDurationUsecs * [in] Duration (specified in microseconds) to * sleep between recurring timer callbacks. * This time represents the duration for all * mTXBufferList datagrams. * @param rHeader [in] RTP header consisting of a QByteArray * containing an rtp_hdr_t with optional contributing * sources (up to 16) & rtp_hdr_ext_t fields & data if * the x bit of the rtp_hdr_t is set. * @param rAudioData [in] raw multichannel audio. */ void RtpWorker::handleStart( const qint64 aTSIncrement, const QString& rTargetIP, const quint16 aTargetPort, const qint64 aDurationUsecs, const QByteArray& rHeader, const QByteArrayList& rAudioData) { mTXCounter = 0; mTSIncrement = aTSIncrement; mTargetIP = QHostAddress(rTargetIP); mTargetPort = aTargetPort; mTXBufferList.clear(); if (mTargetIP.isMulticast()) { mpSocket->bind(QHostAddress( QHostAddress::AnyIPv4), 0); } const auto headerLen = gHeaderLenLambda(rHeader); if (headerLen >= sizeof(rtp_hdr_t)) { // copy construct the existing header const auto rawHeader = std::make_unique(rHeader); const auto rtpHeader = reinterpret_cast(rawHeader->data()); auto seqNum = rtpHeader->getSeqNum(); auto timeStamp = rtpHeader->getTimestamp(); for (const auto& next : rAudioData) { // set the marker indicating a significant boundary condition if (mTXBufferList.empty()) { rtpHeader->m = 1u; } else { rtpHeader->m = 0u; } rtpHeader->setSeqNum(seqNum); rtpHeader->setTimestamp(timeStamp); // each datagram consists of an rtp_hdr_t (12 bytes + // optional extensions enclosed via headerLen) followed by // the raw audio data. 
QByteArrayList temp = { *rawHeader, next }; mTXBufferList.emplace_back(temp.join()); seqNum++; timeStamp += mTSIncrement; } } #if !defined (QAUDIOSINK_BUG_FIXED) // note no need to pass 'this' as as default thread affinity // is to the current thread which is the worker thread. const auto& defaultOutput = QMediaDevices::defaultAudioOutput(); const auto formats = defaultOutput.supportedSampleFormats(); // change preferredFormat() for best match between rtp header // format and one of the above formats mpAudioSink = std::make_unique( defaultOutput, defaultOutput.preferredFormat()); #endif // copy the synthesized data to the device, it will be sent to the speaker. mpIODeviceSink->setData(rAudioData.join()); // opens the device for read only mpIODeviceSink->start(); // starts streaming using the pull service mpAudioSink->start(mpIODeviceSink.get()); #if !defined (RESAMPLER_BUG_FIXED) mpAudioSource = std::make_unique( QMediaDevices::defaultAudioInput(), QMediaDevices::defaultAudioInput(). preferredFormat()); mpIODeviceSource = std::make_unique( QMediaDevices::defaultAudioInput(). preferredFormat()); // @JC use unique connection so as not to leak connections connect(mpIODeviceSource.get(), &IODeviceSource::audioCaptured, this, &RtpWorker::handleAudioCaptured, Qt::UniqueConnection); #endif // opens the device for write by mpAudioSource mpIODeviceSource->start(); mpAudioSource->start(mpIODeviceSource.get()); // update log tab Q_EMIT addLogEntry(spdlog::level::info, "RTP thread started"); // notify main thread that we successfully started Q_EMIT workerStatusChange("RTP thread started", "color:blue;font-weight:bold;", 4000); mpTimer->start(static_cast(aDurationUsecs / 1000), Qt::PreciseTimer, this); } /** * Slot to handle captured audio data. * *

Currently the captured audio is not stored anywhere; the slot only
 * writes a log entry with the buffer's byte count on every 50th call.
 *
 * @param rAudioBuffer  [in] audio data (containing valid
 *                      QAudioFormat).
 */
void RtpWorker::handleAudioCaptured(const QAudioBuffer& rAudioBuffer)
{
    // log the capture event every 50 packets; the counter is reset each
    // time so it can never overflow
    constexpr int kLogInterval = 50;
    static int sCallsSinceLog = 0;
    if (++sCallsSinceLog >= kLogInterval) {
        sCallsSinceLog = 0;
        Q_EMIT addLogEntry(spdlog::level::info, std::format(
            "captured {} bytes", rAudioBuffer.byteCount()).c_str());
    }
}

/**
 * Audio updated slot handler.
 *
 * Rebuilds mTXBufferList from rAudioData, reusing the RTP header of the
 * currently queued datagrams. Nothing is rebuilt when mTXBufferList is
 * empty (i.e. no previous set of buffers exists to take a header from).
 *
 * The queued datagrams always carry the sequence numbers/timestamps that
 * will be sent NEXT (the timer handler advances every header right after
 * transmitting), so numbering resumes from the first queued datagram; that
 * keeps the stream free of sequence/timestamp gaps.
 *
 * NOTE: an empty rAudioData leaves mTXBufferList empty while the transmit
 * timer may still be running.
 * NOTE(review): the template arguments of reinterpret_cast were missing
 * from this text and have been restored from usage.
 *
 * @param aTSIncrement  [in] Timestamp increment per datagram.
 * @param rAudioData    [in] raw multichannel audio.
 */
void RtpWorker::handleAudioUpdated(
    const qint64 aTSIncrement,
    const QByteArrayList& rAudioData)
{
    mTSIncrement = aTSIncrement;

    // must have a previous set of buffers so we can pick up the header
    // and the next sequence number/timestamp to be sent.
    if (!mTXBufferList.isEmpty()) {
        const auto headerLen = gHeaderLenLambda(mTXBufferList.front());
        // same sanity check as handleStart(): never reinterpret a buffer
        // that is shorter than an rtp_hdr_t
        if (headerLen >= sizeof(rtp_hdr_t)) {
            QByteArray rawHeader(
                mTXBufferList.front().sliced(0, headerLen));
            auto* const pRtpHeader =
                reinterpret_cast<rtp_hdr_t*>(rawHeader.data());
            // first queued datagram == next one due on the wire
            auto seqNum = pRtpHeader->getSeqNum();
            auto timeStamp = pRtpHeader->getTimestamp();
            mTXBufferList.clear();
            for (const auto& rPayload : rAudioData) {
                // marker bit flags the first datagram of the new audio
                pRtpHeader->m = mTXBufferList.empty() ? 1u : 0u;
                pRtpHeader->setSeqNum(seqNum);
                pRtpHeader->setTimestamp(timeStamp);
                // each datagram consists of an rtp_hdr_t (12 bytes +
                // optional extensions) followed by
                // the raw audio data.
                mTXBufferList.emplace_back(
                    QByteArrayList{ rawHeader, rPayload }.join());
                seqNum++;
                timeStamp += mTSIncrement;
            }
        }
    }

    // if the sink & mpIODeviceSink device are started and open
    if (mpIODeviceSink && mpIODeviceSink->isOpen()) {
        mpIODeviceSink->setData(rAudioData.join());
    }
}

/**
 * QUdpSocket slot readyRead handler.
 *
 *

Slot called when UDP data returned via echo to this * thread. When called drain any pending datagrams and pet the * inactivity watchdog timer. */ void RtpWorker::readyRead() { QByteArrayList datagrams; while (mpSocket->hasPendingDatagrams()) { // each datagram consists of an RTP header followed by audio data const auto& networkDatagram = mpSocket->receiveDatagram(); if (networkDatagram.isValid()) { datagrams.emplace_back(networkDatagram.data()); } } // send whatever we have so far. Note that the datagram sizes might // not all be the same size if the audio configuration was changed // while still running. This will be handled in the main thread if (!datagrams.isEmpty()) { Q_EMIT datagramsReady(datagrams); // handle watchdog if (!mpWatchdog->isActive()) { // start watchdog mpWatchdog->setSingleShot(true); mpWatchdog->start(2000); } else { // pet the watchdog mpWatchdog->start(2000); } } } /** * Stop slot handler. * *

This is called in the worker thread context. Close open * timers, sockets, QAudioSources and QAudioSinks. */ void RtpWorker::handleStop() { mpTimer->stop(); mpSocket->close(); mpWatchdog->stop(); mTXBufferList.clear(); mTSIncrement = 0; mTXCounter = 0; if (mpAudioSource) { mpAudioSource->stop(); } if (mpIODeviceSource) { mpIODeviceSource->stop(); } if (mpAudioSink) { mpAudioSink->stop(); } if (mpIODeviceSink) { mpIODeviceSink->stop(); } // update log tab Q_EMIT addLogEntry(spdlog::level::info, "RTP thread stopped"); // notify main thread that we successfully started Q_EMIT workerStatusChange("RTP thread stopped", "color:blue;font-weight:bold;", 4000); } /** * Cleanup method - runs in worker thread context. */ void RtpWorker::cleanup() { // delete timer, socket... etc. mpTimer.reset(); mpWatchdog.reset(); mpSocket.reset(); mpAudioSource.reset(); mpIODeviceSource.reset(); mpAudioSink.reset(); mpIODeviceSink.reset(); mTSIncrement = 0; mTXCounter = 0; // quit makes exec() return from the event loop mpThread->quit(); } /** * Timer callback event handler. * * @param event [in] timer event. 
*/
// Sends every queued datagram to mTargetIP:mTargetPort, then rewrites all
// queued RTP headers so they carry the sequence numbers and timestamps of
// the NEXT block. Events from any other timer go to QObject::timerEvent().
// NOTE(review): the reinterpret_cast target type was missing from part of
// this text and has been restored to rtp_hdr_t* (as used in the loop below).
void RtpWorker::timerEvent(QTimerEvent* event)
{
    if (event->timerId() != mpTimer->timerId()) {
        // pass unhandled timer event up the chain
        QObject::timerEvent(event);
        return;
    }

    // Nothing queued (e.g. the audio update delivered no data): calling
    // back() on an empty list is undefined, so skip this tick.
    if (mTXBufferList.isEmpty()) {
        return;
    }

    // blast out the entire block of queued up datagrams at once
    for (const auto& rDatagram : mTXBufferList) {
        mpSocket->writeDatagram(rDatagram, mTargetIP, mTargetPort);
    }
    mTXCounter += mTXBufferList.size();
    // TODO change to use a sent packet stats counter

    // the last datagram just sent defines where numbering continues
    auto* const pLastSent = reinterpret_cast<rtp_hdr_t*>(
        mTXBufferList.back().data());
    const auto lastSentSeq = pLastSent->getSeqNum();
    const auto lastSentTS = pLastSent->getTimestamp();

    // update all rtp headers in the mTXBufferList, making sure to
    // increment the timestamp and sequence number and to reset the marker
    // bit as it has just been transmitted
    const auto datagramCount = mTXBufferList.size();
    for (qsizetype i = 0; i < datagramCount; ++i) {
        auto* const pHeader = reinterpret_cast<rtp_hdr_t*>(
            mTXBufferList[i].data());
        const auto step = i + 1;
        // increment the sequence numbers
        pHeader->setSeqNum(lastSentSeq + step);
        // increment timestamp
        pHeader->setTimestamp(lastSentTS + step * mTSIncrement);
        // reset the marker bit so it doesn't keep getting transmitted
        pHeader->m = 0u;
    }
}