diff --git a/Telegram/SourceFiles/storage/file_upload.cpp b/Telegram/SourceFiles/storage/file_upload.cpp index b718ed95b..3028b3e6d 100644 --- a/Telegram/SourceFiles/storage/file_upload.cpp +++ b/Telegram/SourceFiles/storage/file_upload.cpp @@ -52,6 +52,13 @@ constexpr auto kUploadRequestInterval = crl::time(200); // How much time without upload causes additional session kill. constexpr auto kKillSessionTimeout = 15 * crl::time(1000); +// How much wait after session kill before killing another one. +constexpr auto kWaitForNormalizeTimeout = 8 * crl::time(1000); + +constexpr auto kMaxSessionsCount = 8; +constexpr auto kFastRequestThreshold = 1 * crl::time(1000); +constexpr auto kSlowRequestThreshold = 8 * crl::time(1000); + [[nodiscard]] const char *ThumbnailFormat(const QString &mime) { return Core::IsMimeSticker(mime) ? "WEBP" : "JPG"; } @@ -70,9 +77,9 @@ struct Uploader::Entry { not_null*> parts; uint64 partsOfId = 0; - int partsSent = 0; - int partsWaiting = 0; - int64 partsSentSize = 0; + int64 sentSize = 0; + ushort partsSent = 0; + ushort partsWaiting = 0; HashMd5 md5Hash; @@ -80,9 +87,9 @@ struct Uploader::Entry { int64 docSize = 0; int64 docSentSize = 0; int docPartSize = 0; - int docPartsSent = 0; - int docPartsCount = 0; - int docPartsWaiting = 0; + ushort docPartsSent = 0; + ushort docPartsCount = 0; + ushort docPartsWaiting = 0; }; @@ -90,8 +97,10 @@ struct Uploader::Request { FullMsgId itemId; crl::time sent = 0; QByteArray bytes; - int dcIndex = 0; + ushort part = 0; + uchar dcIndex = 0; bool docPart = false; + bool bigPart = false; bool nonPremiumDelayed = false; }; @@ -138,7 +147,7 @@ bool Uploader::Entry::setPartSize(int partSize) { Uploader::Uploader(not_null api) : _api(api) -, _nextTimer([=] { maybeSendNext(); }) +, _nextTimer([=] { maybeSend(); }) , _stopSessionsTimer([=] { stopSessions(); }) { const auto session = &_api->session(); photoReady( @@ -317,7 +326,9 @@ void Uploader::upload( } } _queue.push_back({ itemId, file }); - maybeSendNext(); + if (!_nextTimer.isActive()) { + maybeSend(); + } } void Uploader::failed(FullMsgId itemId) { @@ -330,7 +341,7 @@ void Uploader::failed(FullMsgId itemId) { cancelRequests(itemId); maybeFinishFront(); crl::on_main(this, [=] { - maybeSendNext(); + maybeSend(); }); } @@ -361,6 +372,7 @@ void Uploader::stopSessions() { _api->instance().stopSession(MTP::uploadDcId(i)); } _sentPerDcIndex.clear(); + _dcIndicesWithFastRequests.clear(); } } @@ -394,7 +406,181 @@ QByteArray Uploader::readDocPart(not_null entry) { return checked(entry->docFile->read(entry->docPartSize)); } -void Uploader::maybeSendNext() { +bool Uploader::canAddDcIndex() const { + const auto count = int(_sentPerDcIndex.size()); + return (count < kMaxSessionsCount) + && (count == int(_dcIndicesWithFastRequests.size())); +} + +std::optional Uploader::chooseDcIndexForNextRequest( + const base::flat_set &used) { + for (auto i = 0, count = int(_sentPerDcIndex.size()); i != count; ++i) { + if (!_sentPerDcIndex[i] && !used.contains(i)) { + return i; + } + } + if (canAddDcIndex()) { + const auto result = int(_sentPerDcIndex.size()); + _sentPerDcIndex.push_back(0); + _dcIndicesWithFastRequests.clear(); + _latestDcIndexAdded = crl::now(); + + DEBUG_LOG(("Uploader: Added dc index %1.").arg(result)); + return result; + } + auto result = std::optional(); + for (auto i = 0, count = int(_sentPerDcIndex.size()); i != count; ++i) { + if (!used.contains(i) + && (!result.has_value() + || _sentPerDcIndex[i] < _sentPerDcIndex[*result])) { + result = i; + } + } + return result; +} + +Uploader::Entry *Uploader::chooseEntryForNextRequest() { + if (!_pendingFromRemovedDcIndices.empty()) { + const auto itemId = _pendingFromRemovedDcIndices.front().itemId; + const auto i = ranges::find(_queue, itemId, &Entry::itemId); + Assert(i != end(_queue)); + return &*i; + } + + for (auto i = begin(_queue); i != end(_queue); ++i) { + if (i->partsSent < i->parts->size() + || i->docPartsSent < i->docPartsCount) { + return &*i; + } + } + return nullptr; +} + +auto Uploader::sendPart(not_null entry, uchar dcIndex) +-> SendResult { + return !_pendingFromRemovedDcIndices.empty() + ? sendPendingPart(entry, dcIndex) + : (entry->partsSent < entry->parts->size()) + ? sendSlicedPart(entry, dcIndex) + : sendDocPart(entry, dcIndex); +} + +template +void Uploader::sendPreparedRequest(Prepared &&prepared, Request &&request) { + _sentPerDcIndex[request.dcIndex] += int(request.bytes.size()); + + const auto requestId = _api->request( + std::move(prepared) + ).done([=](const MTPBool &result, mtpRequestId requestId) { + partLoaded(result, requestId); + }).fail([=](const MTP::Error &error, mtpRequestId requestId) { + partFailed(error, requestId); + }).toDC(MTP::uploadDcId(request.dcIndex)).send(); + + request.sent = crl::now(); + _requests.emplace(requestId, std::move(request)); +} + +auto Uploader::sendPendingPart(not_null entry, uchar dcIndex) +-> SendResult { + Expects(!_pendingFromRemovedDcIndices.empty()); + Expects(_pendingFromRemovedDcIndices.front().itemId == entry->itemId); + + auto request = std::move(_pendingFromRemovedDcIndices.front()); + _pendingFromRemovedDcIndices.erase(begin(_pendingFromRemovedDcIndices)); + + const auto part = request.part; + const auto bytes = request.bytes; + request.dcIndex = dcIndex; + if (request.bigPart) { + sendPreparedRequest(MTPupload_SaveBigFilePart( + MTP_long(entry->file->id), + MTP_int(part), + MTP_int(entry->docPartsCount), + MTP_bytes(bytes) + ), std::move(request)); + } else { + const auto id = request.docPart ? entry->file->id : entry->partsOfId; + sendPreparedRequest(MTPupload_SaveFilePart( + MTP_long(id), + MTP_int(part), + MTP_bytes(bytes) + ), std::move(request)); + } + return SendResult::Success; +} + +auto Uploader::sendDocPart(not_null entry, uchar dcIndex) +-> SendResult { + const auto itemId = entry->itemId; + const auto alreadySent = _sentPerDcIndex[dcIndex]; + const auto willProbablyBeSent = entry->docPartSize; + if (alreadySent + willProbablyBeSent >= kMaxUploadPerSession) { + return SendResult::DcIndexFull; + } + + Assert(entry->docPartsSent < entry->docPartsCount); + + const auto partBytes = readDocPart(entry); + if (partBytes.isEmpty()) { + failed(itemId); + return SendResult::Failed; + } + const auto part = entry->docPartsSent++; + ++entry->docPartsWaiting; + + const auto send = [&](auto &&request, bool big) { + sendPreparedRequest(std::move(request), { + .itemId = itemId, + .bytes = partBytes, + .part = part, + .dcIndex = dcIndex, + .docPart = true, + .bigPart = big, + }); + }; + if (entry->docSize > kUseBigFilesFrom) { + send(MTPupload_SaveBigFilePart( + MTP_long(entry->file->id), + MTP_int(part), + MTP_int(entry->docPartsCount), + MTP_bytes(partBytes) + ), true); + } else { + send(MTPupload_SaveFilePart( + MTP_long(entry->file->id), + MTP_int(part), + MTP_bytes(partBytes) + ), false); + } + return SendResult::Success; +} + +auto Uploader::sendSlicedPart(not_null entry, uchar dcIndex) +-> SendResult { + const auto itemId = entry->itemId; + const auto alreadySent = _sentPerDcIndex[dcIndex]; + const auto willBeSent = entry->parts->at(entry->partsSent).size(); + if (alreadySent + willBeSent >= kMaxUploadPerSession) { + return SendResult::DcIndexFull; + } + + ++entry->partsWaiting; + const auto index = entry->partsSent++; + const auto partBytes = entry->parts->at(index); + sendPreparedRequest(MTPupload_SaveFilePart( + MTP_long(entry->partsOfId), + MTP_int(index), + MTP_bytes(partBytes) + ), { + .itemId = itemId, + .bytes = partBytes, + .dcIndex = dcIndex, + }); + return SendResult::Success; +} + +void Uploader::maybeSend() { const auto stopping = _stopSessionsTimer.isActive(); if (_queue.empty()) { if (!stopping) { @@ -404,111 +590,35 @@ void Uploader::maybeSendNext() { return; } else if (_pausedId) { return; - } - - if (stopping) { + } else if (stopping) { _stopSessionsTimer.cancel(); } - auto todc = 0; - if (_sentPerDcIndex.size() < 2) { - todc = int(_sentPerDcIndex.size()); - _sentPerDcIndex.resize(todc + 1); - } else { - const auto min = ranges::min_element(_sentPerDcIndex); - todc = int(min - begin(_sentPerDcIndex)); - } - const auto alreadySent = _sentPerDcIndex[todc]; - - const auto entry = [&]() -> Entry* { - for (auto i = begin(_queue); i != end(_queue); ++i) { - if (i->partsSent < i->parts->size() - || i->docPartsSent < i->docPartsCount) { - return &*i; + auto usedDcIndices = base::flat_set(); + while (true) { + const auto maybeDcIndex = chooseDcIndexForNextRequest(usedDcIndices); + if (!maybeDcIndex.has_value()) { + break; + } + const auto dcIndex = *maybeDcIndex; + while (true) { + const auto entry = chooseEntryForNextRequest(); + if (!entry) { + return; } + const auto result = sendPart(entry, dcIndex); + if (result == SendResult::DcIndexFull) { + return; + } else if (result == SendResult::Success) { + break; + } + // If this entry failed, we try the next one. } - return nullptr; - }(); - if (!entry) { - return; + usedDcIndices.emplace(dcIndex); } - - const auto itemId = entry->itemId; - if (entry->partsSent >= entry->parts->size()) { - const auto willProbablyBeSent = entry->docPartSize; - if (alreadySent + willProbablyBeSent >= kMaxUploadPerSession) { - return; - } - - Assert(entry->docPartsSent < entry->docPartsCount); - - const auto partBytes = readDocPart(entry); - if (partBytes.isEmpty()) { - failed(itemId); - return; - } - const auto index = entry->docPartsSent++; - ++entry->docPartsWaiting; - - mtpRequestId requestId; - if (entry->docSize > kUseBigFilesFrom) { - requestId = _api->request(MTPupload_SaveBigFilePart( - MTP_long(entry->file->id), - MTP_int(index), - MTP_int(entry->docPartsCount), - MTP_bytes(partBytes) - )).done([=](const MTPBool &result, mtpRequestId requestId) { - partLoaded(result, requestId); - }).fail([=](const MTP::Error &error, mtpRequestId requestId) { - partFailed(error, requestId); - }).toDC(MTP::uploadDcId(todc)).send(); - } else { - requestId = _api->request(MTPupload_SaveFilePart( - MTP_long(entry->file->id), - MTP_int(index), - MTP_bytes(partBytes) - )).done([=](const MTPBool &result, mtpRequestId requestId) { - partLoaded(result, requestId); - }).fail([=](const MTP::Error &error, mtpRequestId requestId) { - partFailed(error, requestId); - }).toDC(MTP::uploadDcId(todc)).send(); - } - _requests.emplace(requestId, Request{ - .itemId = itemId, - .sent = crl::now(), - .bytes = partBytes, - .dcIndex = todc, - .docPart = true, - }); - _sentPerDcIndex[todc] += int(partBytes.size()); - } else { - const auto willBeSent = entry->parts->at(entry->partsSent).size(); - if (alreadySent + willBeSent >= kMaxUploadPerSession) { - return; - } - - ++entry->partsWaiting; - const auto index = entry->partsSent++; - const auto partBytes = entry->parts->at(index); - const auto requestId = _api->request(MTPupload_SaveFilePart( - MTP_long(entry->partsOfId), - MTP_int(index), - MTP_bytes(partBytes) - )).done([=](const MTPBool &result, mtpRequestId requestId) { - partLoaded(result, requestId); - }).fail([=](const MTP::Error &error, mtpRequestId requestId) { - partFailed(error, requestId); - }).toDC(MTP::uploadDcId(todc)).send(); - - _requests.emplace(requestId, Request{ - .itemId = itemId, - .sent = crl::now(), - .bytes = partBytes, - .dcIndex = todc, - }); - _sentPerDcIndex[todc] += int(partBytes.size()); + if (!usedDcIndices.empty()) { + _nextTimer.callOnce(kUploadRequestInterval); } - _nextTimer.callOnce(kUploadRequestInterval); } void Uploader::cancel(FullMsgId itemId) { @@ -529,7 +639,7 @@ void Uploader::pause(FullMsgId itemId) { void Uploader::unpause() { _pausedId = FullMsgId(); - maybeSendNext(); + maybeSend(); } void Uploader::cancelRequests(FullMsgId itemId) { @@ -543,6 +653,11 @@ void Uploader::cancelRequests(FullMsgId itemId) { ++i; } } + _pendingFromRemovedDcIndices.erase(ranges::remove( + _pendingFromRemovedDcIndices, + itemId, + &Request::itemId + ), end(_pendingFromRemovedDcIndices)); } void Uploader::cancelAllRequests() { @@ -582,19 +697,48 @@ void Uploader::partLoaded(const MTPBool &result, mtpRequestId requestId) { Assert(i != end(_queue)); auto &entry = *i; + const auto now = crl::now(); + const auto duration = now - request.sent; + const auto fast = (duration < kFastRequestThreshold); + const auto slowish = !fast; + const auto slow = (duration >= kSlowRequestThreshold); + + if (slowish) { + _dcIndicesWithFastRequests.clear(); + if (slow) { + const auto elapsed = (now - _latestDcIndexRemoved); + const auto remove = (elapsed >= kWaitForNormalizeTimeout); + if (remove && _sentPerDcIndex.size() > 1) { + DEBUG_LOG(("Uploader: Slow request, removing dc index.")); + removeDcIndex(); + _latestDcIndexRemoved = now; + } else { + DEBUG_LOG(("Uploader: Slow request, clear fast records.")); + } + } else { + DEBUG_LOG(("Uploader: Slow-ish request, clear fast records.")); + } + } else if (request.sent > _latestDcIndexAdded) { + if (_dcIndicesWithFastRequests.emplace(request.dcIndex).second) { + DEBUG_LOG(("Uploader: Mark %1 of %2 as fast." + ).arg(request.dcIndex + ).arg(_sentPerDcIndex.size())); + } + } + if (request.docPart) { --entry.docPartsWaiting; entry.docSentSize += bytes; } else { --entry.partsWaiting; - entry.partsSentSize += bytes; + entry.sentSize += bytes; } if (entry.file->type == SendMediaType::Photo) { const auto photo = session().data().photo(entry.file->id); if (photo->uploading()) { photo->uploadingData->size = entry.file->partssize; - photo->uploadingData->offset = entry.partsSentSize; + photo->uploadingData->offset = entry.sentSize; } _photoProgress.fire_copy(itemId); } else if (entry.file->type == SendMediaType::File @@ -610,7 +754,7 @@ void Uploader::partLoaded(const MTPBool &result, mtpRequestId requestId) { } else if (entry.file->type == SendMediaType::Secure) { _secureProgress.fire_copy({ .fullId = itemId, - .offset = entry.partsSentSize, + .offset = entry.sentSize, .size = entry.file->partssize, }); } @@ -621,7 +765,29 @@ void Uploader::partLoaded(const MTPBool &result, mtpRequestId requestId) { if (!_queue.empty() && itemId == _queue.front().itemId) { maybeFinishFront(); } - maybeSendNext(); + maybeSend(); +} + +void Uploader::removeDcIndex() { + Expects(_sentPerDcIndex.size() > 1); + + const auto dcIndex = int(_sentPerDcIndex.size()) - 1; + for (auto i = begin(_requests); i != end(_requests);) { + if (i->second.dcIndex == dcIndex) { + const auto bytes = int(i->second.bytes.size()); + _sentPerDcIndex[dcIndex] -= bytes; + _api->request(i->first).cancel(); + _pendingFromRemovedDcIndices.push_back(std::move(i->second)); + i = _requests.erase(i); + } else { + ++i; + } + } + Assert(_sentPerDcIndex.back() == 0); + _sentPerDcIndex.pop_back(); + _dcIndicesWithFastRequests.remove(dcIndex); + _api->instance().stopSession(MTP::uploadDcId(dcIndex)); + DEBUG_LOG(("Uploader: Removed dc index %1.").arg(dcIndex)); } void Uploader::maybeFinishFront() { diff --git a/Telegram/SourceFiles/storage/file_upload.h b/Telegram/SourceFiles/storage/file_upload.h index f4fced340..6f87f6a56 100644 --- a/Telegram/SourceFiles/storage/file_upload.h +++ b/Telegram/SourceFiles/storage/file_upload.h @@ -103,8 +103,30 @@ private: struct Entry; struct Request; + enum class SendResult : uchar { + Success, + Failed, + DcIndexFull, + }; + + void maybeSend(); + [[nodiscard]] bool canAddDcIndex() const; + [[nodiscard]] std::optional chooseDcIndexForNextRequest( + const base::flat_set &used); + [[nodiscard]] Entry *chooseEntryForNextRequest(); + [[nodiscard]] SendResult sendPart(not_null entry, uchar dcIndex); + [[nodiscard]] auto sendPendingPart(not_null entry, uchar dcIndex) + -> SendResult; + [[nodiscard]] auto sendDocPart(not_null entry, uchar dcIndex) + -> SendResult; + [[nodiscard]] auto sendSlicedPart(not_null entry, uchar dcIndex) + -> SendResult; [[nodiscard]] QByteArray readDocPart(not_null entry); - void maybeSendNext(); + void removeDcIndex(); + + template + void sendPreparedRequest(Prepared &&prepared, Request &&request); + void maybeFinishFront(); void finishFront(); @@ -135,6 +157,12 @@ private: base::flat_map _requests; std::vector _sentPerDcIndex; + // Fast requests since the latest dc index addition. + base::flat_set _dcIndicesWithFastRequests; + crl::time _latestDcIndexAdded = 0; + crl::time _latestDcIndexRemoved = 0; + std::vector _pendingFromRemovedDcIndices; + FullMsgId _pausedId; base::Timer _nextTimer, _stopSessionsTimer;