diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp index 06d73cfa7f..799be9e94f 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp @@ -54,9 +54,14 @@ int File::Context::read(bytes::span buffer) { } buffer = buffer.subspan(0, amount); - while (!_reader->fill(_offset, buffer, &_semaphore)) { - processQueuedPackets(SleepPolicy::Disallowed); - _delegate->fileWaitingForData(); + while (true) { + const auto result = _reader->fill(_offset, buffer, &_semaphore); + if (result == Reader::FillState::Success) { + break; + } else if (result == Reader::FillState::WaitingRemote) { + processQueuedPackets(SleepPolicy::Allowed); + _delegate->fileWaitingForData(); + } _semaphore.acquire(); if (_interrupted) { return -1; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp index 5273bafdae..9a58d368d1 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp @@ -19,7 +19,7 @@ namespace Streaming { namespace { constexpr auto kBufferFor = 3 * crl::time(1000); -constexpr auto kLoadInAdvanceForRemote = 64 * crl::time(1000); +constexpr auto kLoadInAdvanceForRemote = 32 * crl::time(1000); constexpr auto kLoadInAdvanceForLocal = 5 * crl::time(1000); constexpr auto kMsFrequency = 1000; // 1000 ms per second. diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp index d204cf9100..04c4c5600d 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp @@ -584,10 +584,12 @@ auto Reader::Slices::fill(int offset, bytes::span buffer) -> FillResult { } }; const auto handleReadFromCache = [&](int sliceIndex) { - if (cacheNotLoaded(sliceIndex) - && !(_data[sliceIndex].flags & Flag::LoadingFromCache)) { - _data[sliceIndex].flags |= Flag::LoadingFromCache; - result.sliceNumbersFromCache.add(sliceIndex + 1); + if (cacheNotLoaded(sliceIndex)) { + if (!(_data[sliceIndex].flags & Flag::LoadingFromCache)) { + _data[sliceIndex].flags |= Flag::LoadingFromCache; + result.sliceNumbersFromCache.add(sliceIndex + 1); + } + result.state = FillState::WaitingCache; } }; const auto firstFrom = offset - fromSlice * kInSlice; @@ -618,7 +620,7 @@ auto Reader::Slices::fill(int offset, bytes::span buffer) -> FillResult { secondTill); } result.toCache = serializeAndUnloadUnused(); - result.filled = true; + result.state = FillState::Success; } else { handleReadFromCache(fromSlice); if (fromSlice + 1 < tillSlice) { @@ -646,7 +648,7 @@ auto Reader::Slices::fillFromHeader(int offset, bytes::span buffer) ranges::make_subrange(prepared.start, prepared.finish), from, till); - result.filled = true; + result.state = FillState::Success; } return result; } @@ -1209,7 +1211,7 @@ bool Reader::fullInCache() const { return _slices.fullInCache(); } -bool Reader::fill( +Reader::FillState Reader::fill( int offset, bytes::span buffer, not_null notify) { @@ -1229,37 +1231,38 @@ bool Reader::fill( }; const auto done = [&] { clearWaiting(); - return true; + return FillState::Success; }; const auto failed = [&] { clearWaiting(); notify->release(); - return false; + return FillState::Failed; }; checkForSomethingMoreReceived(); if (_streamingError) { - return failed(); + return FillState::Failed; } + auto lastResult = FillState(); do { - if (fillFromSlices(offset, buffer)) { - clearWaiting(); - return true; + lastResult = fillFromSlices(offset, buffer); + if (lastResult == FillState::Success) { + return done(); } startWaiting(); } while (checkForSomethingMoreReceived()); - return _streamingError ? failed() : false; + return _streamingError ? failed() : lastResult; } -bool Reader::fillFromSlices(int offset, bytes::span buffer) { +Reader::FillState Reader::fillFromSlices(int offset, bytes::span buffer) { using namespace rpl::mappers; auto result = _slices.fill(offset, buffer); - if (!result.filled && _slices.headerWontBeFilled()) { + if (result.state != FillState::Success && _slices.headerWontBeFilled()) { _streamingError = Error::NotStreamable; - return false; + return FillState::Failed; } for (const auto sliceNumber : result.sliceNumbersFromCache.values()) { @@ -1283,7 +1286,7 @@ bool Reader::fillFromSlices(int offset, bytes::span buffer) { } loadAtOffset(offset); } - return result.filled; + return result.state; } void Reader::cancelLoadInRange(int from, int till) { diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h index b8717a8254..a1ef902d44 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h @@ -32,6 +32,13 @@ enum class Error; class Reader final : public base::has_weak_ptr { public: + enum class FillState : uchar { + Success, + WaitingCache, + WaitingRemote, + Failed, + }; + // Main thread. explicit Reader( std::unique_ptr loader, @@ -44,7 +51,7 @@ public: [[nodiscard]] bool isRemoteLoader() const; // Single thread. - [[nodiscard]] bool fill( + [[nodiscard]] FillState fill( int offset, bytes::span buffer, not_null notify); @@ -101,9 +108,8 @@ private: StackIntVector sliceNumbersFromCache; StackIntVector offsetsFromLoader; SerializedSlice toCache; - bool filled = false; + FillState state = FillState::WaitingRemote; }; - struct Slice { enum class Flag : uchar { LoadingFromCache = 0x01, @@ -209,7 +215,7 @@ private: bool checkForSomethingMoreReceived(); - bool fillFromSlices(int offset, bytes::span buffer); + FillState fillFromSlices(int offset, bytes::span buffer); void finalizeCache();