Allow upload requests for several files at once.

This commit is contained in:
John Preston 2024-04-03 13:49:36 +04:00
parent 5b0cac47ad
commit 0dcc439dda
4 changed files with 402 additions and 358 deletions

View file

@ -40,8 +40,6 @@ constexpr ShiftedDcId groupCallStreamDcId(DcId dcId) {
return ShiftDcId(dcId, kGroupCallStreamDcShift); return ShiftDcId(dcId, kGroupCallStreamDcShift);
} }
constexpr auto kUploadSessionsCount = 2;
namespace details { namespace details {
constexpr ShiftedDcId downloadDcId(DcId dcId, int index) { constexpr ShiftedDcId downloadDcId(DcId dcId, int index) {
@ -92,7 +90,6 @@ inline DcId getTemporaryIdFromRealDcId(ShiftedDcId shiftedDcId) {
namespace details { namespace details {
constexpr ShiftedDcId uploadDcId(DcId dcId, int index) { constexpr ShiftedDcId uploadDcId(DcId dcId, int index) {
static_assert(kUploadSessionsCount < kMaxMediaDcCount, "Too large MTPUploadSessionsCount!");
return ShiftDcId(dcId, kBaseUploadDcShift + index); return ShiftDcId(dcId, kBaseUploadDcShift + index);
}; };
@ -101,14 +98,14 @@ constexpr ShiftedDcId uploadDcId(DcId dcId, int index) {
// send(req, callbacks, MTP::uploadDcId(index)) - for upload shifted dc id // send(req, callbacks, MTP::uploadDcId(index)) - for upload shifted dc id
// uploading always to the main dc so BareDcId(result) == 0 // uploading always to the main dc so BareDcId(result) == 0
inline ShiftedDcId uploadDcId(int index) { inline ShiftedDcId uploadDcId(int index) {
Expects(index >= 0 && index < kUploadSessionsCount); Expects(index >= 0 && index < kMaxMediaDcCount);
return details::uploadDcId(0, index); return details::uploadDcId(0, index);
}; };
constexpr bool isUploadDcId(ShiftedDcId shiftedDcId) { constexpr bool isUploadDcId(ShiftedDcId shiftedDcId) {
return (shiftedDcId >= details::uploadDcId(0, 0)) return (shiftedDcId >= details::uploadDcId(0, 0))
&& (shiftedDcId < details::uploadDcId(0, kUploadSessionsCount - 1) + kDcShift); && (shiftedDcId < details::uploadDcId(0, kMaxMediaDcCount - 1) + kDcShift);
} }
inline ShiftedDcId destroyKeyNextDcId(ShiftedDcId shiftedDcId) { inline ShiftedDcId destroyKeyNextDcId(ShiftedDcId shiftedDcId) {

View file

@ -1108,9 +1108,6 @@ void SessionPrivate::onSentSome(uint64 size) {
DEBUG_LOG(("Checking connect for request with size %1 bytes, delay will be %2").arg(size).arg(remain)); DEBUG_LOG(("Checking connect for request with size %1 bytes, delay will be %2").arg(size).arg(remain));
} }
} }
if (isUploadDcId(_shiftedDcId)) {
remain *= kUploadSessionsCount;
}
_waitForReceivedTimer.callOnce(remain); _waitForReceivedTimer.callOnce(remain);
} }
if (!_firstSentAt) { if (!_firstSentAt) {

View file

@ -27,7 +27,7 @@ namespace Storage {
namespace { namespace {
// max 512kb uploaded at the same time in each session // max 512kb uploaded at the same time in each session
constexpr auto kMaxUploadFileParallelSize = MTP::kUploadSessionsCount * 512 * 1024; constexpr auto kMaxUploadPerSession = 512 * 1024;
constexpr auto kDocumentMaxPartsCountDefault = 4000; constexpr auto kDocumentMaxPartsCountDefault = 4000;
@ -47,7 +47,7 @@ constexpr auto kDocumentUploadPartSize3 = 256 * 1024;
constexpr auto kDocumentUploadPartSize4 = 512 * 1024; constexpr auto kDocumentUploadPartSize4 = 512 * 1024;
// One part each half second, if not uploaded faster. // One part each half second, if not uploaded faster.
constexpr auto kUploadRequestInterval = crl::time(500); constexpr auto kUploadRequestInterval = crl::time(200);
// How much time without upload causes additional session kill. // How much time without upload causes additional session kill.
constexpr auto kKillSessionTimeout = 15 * crl::time(1000); constexpr auto kKillSessionTimeout = 15 * crl::time(1000);
@ -58,35 +58,52 @@ constexpr auto kKillSessionTimeout = 15 * crl::time(1000);
} // namespace } // namespace
struct Uploader::File { struct Uploader::Entry {
explicit File(const std::shared_ptr<FilePrepareResult> &file); Entry(FullMsgId itemId, const std::shared_ptr<FilePrepareResult> &file);
void setDocSize(int64 size); void setDocSize(int64 size);
bool setPartSize(uint32 partSize); bool setPartSize(int partSize);
// const, but non-const for the move-assignment in the
FullMsgId itemId;
std::shared_ptr<FilePrepareResult> file; std::shared_ptr<FilePrepareResult> file;
const std::vector<QByteArray> &parts; not_null<std::vector<QByteArray>*> parts;
const uint64 partsOfId = 0; uint64 partsOfId = 0;
int partsSent = 0;
mutable int64 fileSentSize = 0; int partsSent = 0;
int partsWaiting = 0;
int64 partsSentSize = 0;
HashMd5 md5Hash; HashMd5 md5Hash;
std::unique_ptr<QFile> docFile; std::unique_ptr<QFile> docFile;
int64 docSize = 0; int64 docSize = 0;
int64 docPartSize = 0; int64 docSentSize = 0;
int docSentParts = 0; int docPartSize = 0;
int docPartsSent = 0;
int docPartsCount = 0; int docPartsCount = 0;
int docPartsWaiting = 0;
}; };
Uploader::File::File(const std::shared_ptr<FilePrepareResult> &file) struct Uploader::Request {
: file(file) FullMsgId itemId;
crl::time sent = 0;
QByteArray bytes;
int dcIndex = 0;
bool docPart = false;
bool nonPremiumDelayed = false;
};
Uploader::Entry::Entry(
FullMsgId itemId,
const std::shared_ptr<FilePrepareResult> &file)
: itemId(itemId)
, file(file)
, parts((file->type == SendMediaType::Photo , parts((file->type == SendMediaType::Photo
|| file->type == SendMediaType::Secure) || file->type == SendMediaType::Secure)
? file->fileparts ? &file->fileparts
: file->thumbparts) : &file->thumbparts)
, partsOfId((file->type == SendMediaType::Photo , partsOfId((file->type == SendMediaType::Photo
|| file->type == SendMediaType::Secure) || file->type == SendMediaType::Secure)
? file->id ? file->id
@ -95,12 +112,10 @@ Uploader::File::File(const std::shared_ptr<FilePrepareResult> &file)
|| file->type == SendMediaType::ThemeFile || file->type == SendMediaType::ThemeFile
|| file->type == SendMediaType::Audio) { || file->type == SendMediaType::Audio) {
setDocSize(file->filesize); setDocSize(file->filesize);
} else {
docSize = docPartSize = docPartsCount = 0;
} }
} }
void Uploader::File::setDocSize(int64 size) { void Uploader::Entry::setDocSize(int64 size) {
docSize = size; docSize = size;
constexpr auto limit0 = 1024 * 1024; constexpr auto limit0 = 1024 * 1024;
constexpr auto limit1 = 32 * limit0; constexpr auto limit1 = 32 * limit0;
@ -115,16 +130,15 @@ void Uploader::File::setDocSize(int64 size) {
} }
} }
bool Uploader::File::setPartSize(uint32 partSize) { bool Uploader::Entry::setPartSize(int partSize) {
docPartSize = partSize; docPartSize = partSize;
docPartsCount = (docSize / docPartSize) docPartsCount = (docSize + docPartSize - 1) / docPartSize;
+ ((docSize % docPartSize) ? 1 : 0);
return (docPartsCount <= kDocumentMaxPartsCountDefault); return (docPartsCount <= kDocumentMaxPartsCountDefault);
} }
Uploader::Uploader(not_null<ApiWrap*> api) Uploader::Uploader(not_null<ApiWrap*> api)
: _api(api) : _api(api)
, _nextTimer([=] { sendNext(); }) , _nextTimer([=] { maybeSendNext(); })
, _stopSessionsTimer([=] { stopSessions(); }) { , _stopSessionsTimer([=] { stopSessions(); }) {
const auto session = &_api->session(); const auto session = &_api->session();
photoReady( photoReady(
@ -181,22 +195,21 @@ Uploader::Uploader(not_null<ApiWrap*> api)
_api->instance().nonPremiumDelayedRequests( _api->instance().nonPremiumDelayedRequests(
) | rpl::start_with_next([=](mtpRequestId id) { ) | rpl::start_with_next([=](mtpRequestId id) {
if (_dcIndices.contains(id)) { const auto i = _requests.find(id);
_nonPremiumDelayed.emplace(id); if (i != end(_requests)) {
i->second.nonPremiumDelayed = true;
} }
}, _lifetime); }, _lifetime);
} }
void Uploader::processPhotoProgress(const FullMsgId &newId) { void Uploader::processPhotoProgress(FullMsgId itemId) {
const auto session = &_api->session(); if (const auto item = session().data().message(itemId)) {
if (const auto item = session->data().message(newId)) {
sendProgressUpdate(item, Api::SendProgressType::UploadPhoto); sendProgressUpdate(item, Api::SendProgressType::UploadPhoto);
} }
} }
void Uploader::processDocumentProgress(const FullMsgId &newId) { void Uploader::processDocumentProgress(FullMsgId itemId) {
const auto session = &_api->session(); if (const auto item = session().data().message(itemId)) {
if (const auto item = session->data().message(newId)) {
const auto media = item->media(); const auto media = item->media();
const auto document = media ? media->document() : nullptr; const auto document = media ? media->document() : nullptr;
const auto sendAction = (document && document->isVoiceMessage()) const auto sendAction = (document && document->isVoiceMessage())
@ -210,16 +223,14 @@ void Uploader::processDocumentProgress(const FullMsgId &newId) {
} }
} }
void Uploader::processPhotoFailed(const FullMsgId &newId) { void Uploader::processPhotoFailed(FullMsgId itemId) {
const auto session = &_api->session(); if (const auto item = session().data().message(itemId)) {
if (const auto item = session->data().message(newId)) {
sendProgressUpdate(item, Api::SendProgressType::UploadPhoto, -1); sendProgressUpdate(item, Api::SendProgressType::UploadPhoto, -1);
} }
} }
void Uploader::processDocumentFailed(const FullMsgId &newId) { void Uploader::processDocumentFailed(FullMsgId itemId) {
const auto session = &_api->session(); if (const auto item = session().data().message(itemId)) {
if (const auto item = session->data().message(newId)) {
const auto media = item->media(); const auto media = item->media();
const auto document = media ? media->document() : nullptr; const auto document = media ? media->document() : nullptr;
const auto sendAction = (document && document->isVoiceMessage()) const auto sendAction = (document && document->isVoiceMessage())
@ -254,8 +265,12 @@ Main::Session &Uploader::session() const {
return _api->session(); return _api->session();
} }
FullMsgId Uploader::currentUploadId() const {
return _queue.empty() ? FullMsgId() : _queue.front().itemId;
}
void Uploader::upload( void Uploader::upload(
const FullMsgId &msgId, FullMsgId itemId,
const std::shared_ptr<FilePrepareResult> &file) { const std::shared_ptr<FilePrepareResult> &file) {
if (file->type == SendMediaType::Photo) { if (file->type == SendMediaType::Photo) {
const auto photo = session().data().processPhoto( const auto photo = session().data().processPhoto(
@ -301,211 +316,147 @@ void Uploader::upload(
document->checkWallPaperProperties(); document->checkWallPaperProperties();
} }
} }
queue.emplace(msgId, File(file)); _queue.push_back({ itemId, file });
sendNext(); maybeSendNext();
} }
void Uploader::currentFailed() { void Uploader::failed(FullMsgId itemId) {
auto j = queue.find(_uploadingId); const auto i = ranges::find(_queue, itemId, &Entry::itemId);
if (j != queue.end()) { if (i != end(_queue)) {
const auto [msgId, file] = std::move(*j); const auto entry = std::move(*i);
queue.erase(j); _queue.erase(i);
notifyFailed(msgId, file); notifyFailed(entry);
} }
cancelRequests(itemId);
cancelRequests(); maybeFinishFront();
_dcIndices.clear(); crl::on_main(this, [=] {
_uploadingId = FullMsgId(); maybeSendNext();
_sentTotal = 0; });
for (int i = 0; i < MTP::kUploadSessionsCount; ++i) {
_sentPerDc[i] = 0;
}
sendNext();
} }
void Uploader::notifyFailed(FullMsgId id, const File &file) { void Uploader::notifyFailed(const Entry &entry) {
const auto type = file.file->type; const auto type = entry.file->type;
if (type == SendMediaType::Photo) { if (type == SendMediaType::Photo) {
_photoFailed.fire_copy(id); _photoFailed.fire_copy(entry.itemId);
} else if (type == SendMediaType::File } else if (type == SendMediaType::File
|| type == SendMediaType::ThemeFile || type == SendMediaType::ThemeFile
|| type == SendMediaType::Audio) { || type == SendMediaType::Audio) {
const auto document = session().data().document(file.file->id); const auto document = session().data().document(entry.file->id);
if (document->uploading()) { if (document->uploading()) {
document->status = FileUploadFailed; document->status = FileUploadFailed;
} }
_documentFailed.fire_copy(id); _documentFailed.fire_copy(entry.itemId);
} else if (type == SendMediaType::Secure) { } else if (type == SendMediaType::Secure) {
_secureFailed.fire_copy(id); _secureFailed.fire_copy(entry.itemId);
} else { } else {
Unexpected("Type in Uploader::currentFailed."); Unexpected("Type in Uploader::failed.");
} }
} }
void Uploader::stopSessions() { void Uploader::stopSessions() {
for (int i = 0; i < MTP::kUploadSessionsCount; ++i) { if (ranges::any_of(_sentPerDcIndex, rpl::mappers::_1 != 0)) {
_api->instance().stopSession(MTP::uploadDcId(i)); _stopSessionsTimer.callOnce(kKillSessionTimeout);
} else {
for (auto i = 0; i != int(_sentPerDcIndex.size()); ++i) {
_api->instance().stopSession(MTP::uploadDcId(i));
}
_sentPerDcIndex.clear();
} }
} }
void Uploader::sendNext() { QByteArray Uploader::readDocPart(not_null<Entry*> entry) {
if (_sentTotal >= kMaxUploadFileParallelSize || _pausedId.msg) { const auto checked = [&](QByteArray result) {
return; if ((entry->file->type == SendMediaType::File
|| entry->file->type == SendMediaType::ThemeFile
|| entry->file->type == SendMediaType::Audio)
&& entry->docSize <= kUseBigFilesFrom) {
entry->md5Hash.feed(result.data(), result.size());
}
if (result.isEmpty()
|| (result.size() > entry->docPartSize)
|| ((result.size() < entry->docPartSize
&& entry->docPartsSent + 1 != entry->docPartsCount))) {
return QByteArray();
}
return result;
};
auto &content = entry->file->content;
if (!content.isEmpty()) {
const auto offset = entry->docPartsSent * entry->docPartSize;
return checked(content.mid(offset, entry->docPartSize));
} else if (!entry->docFile) {
const auto filepath = entry->file->filepath;
entry->docFile = std::make_unique<QFile>(filepath);
if (!entry->docFile->open(QIODevice::ReadOnly)) {
return QByteArray();
}
} }
return checked(entry->docFile->read(entry->docPartSize));
}
void Uploader::maybeSendNext() {
const auto stopping = _stopSessionsTimer.isActive(); const auto stopping = _stopSessionsTimer.isActive();
if (queue.empty()) { if (_queue.empty()) {
if (!stopping) { if (!stopping) {
_stopSessionsTimer.callOnce(kKillSessionTimeout); _stopSessionsTimer.callOnce(kKillSessionTimeout);
} }
_pausedId = FullMsgId();
return;
} else if (_pausedId) {
return; return;
} }
if (stopping) { if (stopping) {
_stopSessionsTimer.cancel(); _stopSessionsTimer.cancel();
} }
auto i = _uploadingId.msg ? queue.find(_uploadingId) : queue.begin();
if (!_uploadingId.msg) {
_uploadingId = i->first;
} else if (i == queue.end()) {
i = queue.begin();
_uploadingId = i->first;
}
auto &uploadingData = i->second;
auto todc = 0; auto todc = 0;
for (auto dc = 1; dc != MTP::kUploadSessionsCount; ++dc) { if (_sentPerDcIndex.size() < 2) {
if (_sentPerDc[dc] < _sentPerDc[todc]) { todc = int(_sentPerDcIndex.size());
todc = dc; _sentPerDcIndex.resize(todc + 1);
} else {
const auto min = ranges::min_element(_sentPerDcIndex);
todc = int(min - begin(_sentPerDcIndex));
}
const auto alreadySent = _sentPerDcIndex[todc];
const auto entry = [&]() -> Entry* {
for (auto i = begin(_queue); i != end(_queue); ++i) {
if (i->partsSent < i->parts->size()
|| i->docPartsSent < i->docPartsCount) {
return &*i;
}
} }
return nullptr;
}();
if (!entry) {
return;
} }
if (uploadingData.partsSent >= uploadingData.parts.size()) { const auto itemId = entry->itemId;
if (uploadingData.docSentParts >= uploadingData.docPartsCount) { if (entry->partsSent >= entry->parts->size()) {
if (_sentSizes.empty()) { const auto willProbablyBeSent = entry->docPartSize;
const auto options = uploadingData.file if (alreadySent + willProbablyBeSent >= kMaxUploadPerSession) {
? uploadingData.file->to.options
: Api::SendOptions();
const auto edit = uploadingData.file &&
uploadingData.file->to.replaceMediaOf;
const auto attachedStickers = uploadingData.file
? uploadingData.file->attachedStickers
: std::vector<MTPInputDocument>();
if (uploadingData.file->type == SendMediaType::Photo) {
auto photoFilename = uploadingData.file->filename;
if (!photoFilename.endsWith(u".jpg"_q, Qt::CaseInsensitive)) {
// Server has some extensions checking for inputMediaUploadedPhoto,
// so force the extension to be .jpg anyway. It doesn't matter,
// because the filename from inputFile is not used anywhere.
photoFilename += u".jpg"_q;
}
const auto md5 = uploadingData.file->filemd5;
const auto file = MTP_inputFile(
MTP_long(uploadingData.file->id),
MTP_int(uploadingData.parts.size()),
MTP_string(photoFilename),
MTP_bytes(md5));
_photoReady.fire({
.fullId = _uploadingId,
.info = {
.file = file,
.attachedStickers = attachedStickers,
},
.options = options,
.edit = edit,
});
} else if (uploadingData.file->type == SendMediaType::File
|| uploadingData.file->type == SendMediaType::ThemeFile
|| uploadingData.file->type == SendMediaType::Audio) {
QByteArray docMd5(32, Qt::Uninitialized);
hashMd5Hex(uploadingData.md5Hash.result(), docMd5.data());
const auto file = (uploadingData.docSize > kUseBigFilesFrom)
? MTP_inputFileBig(
MTP_long(uploadingData.file->id),
MTP_int(uploadingData.docPartsCount),
MTP_string(uploadingData.file->filename))
: MTP_inputFile(
MTP_long(uploadingData.file->id),
MTP_int(uploadingData.docPartsCount),
MTP_string(uploadingData.file->filename),
MTP_bytes(docMd5));
const auto thumb = [&]() -> std::optional<MTPInputFile> {
if (uploadingData.parts.empty()) {
return std::nullopt;
}
const auto thumbFilename = uploadingData.file->thumbname;
const auto thumbMd5 = uploadingData.file->thumbmd5;
return MTP_inputFile(
MTP_long(uploadingData.file->thumbId),
MTP_int(uploadingData.parts.size()),
MTP_string(thumbFilename),
MTP_bytes(thumbMd5));
}();
_documentReady.fire({
.fullId = _uploadingId,
.info = {
.file = file,
.thumb = thumb,
.attachedStickers = attachedStickers,
},
.options = options,
.edit = edit,
});
} else if (uploadingData.file->type == SendMediaType::Secure) {
_secureReady.fire({
_uploadingId,
uploadingData.file->id,
int(uploadingData.parts.size()),
});
}
queue.erase(_uploadingId);
_uploadingId = FullMsgId();
sendNext();
}
return; return;
} }
auto &content = uploadingData.file->content; Assert(entry->docPartsSent < entry->docPartsCount);
QByteArray toSend;
if (content.isEmpty()) { const auto partBytes = readDocPart(entry);
if (!uploadingData.docFile) { if (partBytes.isEmpty()) {
const auto filepath = uploadingData.file->filepath; failed(itemId);
uploadingData.docFile = std::make_unique<QFile>(filepath);
if (!uploadingData.docFile->open(QIODevice::ReadOnly)) {
currentFailed();
return;
}
}
toSend = uploadingData.docFile->read(uploadingData.docPartSize);
if (uploadingData.docSize <= kUseBigFilesFrom) {
uploadingData.md5Hash.feed(toSend.constData(), toSend.size());
}
} else {
const auto offset = uploadingData.docSentParts
* uploadingData.docPartSize;
toSend = content.mid(offset, uploadingData.docPartSize);
if ((uploadingData.file->type == SendMediaType::File
|| uploadingData.file->type == SendMediaType::ThemeFile
|| uploadingData.file->type == SendMediaType::Audio)
&& uploadingData.docSentParts <= kUseBigFilesFrom) {
uploadingData.md5Hash.feed(toSend.constData(), toSend.size());
}
}
if ((toSend.size() > uploadingData.docPartSize)
|| ((toSend.size() < uploadingData.docPartSize
&& uploadingData.docSentParts + 1 != uploadingData.docPartsCount))) {
currentFailed();
return; return;
} }
const auto index = entry->docPartsSent++;
++entry->docPartsWaiting;
mtpRequestId requestId; mtpRequestId requestId;
if (uploadingData.docSize > kUseBigFilesFrom) { if (entry->docSize > kUseBigFilesFrom) {
requestId = _api->request(MTPupload_SaveBigFilePart( requestId = _api->request(MTPupload_SaveBigFilePart(
MTP_long(uploadingData.file->id), MTP_long(entry->file->id),
MTP_int(uploadingData.docSentParts), MTP_int(index),
MTP_int(uploadingData.docPartsCount), MTP_int(entry->docPartsCount),
MTP_bytes(toSend) MTP_bytes(partBytes)
)).done([=](const MTPBool &result, mtpRequestId requestId) { )).done([=](const MTPBool &result, mtpRequestId requestId) {
partLoaded(result, requestId); partLoaded(result, requestId);
}).fail([=](const MTP::Error &error, mtpRequestId requestId) { }).fail([=](const MTP::Error &error, mtpRequestId requestId) {
@ -513,29 +464,34 @@ void Uploader::sendNext() {
}).toDC(MTP::uploadDcId(todc)).send(); }).toDC(MTP::uploadDcId(todc)).send();
} else { } else {
requestId = _api->request(MTPupload_SaveFilePart( requestId = _api->request(MTPupload_SaveFilePart(
MTP_long(uploadingData.file->id), MTP_long(entry->file->id),
MTP_int(uploadingData.docSentParts), MTP_int(index),
MTP_bytes(toSend) MTP_bytes(partBytes)
)).done([=](const MTPBool &result, mtpRequestId requestId) { )).done([=](const MTPBool &result, mtpRequestId requestId) {
partLoaded(result, requestId); partLoaded(result, requestId);
}).fail([=](const MTP::Error &error, mtpRequestId requestId) { }).fail([=](const MTP::Error &error, mtpRequestId requestId) {
partFailed(error, requestId); partFailed(error, requestId);
}).toDC(MTP::uploadDcId(todc)).send(); }).toDC(MTP::uploadDcId(todc)).send();
} }
_sentSizes.emplace(requestId, uploadingData.docPartSize); _requests.emplace(requestId, Request{
_docSentRequests.emplace(requestId); .itemId = itemId,
_dcIndices.emplace(requestId, todc); .sent = crl::now(),
_sentTotal += uploadingData.docPartSize; .bytes = partBytes,
_sentPerDc[todc] += uploadingData.docPartSize; .dcIndex = todc,
.docPart = true,
uploadingData.docSentParts++; });
_sentPerDcIndex[todc] += int(partBytes.size());
} else { } else {
const auto index = uploadingData.partsSent++; const auto willBeSent = entry->parts->at(entry->partsSent).size();
const auto partBytes = uploadingData.parts[index]; if (alreadySent + willBeSent >= kMaxUploadPerSession) {
const auto partSize = int(partBytes.size()); return;
}
++entry->partsWaiting;
const auto index = entry->partsSent++;
const auto partBytes = entry->parts->at(index);
const auto requestId = _api->request(MTPupload_SaveFilePart( const auto requestId = _api->request(MTPupload_SaveFilePart(
MTP_long(uploadingData.partsOfId), MTP_long(entry->partsOfId),
MTP_int(index), MTP_int(index),
MTP_bytes(partBytes) MTP_bytes(partBytes)
)).done([=](const MTPBool &result, mtpRequestId requestId) { )).done([=](const MTPBool &result, mtpRequestId requestId) {
@ -543,139 +499,232 @@ void Uploader::sendNext() {
}).fail([=](const MTP::Error &error, mtpRequestId requestId) { }).fail([=](const MTP::Error &error, mtpRequestId requestId) {
partFailed(error, requestId); partFailed(error, requestId);
}).toDC(MTP::uploadDcId(todc)).send(); }).toDC(MTP::uploadDcId(todc)).send();
_sentSizes.emplace(requestId, partSize);
_dcIndices.emplace(requestId, todc); _requests.emplace(requestId, Request{
_sentTotal += partSize; .itemId = itemId,
_sentPerDc[todc] += partSize; .sent = crl::now(),
.bytes = partBytes,
.dcIndex = todc,
});
_sentPerDcIndex[todc] += int(partBytes.size());
} }
_nextTimer.callOnce(kUploadRequestInterval); _nextTimer.callOnce(kUploadRequestInterval);
} }
void Uploader::cancel(const FullMsgId &msgId) { void Uploader::cancel(FullMsgId itemId) {
if (_uploadingId == msgId) { failed(itemId);
currentFailed();
} else {
queue.erase(msgId);
}
} }
void Uploader::cancelAll() { void Uploader::cancelAll() {
const auto single = queue.empty() ? _uploadingId : queue.begin()->first; while (!_queue.empty()) {
if (!single) { failed(_queue.front().itemId);
return;
}
_pausedId = single;
if (_uploadingId) {
currentFailed();
}
while (!queue.empty()) {
const auto [msgId, file] = std::move(*queue.begin());
queue.erase(queue.begin());
notifyFailed(msgId, file);
} }
clear(); clear();
unpause(); unpause();
} }
void Uploader::pause(const FullMsgId &msgId) { void Uploader::pause(FullMsgId itemId) {
_pausedId = msgId; _pausedId = itemId;
} }
void Uploader::unpause() { void Uploader::unpause() {
_pausedId = FullMsgId(); _pausedId = FullMsgId();
sendNext(); maybeSendNext();
} }
void Uploader::confirm(const FullMsgId &msgId) { void Uploader::cancelRequests(FullMsgId itemId) {
} for (auto i = begin(_requests); i != end(_requests);) {
if (i->second.itemId == itemId) {
void Uploader::cancelRequests() { const auto bytes = int(i->second.bytes.size());
_docSentRequests.clear(); _sentPerDcIndex[i->second.dcIndex] -= bytes;
for (const auto &requestData : _sentSizes) { _api->request(i->first).cancel();
_api->request(requestData.first).cancel(); i = _requests.erase(i);
} else {
++i;
}
} }
_sentSizes.clear(); }
void Uploader::cancelAllRequests() {
for (const auto &[requestId, request] : base::take(_requests)) {
_api->request(requestId).cancel();
}
ranges::fill(_sentPerDcIndex, 0);
} }
void Uploader::clear() { void Uploader::clear() {
queue.clear(); _queue.clear();
cancelRequests(); cancelAllRequests();
_dcIndices.clear(); stopSessions();
_sentTotal = 0;
for (int i = 0; i < MTP::kUploadSessionsCount; ++i) {
_api->instance().stopSession(MTP::uploadDcId(i));
_sentPerDc[i] = 0;
}
_stopSessionsTimer.cancel(); _stopSessionsTimer.cancel();
} }
void Uploader::partLoaded(const MTPBool &result, mtpRequestId requestId) { Uploader::Request Uploader::finishRequest(mtpRequestId requestId) {
_docSentRequests.remove(requestId); const auto taken = _requests.take(requestId);
auto i = _sentSizes.find(requestId); Assert(taken.has_value());
const auto wasNonPremiumDelayed = _nonPremiumDelayed.remove(requestId);
if (i != _sentSizes.cend()) {
if (mtpIsFalse(result)) { // failed to upload current file
currentFailed();
return;
} else {
auto dcIt = _dcIndices.find(requestId);
if (dcIt == _dcIndices.cend()) { // must not happen
currentFailed();
return;
}
auto dc = dcIt->second;
_dcIndices.erase(dcIt);
int64 sentPartSize = i->second; _sentPerDcIndex[taken->dcIndex] -= int(taken->bytes.size());
auto k = queue.find(_uploadingId); return *taken;
Assert(k != queue.cend()); }
auto &[fullId, file] = *k;
_sentSizes.erase(i); void Uploader::partLoaded(const MTPBool &result, mtpRequestId requestId) {
_sentTotal -= sentPartSize; const auto request = finishRequest(requestId);
_sentPerDc[dc] -= sentPartSize;
if (file.file->type == SendMediaType::Photo) { const auto bytes = int(request.bytes.size());
file.fileSentSize += sentPartSize; const auto itemId = request.itemId;
const auto photo = session().data().photo(file.file->id);
if (photo->uploading() && file.file) { if (mtpIsFalse(result)) { // failed to upload current file
photo->uploadingData->size = file.file->partssize; failed(itemId);
photo->uploadingData->offset = file.fileSentSize; return;
}
_photoProgress.fire_copy(fullId);
} else if (file.file->type == SendMediaType::File
|| file.file->type == SendMediaType::ThemeFile
|| file.file->type == SendMediaType::Audio) {
const auto document = session().data().document(file.file->id);
if (document->uploading()) {
const auto doneParts = file.docSentParts
- int(_docSentRequests.size());
document->uploadingData->offset = std::min(
document->uploadingData->size,
doneParts * file.docPartSize);
}
_documentProgress.fire_copy(fullId);
} else if (file.file->type == SendMediaType::Secure) {
file.fileSentSize += sentPartSize;
_secureProgress.fire_copy({
fullId,
file.fileSentSize,
file.file->partssize });
}
if (wasNonPremiumDelayed) {
_nonPremiumDelays.fire_copy(fullId);
}
}
} }
sendNext(); const auto i = ranges::find(_queue, itemId, &Entry::itemId);
Assert(i != end(_queue));
auto &entry = *i;
if (request.docPart) {
--entry.docPartsWaiting;
entry.docSentSize += bytes;
} else {
--entry.partsWaiting;
entry.partsSentSize += bytes;
}
if (entry.file->type == SendMediaType::Photo) {
const auto photo = session().data().photo(entry.file->id);
if (photo->uploading()) {
photo->uploadingData->size = entry.file->partssize;
photo->uploadingData->offset = entry.partsSentSize;
}
_photoProgress.fire_copy(itemId);
} else if (entry.file->type == SendMediaType::File
|| entry.file->type == SendMediaType::ThemeFile
|| entry.file->type == SendMediaType::Audio) {
const auto document = session().data().document(entry.file->id);
if (document->uploading()) {
document->uploadingData->offset = std::min(
document->uploadingData->size,
entry.docSentSize);
}
_documentProgress.fire_copy(itemId);
} else if (entry.file->type == SendMediaType::Secure) {
_secureProgress.fire_copy({
.fullId = itemId,
.offset = entry.partsSentSize,
.size = entry.file->partssize,
});
}
if (request.nonPremiumDelayed) {
_nonPremiumDelays.fire_copy(itemId);
}
if (!_queue.empty() && itemId == _queue.front().itemId) {
maybeFinishFront();
}
maybeSendNext();
}
void Uploader::maybeFinishFront() {
while (!_queue.empty()) {
const auto &entry = _queue.front();
if (entry.partsSent >= entry.parts->size()
&& entry.docPartsSent >= entry.docPartsCount
&& !entry.partsWaiting
&& !entry.docPartsWaiting) {
finishFront();
} else {
break;
}
}
}
void Uploader::finishFront() {
Expects(!_queue.empty());
auto entry = std::move(_queue.front());
_queue.erase(_queue.begin());
const auto options = entry.file
? entry.file->to.options
: Api::SendOptions();
const auto edit = entry.file &&
entry.file->to.replaceMediaOf;
const auto attachedStickers = entry.file
? entry.file->attachedStickers
: std::vector<MTPInputDocument>();
if (entry.file->type == SendMediaType::Photo) {
auto photoFilename = entry.file->filename;
if (!photoFilename.endsWith(u".jpg"_q, Qt::CaseInsensitive)) {
// Server has some extensions checking for inputMediaUploadedPhoto,
// so force the extension to be .jpg anyway. It doesn't matter,
// because the filename from inputFile is not used anywhere.
photoFilename += u".jpg"_q;
}
const auto md5 = entry.file->filemd5;
const auto file = MTP_inputFile(
MTP_long(entry.file->id),
MTP_int(entry.parts->size()),
MTP_string(photoFilename),
MTP_bytes(md5));
_photoReady.fire({
.fullId = entry.itemId,
.info = {
.file = file,
.attachedStickers = attachedStickers,
},
.options = options,
.edit = edit,
});
} else if (entry.file->type == SendMediaType::File
|| entry.file->type == SendMediaType::ThemeFile
|| entry.file->type == SendMediaType::Audio) {
QByteArray docMd5(32, Qt::Uninitialized);
hashMd5Hex(entry.md5Hash.result(), docMd5.data());
const auto file = (entry.docSize > kUseBigFilesFrom)
? MTP_inputFileBig(
MTP_long(entry.file->id),
MTP_int(entry.docPartsCount),
MTP_string(entry.file->filename))
: MTP_inputFile(
MTP_long(entry.file->id),
MTP_int(entry.docPartsCount),
MTP_string(entry.file->filename),
MTP_bytes(docMd5));
const auto thumb = [&]() -> std::optional<MTPInputFile> {
if (entry.parts->empty()) {
return std::nullopt;
}
const auto thumbFilename = entry.file->thumbname;
const auto thumbMd5 = entry.file->thumbmd5;
return MTP_inputFile(
MTP_long(entry.file->thumbId),
MTP_int(entry.parts->size()),
MTP_string(thumbFilename),
MTP_bytes(thumbMd5));
}();
_documentReady.fire({
.fullId = entry.itemId,
.info = {
.file = file,
.thumb = thumb,
.attachedStickers = attachedStickers,
},
.options = options,
.edit = edit,
});
} else if (entry.file->type == SendMediaType::Secure) {
_secureReady.fire({
entry.itemId,
entry.file->id,
int(entry.parts->size()),
});
}
} }
void Uploader::partFailed(const MTP::Error &error, mtpRequestId requestId) { void Uploader::partFailed(const MTP::Error &error, mtpRequestId requestId) {
// failed to upload current file const auto request = finishRequest(requestId);
_nonPremiumDelayed.remove(requestId); failed(request.itemId);
if (_sentSizes.find(requestId) != _sentSizes.cend()) {
currentFailed();
}
sendNext();
} }
} // namespace Storage } // namespace Storage

View file

@ -9,6 +9,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "api/api_common.h" #include "api/api_common.h"
#include "base/timer.h" #include "base/timer.h"
#include "base/weak_ptr.h"
#include "mtproto/facade.h" #include "mtproto/facade.h"
class ApiWrap; class ApiWrap;
@ -46,53 +47,48 @@ struct UploadSecureDone {
int partsCount = 0; int partsCount = 0;
}; };
class Uploader final : public QObject { class Uploader final : public base::has_weak_ptr {
public: public:
explicit Uploader(not_null<ApiWrap*> api); explicit Uploader(not_null<ApiWrap*> api);
~Uploader(); ~Uploader();
[[nodiscard]] Main::Session &session() const; [[nodiscard]] Main::Session &session() const;
[[nodiscard]] FullMsgId currentUploadId() const;
[[nodiscard]] FullMsgId currentUploadId() const {
return _uploadingId;
}
void upload( void upload(
const FullMsgId &msgId, FullMsgId itemId,
const std::shared_ptr<FilePrepareResult> &file); const std::shared_ptr<FilePrepareResult> &file);
void cancel(const FullMsgId &msgId); void pause(FullMsgId itemId);
void pause(const FullMsgId &msgId); void cancel(FullMsgId itemId);
void confirm(const FullMsgId &msgId);
void cancelAll(); void cancelAll();
void clear();
rpl::producer<UploadedMedia> photoReady() const { [[nodiscard]] rpl::producer<UploadedMedia> photoReady() const {
return _photoReady.events(); return _photoReady.events();
} }
rpl::producer<UploadedMedia> documentReady() const { [[nodiscard]] rpl::producer<UploadedMedia> documentReady() const {
return _documentReady.events(); return _documentReady.events();
} }
rpl::producer<UploadSecureDone> secureReady() const { [[nodiscard]] rpl::producer<UploadSecureDone> secureReady() const {
return _secureReady.events(); return _secureReady.events();
} }
rpl::producer<FullMsgId> photoProgress() const { [[nodiscard]] rpl::producer<FullMsgId> photoProgress() const {
return _photoProgress.events(); return _photoProgress.events();
} }
rpl::producer<FullMsgId> documentProgress() const { [[nodiscard]] rpl::producer<FullMsgId> documentProgress() const {
return _documentProgress.events(); return _documentProgress.events();
} }
rpl::producer<UploadSecureProgress> secureProgress() const { [[nodiscard]] auto secureProgress() const
-> rpl::producer<UploadSecureProgress> {
return _secureProgress.events(); return _secureProgress.events();
} }
rpl::producer<FullMsgId> photoFailed() const { [[nodiscard]] rpl::producer<FullMsgId> photoFailed() const {
return _photoFailed.events(); return _photoFailed.events();
} }
rpl::producer<FullMsgId> documentFailed() const { [[nodiscard]] rpl::producer<FullMsgId> documentFailed() const {
return _documentFailed.events(); return _documentFailed.events();
} }
rpl::producer<FullMsgId> secureFailed() const { [[nodiscard]] rpl::producer<FullMsgId> secureFailed() const {
return _secureFailed.events(); return _secureFailed.events();
} }
@ -101,23 +97,31 @@ public:
} }
void unpause(); void unpause();
void sendNext();
void stopSessions(); void stopSessions();
private: private:
struct File; struct Entry;
struct Request;
[[nodiscard]] QByteArray readDocPart(not_null<Entry*> entry);
void maybeSendNext();
void maybeFinishFront();
void finishFront();
void partLoaded(const MTPBool &result, mtpRequestId requestId); void partLoaded(const MTPBool &result, mtpRequestId requestId);
void partFailed(const MTP::Error &error, mtpRequestId requestId); void partFailed(const MTP::Error &error, mtpRequestId requestId);
Request finishRequest(mtpRequestId requestId);
void processPhotoProgress(const FullMsgId &msgId); void processPhotoProgress(FullMsgId itemId);
void processPhotoFailed(const FullMsgId &msgId); void processPhotoFailed(FullMsgId itemId);
void processDocumentProgress(const FullMsgId &msgId); void processDocumentProgress(FullMsgId itemId);
void processDocumentFailed(const FullMsgId &msgId); void processDocumentFailed(FullMsgId itemId);
void notifyFailed(FullMsgId id, const File &file); void notifyFailed(const Entry &entry);
void currentFailed(); void failed(FullMsgId itemId);
void cancelRequests(); void cancelRequests(FullMsgId itemId);
void cancelAllRequests();
void clear();
void sendProgressUpdate( void sendProgressUpdate(
not_null<HistoryItem*> item, not_null<HistoryItem*> item,
@ -125,16 +129,13 @@ private:
int progress = 0); int progress = 0);
const not_null<ApiWrap*> _api; const not_null<ApiWrap*> _api;
base::flat_map<mtpRequestId, int> _sentSizes;
base::flat_set<mtpRequestId> _docSentRequests;
base::flat_map<mtpRequestId, int> _dcIndices;
base::flat_set<mtpRequestId> _nonPremiumDelayed;
uint32 _sentTotal = 0; // FileSize: Right now any file size fits 32 bit.
uint32 _sentPerDc[MTP::kUploadSessionsCount] = { 0 };
FullMsgId _uploadingId; std::vector<Entry> _queue;
base::flat_map<mtpRequestId, Request> _requests;
std::vector<int> _sentPerDcIndex;
FullMsgId _pausedId; FullMsgId _pausedId;
std::map<FullMsgId, File> queue;
base::Timer _nextTimer, _stopSessionsTimer; base::Timer _nextTimer, _stopSessionsTimer;
rpl::event_stream<UploadedMedia> _photoReady; rpl::event_stream<UploadedMedia> _photoReady;