Fix a case of huge memory consumption in streaming.

This commit is contained in:
John Preston 2020-06-29 12:42:56 +04:00
parent 1bd0b03e8e
commit 57249c6ea0
4 changed files with 40 additions and 26 deletions

View file

@ -54,9 +54,14 @@ int File::Context::read(bytes::span buffer) {
} }
buffer = buffer.subspan(0, amount); buffer = buffer.subspan(0, amount);
while (!_reader->fill(_offset, buffer, &_semaphore)) { while (true) {
processQueuedPackets(SleepPolicy::Disallowed); const auto result = _reader->fill(_offset, buffer, &_semaphore);
if (result == Reader::FillState::Success) {
break;
} else if (result == Reader::FillState::WaitingRemote) {
processQueuedPackets(SleepPolicy::Allowed);
_delegate->fileWaitingForData(); _delegate->fileWaitingForData();
}
_semaphore.acquire(); _semaphore.acquire();
if (_interrupted) { if (_interrupted) {
return -1; return -1;

View file

@ -19,7 +19,7 @@ namespace Streaming {
namespace { namespace {
constexpr auto kBufferFor = 3 * crl::time(1000); constexpr auto kBufferFor = 3 * crl::time(1000);
constexpr auto kLoadInAdvanceForRemote = 64 * crl::time(1000); constexpr auto kLoadInAdvanceForRemote = 32 * crl::time(1000);
constexpr auto kLoadInAdvanceForLocal = 5 * crl::time(1000); constexpr auto kLoadInAdvanceForLocal = 5 * crl::time(1000);
constexpr auto kMsFrequency = 1000; // 1000 ms per second. constexpr auto kMsFrequency = 1000; // 1000 ms per second.

View file

@ -584,11 +584,13 @@ auto Reader::Slices::fill(int offset, bytes::span buffer) -> FillResult {
} }
}; };
const auto handleReadFromCache = [&](int sliceIndex) { const auto handleReadFromCache = [&](int sliceIndex) {
if (cacheNotLoaded(sliceIndex) if (cacheNotLoaded(sliceIndex)) {
&& !(_data[sliceIndex].flags & Flag::LoadingFromCache)) { if (!(_data[sliceIndex].flags & Flag::LoadingFromCache)) {
_data[sliceIndex].flags |= Flag::LoadingFromCache; _data[sliceIndex].flags |= Flag::LoadingFromCache;
result.sliceNumbersFromCache.add(sliceIndex + 1); result.sliceNumbersFromCache.add(sliceIndex + 1);
} }
result.state = FillState::WaitingCache;
}
}; };
const auto firstFrom = offset - fromSlice * kInSlice; const auto firstFrom = offset - fromSlice * kInSlice;
const auto firstTill = std::min(kInSlice, till - fromSlice * kInSlice); const auto firstTill = std::min(kInSlice, till - fromSlice * kInSlice);
@ -618,7 +620,7 @@ auto Reader::Slices::fill(int offset, bytes::span buffer) -> FillResult {
secondTill); secondTill);
} }
result.toCache = serializeAndUnloadUnused(); result.toCache = serializeAndUnloadUnused();
result.filled = true; result.state = FillState::Success;
} else { } else {
handleReadFromCache(fromSlice); handleReadFromCache(fromSlice);
if (fromSlice + 1 < tillSlice) { if (fromSlice + 1 < tillSlice) {
@ -646,7 +648,7 @@ auto Reader::Slices::fillFromHeader(int offset, bytes::span buffer)
ranges::make_subrange(prepared.start, prepared.finish), ranges::make_subrange(prepared.start, prepared.finish),
from, from,
till); till);
result.filled = true; result.state = FillState::Success;
} }
return result; return result;
} }
@ -1209,7 +1211,7 @@ bool Reader::fullInCache() const {
return _slices.fullInCache(); return _slices.fullInCache();
} }
bool Reader::fill( Reader::FillState Reader::fill(
int offset, int offset,
bytes::span buffer, bytes::span buffer,
not_null<crl::semaphore*> notify) { not_null<crl::semaphore*> notify) {
@ -1229,37 +1231,38 @@ bool Reader::fill(
}; };
const auto done = [&] { const auto done = [&] {
clearWaiting(); clearWaiting();
return true; return FillState::Success;
}; };
const auto failed = [&] { const auto failed = [&] {
clearWaiting(); clearWaiting();
notify->release(); notify->release();
return false; return FillState::Failed;
}; };
checkForSomethingMoreReceived(); checkForSomethingMoreReceived();
if (_streamingError) { if (_streamingError) {
return failed(); return FillState::Failed;
} }
auto lastResult = FillState();
do { do {
if (fillFromSlices(offset, buffer)) { lastResult = fillFromSlices(offset, buffer);
clearWaiting(); if (lastResult == FillState::Success) {
return true; return done();
} }
startWaiting(); startWaiting();
} while (checkForSomethingMoreReceived()); } while (checkForSomethingMoreReceived());
return _streamingError ? failed() : false; return _streamingError ? failed() : lastResult;
} }
bool Reader::fillFromSlices(int offset, bytes::span buffer) { Reader::FillState Reader::fillFromSlices(int offset, bytes::span buffer) {
using namespace rpl::mappers; using namespace rpl::mappers;
auto result = _slices.fill(offset, buffer); auto result = _slices.fill(offset, buffer);
if (!result.filled && _slices.headerWontBeFilled()) { if (result.state != FillState::Success && _slices.headerWontBeFilled()) {
_streamingError = Error::NotStreamable; _streamingError = Error::NotStreamable;
return false; return FillState::Failed;
} }
for (const auto sliceNumber : result.sliceNumbersFromCache.values()) { for (const auto sliceNumber : result.sliceNumbersFromCache.values()) {
@ -1283,7 +1286,7 @@ bool Reader::fillFromSlices(int offset, bytes::span buffer) {
} }
loadAtOffset(offset); loadAtOffset(offset);
} }
return result.filled; return result.state;
} }
void Reader::cancelLoadInRange(int from, int till) { void Reader::cancelLoadInRange(int from, int till) {

View file

@ -32,6 +32,13 @@ enum class Error;
class Reader final : public base::has_weak_ptr { class Reader final : public base::has_weak_ptr {
public: public:
enum class FillState : uchar {
Success,
WaitingCache,
WaitingRemote,
Failed,
};
// Main thread. // Main thread.
explicit Reader( explicit Reader(
std::unique_ptr<Loader> loader, std::unique_ptr<Loader> loader,
@ -44,7 +51,7 @@ public:
[[nodiscard]] bool isRemoteLoader() const; [[nodiscard]] bool isRemoteLoader() const;
// Single thread. // Single thread.
[[nodiscard]] bool fill( [[nodiscard]] FillState fill(
int offset, int offset,
bytes::span buffer, bytes::span buffer,
not_null<crl::semaphore*> notify); not_null<crl::semaphore*> notify);
@ -101,9 +108,8 @@ private:
StackIntVector<kReadFromCacheMax> sliceNumbersFromCache; StackIntVector<kReadFromCacheMax> sliceNumbersFromCache;
StackIntVector<kLoadFromRemoteMax> offsetsFromLoader; StackIntVector<kLoadFromRemoteMax> offsetsFromLoader;
SerializedSlice toCache; SerializedSlice toCache;
bool filled = false; FillState state = FillState::WaitingRemote;
}; };
struct Slice { struct Slice {
enum class Flag : uchar { enum class Flag : uchar {
LoadingFromCache = 0x01, LoadingFromCache = 0x01,
@ -209,7 +215,7 @@ private:
bool checkForSomethingMoreReceived(); bool checkForSomethingMoreReceived();
bool fillFromSlices(int offset, bytes::span buffer); FillState fillFromSlices(int offset, bytes::span buffer);
void finalizeCache(); void finalizeCache();