diff --git a/Telegram/SourceFiles/mtproto/connection.cpp b/Telegram/SourceFiles/mtproto/connection.cpp index b17f50b88..05cc6c457 100644 --- a/Telegram/SourceFiles/mtproto/connection.cpp +++ b/Telegram/SourceFiles/mtproto/connection.cpp @@ -139,7 +139,7 @@ Connection::Connection( moveToThread(thread); - connect(thread, &QThread::started, this, [=] { + InvokeQueued(this, [=] { _checkSentRequestsTimer.callEach(kCheckSentRequestsEach); connectToServer(); }); diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.cpp b/Telegram/SourceFiles/mtproto/mtp_instance.cpp index 3fb10a63b..a24caabee 100644 --- a/Telegram/SourceFiles/mtproto/mtp_instance.cpp +++ b/Telegram/SourceFiles/mtproto/mtp_instance.cpp @@ -170,6 +170,7 @@ private: Session *findSession(ShiftedDcId shiftedDcId); not_null startSession(ShiftedDcId shiftedDcId); Session *removeSession(ShiftedDcId shiftedDcId); + [[nodiscard]] not_null getThreadForDc(ShiftedDcId shiftedDcId); void applyDomainIps( const QString &host, @@ -200,6 +201,10 @@ private: const not_null _dcOptions; const Instance::Mode _mode = Instance::Mode::Normal; + std::unique_ptr _mainSessionThread; + std::unique_ptr _otherSessionsThread; + std::vector> _fileSessionThreads; + QString _deviceModel; QString _systemVersion; @@ -263,6 +268,8 @@ Instance::Private::Private( , _instance(instance) , _dcOptions(options) , _mode(mode) { + const auto idealThreadPoolSize = QThread::idealThreadCount(); + _fileSessionThreads.resize(2 * std::max(idealThreadPoolSize / 2, 1)); } void Instance::Private::start(Config &&config) { @@ -1486,11 +1493,11 @@ not_null Instance::Private::startSession(ShiftedDcId shiftedDcId) { Expects(BareDcId(shiftedDcId) != 0); const auto dc = getDcById(shiftedDcId); + const auto thread = getThreadForDc(shiftedDcId); const auto result = _sessions.emplace( shiftedDcId, - std::make_unique(_instance, shiftedDcId, dc) + std::make_unique(_instance, thread, shiftedDcId, dc) ).first->second.get(); - result->start(); if (isKeysDestroyer()) { scheduleKeyDestroy(shiftedDcId); } @@ -1509,6 +1516,47 @@ Session *Instance::Private::removeSession(ShiftedDcId shiftedDcId) { return _sessionsToDestroy.back().get(); } + +not_null Instance::Private::getThreadForDc( + ShiftedDcId shiftedDcId) { + static const auto EnsureStarted = [](std::unique_ptr &thread) { + if (!thread) { + thread = std::make_unique(); + thread->start(); + } + return thread.get(); + }; + static const auto FindOne = []( + std::vector> &threads, + int index, + bool shift) { + Expects(!threads.empty()); + Expects(!(threads.size() % 2)); + + const auto count = int(threads.size()); + index %= count; + if (index >= count / 2) { + index = (count - 1) - (index - count / 2); + } + if (shift) { + index = (index + count / 2) % count; + } + return EnsureStarted(threads[index]); + }; + if (shiftedDcId == BareDcId(shiftedDcId)) { + return EnsureStarted(_mainSessionThread); + } else if (isDownloadDcId(shiftedDcId)) { + const auto index = GetDcIdShift(shiftedDcId) - kBaseDownloadDcShift; + const auto composed = index + BareDcId(shiftedDcId); + return FindOne(_fileSessionThreads, composed, false); + } else if (isUploadDcId(shiftedDcId)) { + const auto index = GetDcIdShift(shiftedDcId) - kBaseUploadDcShift; + const auto composed = index + BareDcId(shiftedDcId); + return FindOne(_fileSessionThreads, composed, true); + } + return EnsureStarted(_otherSessionsThread); +} + void Instance::Private::scheduleKeyDestroy(ShiftedDcId shiftedDcId) { Expects(isKeysDestroyer()); @@ -1616,6 +1664,23 @@ void Instance::Private::prepareToDestroy() { session->kill(); } _mainSession = nullptr; + + auto threads = std::vector>(); + threads.push_back(base::take(_mainSessionThread)); + threads.push_back(base::take(_otherSessionsThread)); + for (auto &thread : base::take(_fileSessionThreads)) { + threads.push_back(std::move(thread)); + } + for (const auto &thread : threads) { + if (thread) { + thread->quit(); + } + } + for (const auto &thread : threads) { + if (thread) { + thread->wait(); + } + } } Instance::Instance(not_null options, Mode mode, Config &&config) diff --git a/Telegram/SourceFiles/mtproto/session.cpp b/Telegram/SourceFiles/mtproto/session.cpp index ec9e0d58a..00294cd3d 100644 --- a/Telegram/SourceFiles/mtproto/session.cpp +++ b/Telegram/SourceFiles/mtproto/session.cpp @@ -146,29 +146,28 @@ void SessionData::detach() { Session::Session( not_null instance, + not_null thread, ShiftedDcId shiftedDcId, not_null dc) : _instance(instance) , _shiftedDcId(shiftedDcId) , _dc(dc) , _data(std::make_shared(this)) +, _thread(thread) , _sender([=] { needToResumeAndSend(); }) { _timeouter.callEach(1000); refreshOptions(); watchDcKeyChanges(); watchDcOptionsChanges(); + start(); } Session::~Session() { Expects(!_connection); - Expects(!_thread); if (_myKeyCreation != CreatingKeyType::None) { releaseKeyCreationOnFail(); } - for (const auto &thread : _destroyingThreads) { - thread->wait(); - } } void Session::watchDcKeyChanges() { @@ -210,27 +209,11 @@ void Session::watchDcOptionsChanges() { void Session::start() { killConnection(); - - _thread = std::make_unique(); - const auto thread = _thread.get(); - - connect(thread, &QThread::finished, [=] { - InvokeQueued(this, [=] { - const auto i = ranges::find( - _destroyingThreads, - thread, - &std::unique_ptr::get); - if (i != _destroyingThreads.end()) { - _destroyingThreads.erase(i); - } - }); - }); _connection = new Connection( _instance, - thread, + _thread.get(), _data, _shiftedDcId); - thread->start(); } bool Session::rpcErrorOccured( @@ -578,18 +561,13 @@ void Session::tryToReceive() { } void Session::killConnection() { - Expects(!_thread || _connection); - if (!_connection) { return; } base::take(_connection)->deleteLater(); - _destroyingThreads.push_back(base::take(_thread)); - _destroyingThreads.back()->quit(); Ensures(_connection == nullptr); - Ensures(_thread == nullptr); } } // namespace internal diff --git a/Telegram/SourceFiles/mtproto/session.h b/Telegram/SourceFiles/mtproto/session.h index 6a9ced818..81b484afe 100644 --- a/Telegram/SourceFiles/mtproto/session.h +++ b/Telegram/SourceFiles/mtproto/session.h @@ -138,6 +138,7 @@ public: // Main thread. Session( not_null instance, + not_null thread, ShiftedDcId shiftedDcId, not_null dc); ~Session(); @@ -198,9 +199,7 @@ private: const ShiftedDcId _shiftedDcId = 0; const not_null _dc; const std::shared_ptr _data; - - std::unique_ptr _thread; - std::vector> _destroyingThreads; + const not_null _thread; Connection *_connection = nullptr;