diff --git a/Telegram/SourceFiles/main/main_account.cpp b/Telegram/SourceFiles/main/main_account.cpp index 55ebf9c47..82030ea93 100644 --- a/Telegram/SourceFiles/main/main_account.cpp +++ b/Telegram/SourceFiles/main/main_account.cpp @@ -453,18 +453,14 @@ void Account::destroyMtpKeys(MTP::AuthKeysList &&keys) { Core::App().dcOptions(), MTP::Instance::Mode::KeysDestroyer, std::move(destroyConfig)); - QObject::connect( - _mtpForKeysDestroy.get(), - &MTP::Instance::allKeysDestroyed, - [=] { allKeysDestroyed(); }); -} - -void Account::allKeysDestroyed() { - LOG(("MTP Info: all keys scheduled for destroy are destroyed.")); - crl::on_main(this, [=] { - _mtpForKeysDestroy = nullptr; - Local::writeMtpData(); - }); + _mtpForKeysDestroy->allKeysDestroyed( + ) | rpl::start_with_next([=] { + LOG(("MTP Info: all keys scheduled for destroy are destroyed.")); + crl::on_main(this, [=] { + _mtpForKeysDestroy = nullptr; + Local::writeMtpData(); + }); + }, _lifetime); } void Account::suggestMainDcId(MTP::DcId mainDcId) { diff --git a/Telegram/SourceFiles/main/main_account.h b/Telegram/SourceFiles/main/main_account.h index 844f47845..374cad860 100644 --- a/Telegram/SourceFiles/main/main_account.h +++ b/Telegram/SourceFiles/main/main_account.h @@ -85,7 +85,6 @@ private: void watchSessionChanges(); void destroyMtpKeys(MTP::AuthKeysList &&keys); - void allKeysDestroyed(); void resetAuthorizationKeys(); void loggedOut(); diff --git a/Telegram/SourceFiles/mtproto/connection.cpp b/Telegram/SourceFiles/mtproto/connection.cpp index 8e35f8977..b17f50b88 100644 --- a/Telegram/SourceFiles/mtproto/connection.cpp +++ b/Telegram/SourceFiles/mtproto/connection.cpp @@ -114,75 +114,46 @@ void WrapInvokeAfter( } // namespace -Connection::Connection(not_null instance) -: _instance(instance) { +Connection::Connection( + not_null instance, + not_null thread, + std::shared_ptr data, + ShiftedDcId shiftedDcId) +: QObject(nullptr) +, _instance(instance) +, _shiftedDcId(shiftedDcId) +, _realDcType(_instance->dcOptions()->dcType(_shiftedDcId)) +, _currentDcType(_realDcType) +, _state(DisconnectedState) +, _retryTimer(thread, [=] { retryByTimer(); }) +, _oldConnectionTimer(thread, [=] { markConnectionOld(); }) +, _waitForConnectedTimer(thread, [=] { waitConnectedFailed(); }) +, _waitForReceivedTimer(thread, [=] { waitReceivedFailed(); }) +, _waitForBetterTimer(thread, [=] { waitBetterFailed(); }) +, _waitForReceived(kMinReceiveTimeout) +, _waitForConnected(kMinConnectedTimeout) +, _pingSender(thread, [=] { sendPingByTimer(); }) +, _checkSentRequestsTimer(thread, [=] { checkSentRequests(); }) +, _sessionData(std::move(data)) { + Expects(_shiftedDcId != 0); + + moveToThread(thread); + + connect(thread, &QThread::started, this, [=] { + _checkSentRequestsTimer.callEach(kCheckSentRequestsEach); + connectToServer(); + }); } Connection::~Connection() { - Expects(_private == nullptr); + releaseKeyCreationOnFail(); + doDisconnect(); - if (_thread) { - waitTillFinish(); - } + Expects(!_connection); + Expects(_testConnections.empty()); } -void Connection::start( - std::shared_ptr sessionData, - ShiftedDcId shiftedDcId) { - Expects(_thread == nullptr && _private == nullptr); - - _thread = std::make_unique(); - auto newData = std::make_unique( - _instance, - _thread.get(), - this, - std::move(sessionData), - shiftedDcId); - - _instance->dcOptions()->changed( - ) | rpl::filter([=](DcId dcId) { - return (BareDcId(shiftedDcId) == dcId) && (_private != nullptr); - }) | rpl::start_with_next([=] { - const auto raw = _private; - InvokeQueued(raw, [=] { - raw->dcOptionsChanged(); - }); - }, _lifetime); - - // will be deleted in the thread::finished signal - _private = newData.release(); - _thread->start(); -} - -void Connection::kill() { - Expects(_private != nullptr && _thread != nullptr); - - _private->stop(); - _private = nullptr; - _thread->quit(); -} - -void Connection::waitTillFinish() { - Expects(_private == nullptr && _thread != nullptr); - - DEBUG_LOG(("Waiting for connectionThread to finish")); - _thread->wait(); - _thread.reset(); -} - -int32 Connection::state() const { - Expects(_private != nullptr && _thread != nullptr); - - return _private->getState(); -} - -QString Connection::transport() const { - Expects(_private != nullptr && _thread != nullptr); - - return _private->transport(); -} - -void ConnectionPrivate::appendTestConnection( +void Connection::appendTestConnection( DcOptions::Variants::Protocol protocol, const QString &ip, int port, @@ -231,7 +202,7 @@ void ConnectionPrivate::appendTestConnection( }); } -int16 ConnectionPrivate::getProtocolDcId() const { +int16 Connection::getProtocolDcId() const { const auto dcId = BareDcId(_shiftedDcId); const auto simpleDcId = isTemporaryDcId(dcId) ? getRealIdFromTemporaryDcId(dcId) @@ -244,7 +215,7 @@ int16 ConnectionPrivate::getProtocolDcId() const { : testedDcId; } -void ConnectionPrivate::checkSentRequests() { +void Connection::checkSentRequests() { // Remove very old (10 minutes) containers and resend requests. auto removingIds = std::vector(); auto requesting = false; @@ -285,7 +256,7 @@ void ConnectionPrivate::checkSentRequests() { } } -void ConnectionPrivate::destroyAllConnections() { +void Connection::destroyAllConnections() { clearUnboundKeyCreator(); _waitForBetterTimer.cancel(); _waitForReceivedTimer.cancel(); @@ -294,67 +265,20 @@ void ConnectionPrivate::destroyAllConnections() { _connection = nullptr; } -ConnectionPrivate::ConnectionPrivate( - not_null instance, - not_null thread, - not_null owner, - std::shared_ptr data, - ShiftedDcId shiftedDcId) -: QObject(nullptr) -, _instance(instance) -, _owner(owner) -, _shiftedDcId(shiftedDcId) -, _realDcType(_instance->dcOptions()->dcType(_shiftedDcId)) -, _currentDcType(_realDcType) -, _state(DisconnectedState) -, _retryTimer(thread, [=] { retryByTimer(); }) -, _oldConnectionTimer(thread, [=] { markConnectionOld(); }) -, _waitForConnectedTimer(thread, [=] { waitConnectedFailed(); }) -, _waitForReceivedTimer(thread, [=] { waitReceivedFailed(); }) -, _waitForBetterTimer(thread, [=] { waitBetterFailed(); }) -, _waitForReceived(kMinReceiveTimeout) -, _waitForConnected(kMinConnectedTimeout) -, _pingSender(thread, [=] { sendPingByTimer(); }) -, _checkSentRequestsTimer(thread, [=] { checkSentRequests(); }) -, _sessionData(std::move(data)) { - Expects(_shiftedDcId != 0); - - moveToThread(thread); - - connect(thread, &QThread::started, this, [=] { - _checkSentRequestsTimer.callEach(kCheckSentRequestsEach); - connectToServer(); - }); - connect(thread, &QThread::finished, this, [=] { finishAndDestroy(); }); - - connect(_sessionData->owner(), SIGNAL(authKeyChanged()), this, SLOT(updateAuthKey()), Qt::QueuedConnection); - connect(_sessionData->owner(), SIGNAL(needToRestart()), this, SLOT(restartNow()), Qt::QueuedConnection); - connect(_sessionData->owner(), SIGNAL(needToSend()), this, SLOT(tryToSend()), Qt::QueuedConnection); - connect(_sessionData->owner(), SIGNAL(needToPing()), this, SLOT(onPingSendForce()), Qt::QueuedConnection); +void Connection::cdnConfigChanged() { + connectToServer(true); } -ConnectionPrivate::~ConnectionPrivate() { - releaseKeyCreationOnFail(); - - Expects(_finished); - Expects(!_connection); - Expects(_testConnections.empty()); -} - -void ConnectionPrivate::onCDNConfigLoaded() { - restart(); -} - -int32 ConnectionPrivate::getShiftedDcId() const { +int32 Connection::getShiftedDcId() const { return _shiftedDcId; } -void ConnectionPrivate::dcOptionsChanged() { +void Connection::dcOptionsChanged() { _retryTimeout = 1; connectToServer(true); } -int32 ConnectionPrivate::getState() const { +int32 Connection::getState() const { QReadLocker lock(&_stateMutex); int32 result = _state; if (_state < 0) { @@ -368,7 +292,7 @@ int32 ConnectionPrivate::getState() const { return result; } -QString ConnectionPrivate::transport() const { +QString Connection::transport() const { QReadLocker lock(&_stateMutex); if (!_connection || (_state < 0)) { return QString(); @@ -378,8 +302,8 @@ QString ConnectionPrivate::transport() const { return _connection->transport(); } -bool ConnectionPrivate::setState(int32 state, int32 ifState) { - if (ifState != Connection::UpdateAlways) { +bool Connection::setState(int state, int ifState) { + if (ifState != kUpdateStateAlways) { QReadLocker lock(&_stateMutex); if (_state != ifState) { return false; @@ -402,7 +326,7 @@ bool ConnectionPrivate::setState(int32 state, int32 ifState) { return true; } -void ConnectionPrivate::resetSession() { +void Connection::resetSession() { MTP_LOG(_shiftedDcId, ("Resetting session!")); _needSessionReset = false; @@ -412,7 +336,7 @@ void ConnectionPrivate::resetSession() { _sessionData->queueResetDone(); } -void ConnectionPrivate::changeSessionId() { +void Connection::changeSessionId() { auto sessionId = _sessionId; do { sessionId = openssl::RandomValue(); @@ -429,13 +353,13 @@ void ConnectionPrivate::changeSessionId() { _receivedMessageIds.clear(); } -uint32 ConnectionPrivate::nextRequestSeqNumber(bool needAck) { +uint32 Connection::nextRequestSeqNumber(bool needAck) { const auto result = _messagesCounter; _messagesCounter += (needAck ? 1 : 0); return result * 2 + (needAck ? 1 : 0); } -bool ConnectionPrivate::realDcTypeChanged() { +bool Connection::realDcTypeChanged() { const auto now = _instance->dcOptions()->dcType(_shiftedDcId); if (_realDcType == now) { return false; @@ -444,7 +368,7 @@ bool ConnectionPrivate::realDcTypeChanged() { return true; } -bool ConnectionPrivate::markSessionAsStarted() { +bool Connection::markSessionAsStarted() { if (_sessionMarkedAsStarted) { return false; } @@ -452,7 +376,7 @@ bool ConnectionPrivate::markSessionAsStarted() { return true; } -mtpMsgId ConnectionPrivate::prepareToSend( +mtpMsgId Connection::prepareToSend( SerializedRequest &request, mtpMsgId currentLastId, bool forceNewMsgId) { @@ -477,7 +401,7 @@ mtpMsgId ConnectionPrivate::prepareToSend( return currentLastId; } -mtpMsgId ConnectionPrivate::replaceMsgId(SerializedRequest &request, mtpMsgId newId) { +mtpMsgId Connection::replaceMsgId(SerializedRequest &request, mtpMsgId newId) { Expects(request->size() > 8); const auto oldMsgId = request.getMsgId(); @@ -535,7 +459,7 @@ mtpMsgId ConnectionPrivate::replaceMsgId(SerializedRequest &request, mtpMsgId ne return newId; } -mtpMsgId ConnectionPrivate::placeToContainer( +mtpMsgId Connection::placeToContainer( SerializedRequest &toSendRequest, mtpMsgId &bigMsgId, bool forceNewMsgId, @@ -554,7 +478,7 @@ mtpMsgId ConnectionPrivate::placeToContainer( return msgId; } -void ConnectionPrivate::tryToSend() { +void Connection::tryToSend() { if (!_connection || !_keyId) { return; } @@ -920,7 +844,7 @@ void ConnectionPrivate::tryToSend() { sendSecureRequest(std::move(toSendRequest), needAnyResponse); } -void ConnectionPrivate::retryByTimer() { +void Connection::retryByTimer() { if (_retryTimeout < 3) { ++_retryTimeout; } else if (_retryTimeout == 3) { @@ -931,18 +855,14 @@ void ConnectionPrivate::retryByTimer() { connectToServer(); } -void ConnectionPrivate::restartNow() { +void Connection::restartNow() { _retryTimeout = 1; _retryTimer.cancel(); restart(); } -void ConnectionPrivate::connectToServer(bool afterConfig) { - if (_finished) { - DEBUG_LOG(("MTP Error: " - "connectToServer() called for finished connection!")); - return; - } else if (afterConfig && (!_testConnections.empty() || _connection)) { +void Connection::connectToServer(bool afterConfig) { + if (afterConfig && (!_testConnections.empty() || _connection)) { return; } @@ -1046,7 +966,7 @@ void ConnectionPrivate::connectToServer(bool afterConfig) { _waitForConnectedTimer.callOnce(_waitForConnected); } -void ConnectionPrivate::restart() { +void Connection::restart() { DEBUG_LOG(("MTP Info: restarting Connection")); _waitForReceivedTimer.cancel(); @@ -1066,7 +986,7 @@ void ConnectionPrivate::restart() { setState(-_retryTimeout); } -void ConnectionPrivate::onSentSome(uint64 size) { +void Connection::onSentSome(uint64 size) { if (!_waitForReceivedTimer.isActive()) { auto remain = static_cast(_waitForReceived); if (!_oldConnection) { @@ -1087,7 +1007,7 @@ void ConnectionPrivate::onSentSome(uint64 size) { if (!_firstSentAt) _firstSentAt = crl::now(); } -void ConnectionPrivate::onReceivedSome() { +void Connection::onReceivedSome() { if (_oldConnection) { _oldConnection = false; DEBUG_LOG(("This connection marked as not old!")); @@ -1105,13 +1025,13 @@ void ConnectionPrivate::onReceivedSome() { } } -void ConnectionPrivate::markConnectionOld() { +void Connection::markConnectionOld() { _oldConnection = true; _waitForReceived = kMinReceiveTimeout; DEBUG_LOG(("This connection marked as old! _waitForReceived now %1ms").arg(_waitForReceived)); } -void ConnectionPrivate::sendPingByTimer() { +void Connection::sendPingByTimer() { if (_pingId) { // _pingSendAt: when to send next ping (lastPingAt + kPingSendAfter) // could be equal to zero. @@ -1130,7 +1050,7 @@ void ConnectionPrivate::sendPingByTimer() { } } -void ConnectionPrivate::onPingSendForce() { +void Connection::sendPingForce() { if (!_pingId) { _pingSendAt = 0; DEBUG_LOG(("Will send ping!")); @@ -1138,7 +1058,7 @@ void ConnectionPrivate::onPingSendForce() { } } -void ConnectionPrivate::waitReceivedFailed() { +void Connection::waitReceivedFailed() { Expects(_connectionOptions != nullptr); if (!_connectionOptions->useTcp) { @@ -1158,7 +1078,7 @@ void ConnectionPrivate::waitReceivedFailed() { InvokeQueued(this, [=] { connectToServer(); }); } -void ConnectionPrivate::waitConnectedFailed() { +void Connection::waitConnectedFailed() { DEBUG_LOG(("MTP Info: can't connect in %1ms").arg(_waitForConnected)); auto maxTimeout = kMaxConnectedTimeout; for (const auto &connection : _testConnections) { @@ -1174,46 +1094,29 @@ void ConnectionPrivate::waitConnectedFailed() { InvokeQueued(this, [=] { connectToServer(); }); } -void ConnectionPrivate::waitBetterFailed() { +void Connection::waitBetterFailed() { confirmBestConnection(); } -void ConnectionPrivate::connectingTimedOut() { +void Connection::connectingTimedOut() { for (const auto &connection : _testConnections) { connection.data->timedOut(); } doDisconnect(); } -void ConnectionPrivate::doDisconnect() { +void Connection::doDisconnect() { destroyAllConnections(); setState(DisconnectedState); } -void ConnectionPrivate::finishAndDestroy() { - doDisconnect(); - _finished = true; - const auto connection = _owner; - const auto instance = _instance; - InvokeQueued(instance, [=] { - instance->connectionFinished(connection); - }); - deleteLater(); -} - -void ConnectionPrivate::requestCDNConfig() { - connect( - _instance, - SIGNAL(cdnConfigLoaded()), - this, - SLOT(onCDNConfigLoaded()), - Qt::UniqueConnection); +void Connection::requestCDNConfig() { InvokeQueued(_instance, [instance = _instance] { instance->requestCDNConfig(); }); } -void ConnectionPrivate::handleReceived() { +void Connection::handleReceived() { Expects(_encryptionKey != nullptr); onReceivedSome(); @@ -1413,7 +1316,7 @@ void ConnectionPrivate::handleReceived() { } } -ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived( +Connection::HandleResult Connection::handleOneReceived( const mtpPrime *from, const mtpPrime *end, uint64 msgId, @@ -1931,7 +1834,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived( return HandleResult::Success; } -ConnectionPrivate::HandleResult ConnectionPrivate::handleBindResponse( +Connection::HandleResult Connection::handleBindResponse( mtpMsgId requestMsgId, const mtpBuffer &response) { if (!_keyCreator || !_bindMsgId || _bindMsgId != requestMsgId) { @@ -1959,7 +1862,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleBindResponse( Unexpected("Result of BoundKeyCreator::handleBindResponse."); } -mtpBuffer ConnectionPrivate::ungzip(const mtpPrime *from, const mtpPrime *end) const { +mtpBuffer Connection::ungzip(const mtpPrime *from, const mtpPrime *end) const { mtpBuffer result; // * 4 because of mtpPrime type result.resize(0); @@ -2011,7 +1914,7 @@ mtpBuffer ConnectionPrivate::ungzip(const mtpPrime *from, const mtpPrime *end) c return result; } -bool ConnectionPrivate::requestsFixTimeSalt(const QVector &ids, int32 serverTime, uint64 serverSalt) { +bool Connection::requestsFixTimeSalt(const QVector &ids, int32 serverTime, uint64 serverSalt) { uint32 idsCount = ids.size(); for (uint32 i = 0; i < idsCount; ++i) { @@ -2026,7 +1929,7 @@ bool ConnectionPrivate::requestsFixTimeSalt(const QVector &ids, int32 s return false; } -void ConnectionPrivate::requestsAcked(const QVector &ids, bool byResponse) { +void Connection::requestsAcked(const QVector &ids, bool byResponse) { uint32 idsCount = ids.size(); DEBUG_LOG(("Message Info: requests acked, ids %1").arg(LogIdsVector(ids))); @@ -2120,7 +2023,7 @@ void ConnectionPrivate::requestsAcked(const QVector &ids, bool byRespon } } -void ConnectionPrivate::handleMsgsStates(const QVector &ids, const QByteArray &states, QVector &acked) { +void Connection::handleMsgsStates(const QVector &ids, const QByteArray &states, QVector &acked) { uint32 idsCount = ids.size(); if (!idsCount) { DEBUG_LOG(("Message Info: void ids vector in handleMsgsStates()")); @@ -2163,7 +2066,7 @@ void ConnectionPrivate::handleMsgsStates(const QVector &ids, const QByt } } -void ConnectionPrivate::clearSpecialMsgId(mtpMsgId msgId) { +void Connection::clearSpecialMsgId(mtpMsgId msgId) { if (msgId == _pingMsgId) { _pingMsgId = 0; _pingId = 0; @@ -2172,7 +2075,7 @@ void ConnectionPrivate::clearSpecialMsgId(mtpMsgId msgId) { } } -void ConnectionPrivate::resend( +void Connection::resend( mtpMsgId msgId, crl::time msCanWait, bool forceContainer) { @@ -2211,7 +2114,7 @@ void ConnectionPrivate::resend( } } -void ConnectionPrivate::resendAll() { +void Connection::resendAll() { auto toResend = std::vector(); auto lock = QReadLocker(_sessionData->haveSentMutex()); @@ -2230,7 +2133,7 @@ void ConnectionPrivate::resendAll() { _sessionData->queueSendAnything(); } -void ConnectionPrivate::onConnected( +void Connection::onConnected( not_null connection) { disconnect(connection, &AbstractConnection::connected, nullptr, nullptr); if (!connection->isConnected()) { @@ -2264,7 +2167,7 @@ void ConnectionPrivate::onConnected( } } -void ConnectionPrivate::onDisconnected( +void Connection::onDisconnected( not_null connection) { removeTestConnection(connection); @@ -2276,7 +2179,7 @@ void ConnectionPrivate::onDisconnected( } } -void ConnectionPrivate::confirmBestConnection() { +void Connection::confirmBestConnection() { if (_waitForBetterTimer.isActive()) { return; } @@ -2300,7 +2203,7 @@ void ConnectionPrivate::confirmBestConnection() { checkAuthKey(); } -void ConnectionPrivate::removeTestConnection( +void Connection::removeTestConnection( not_null connection) { _testConnections.erase( ranges::remove( @@ -2310,7 +2213,7 @@ void ConnectionPrivate::removeTestConnection( end(_testConnections)); } -void ConnectionPrivate::checkAuthKey() { +void Connection::checkAuthKey() { if (_keyId) { authKeyChecked(); } else if (_instance->isKeysDestroyer()) { @@ -2321,7 +2224,7 @@ void ConnectionPrivate::checkAuthKey() { } } -void ConnectionPrivate::updateAuthKey() { +void Connection::updateAuthKey() { if (_instance->isKeysDestroyer() || _keyCreator || !_connection) { return; } @@ -2339,7 +2242,7 @@ void ConnectionPrivate::updateAuthKey() { } } -void ConnectionPrivate::setCurrentKeyId(uint64 newKeyId) { +void Connection::setCurrentKeyId(uint64 newKeyId) { if (_keyId == newKeyId) { return; } @@ -2349,7 +2252,7 @@ void ConnectionPrivate::setCurrentKeyId(uint64 newKeyId) { changeSessionId(); } -void ConnectionPrivate::applyAuthKey(AuthKeyPtr &&encryptionKey) { +void Connection::applyAuthKey(AuthKeyPtr &&encryptionKey) { _encryptionKey = std::move(encryptionKey); const auto newKeyId = _encryptionKey ? _encryptionKey->keyId() : 0; if (_keyId) { @@ -2391,7 +2294,7 @@ void ConnectionPrivate::applyAuthKey(AuthKeyPtr &&encryptionKey) { } } -bool ConnectionPrivate::destroyOldEnoughPersistentKey() { +bool Connection::destroyOldEnoughPersistentKey() { Expects(_keyCreator != nullptr); const auto key = _keyCreator->bindPersistentKey(); @@ -2410,7 +2313,7 @@ bool ConnectionPrivate::destroyOldEnoughPersistentKey() { return true; } -DcType ConnectionPrivate::tryAcquireKeyCreation() { +DcType Connection::tryAcquireKeyCreation() { if (_keyCreator) { return _currentDcType; } else if (_instance->isKeysDestroyer()) { @@ -2484,7 +2387,7 @@ DcType ConnectionPrivate::tryAcquireKeyCreation() { return forceUseRegular ? DcType::Regular : _realDcType; } -void ConnectionPrivate::authKeyChecked() { +void Connection::authKeyChecked() { connect(_connection, &AbstractConnection::receivedData, [=] { handleReceived(); }); @@ -2497,7 +2400,7 @@ void ConnectionPrivate::authKeyChecked() { _sessionData->queueNeedToResumeAndSend(); } -void ConnectionPrivate::onError( +void Connection::onError( not_null connection, qint32 errorCode) { if (errorCode == -429) { @@ -2517,7 +2420,7 @@ void ConnectionPrivate::onError( } } -void ConnectionPrivate::handleError(int errorCode) { +void Connection::handleError(int errorCode) { destroyAllConnections(); _waitForConnectedTimer.cancel(); @@ -2529,7 +2432,7 @@ void ConnectionPrivate::handleError(int errorCode) { } } -void ConnectionPrivate::destroyTemporaryKey() { +void Connection::destroyTemporaryKey() { if (_instance->isKeysDestroyer()) { LOG(("MTP Info: -404 error received in destroyer %1, assuming key was destroyed.").arg(_shiftedDcId)); _instance->keyWasPossiblyDestroyed(_shiftedDcId); @@ -2544,7 +2447,7 @@ void ConnectionPrivate::destroyTemporaryKey() { restart(); } -bool ConnectionPrivate::sendSecureRequest( +bool Connection::sendSecureRequest( SerializedRequest &&request, bool needAnyResponse) { #ifdef TDESKTOP_MTPROTO_OLD @@ -2628,7 +2531,7 @@ bool ConnectionPrivate::sendSecureRequest( return true; } -mtpRequestId ConnectionPrivate::wasSent(mtpMsgId msgId) const { +mtpRequestId Connection::wasSent(mtpMsgId msgId) const { if (msgId == _pingMsgId || msgId == _bindMsgId) { return mtpRequestId(0xFFFFFFFF); } @@ -2651,13 +2554,13 @@ mtpRequestId ConnectionPrivate::wasSent(mtpMsgId msgId) const { return 0; } -void ConnectionPrivate::clearUnboundKeyCreator() { +void Connection::clearUnboundKeyCreator() { if (_keyCreator) { _keyCreator->stop(); } } -void ConnectionPrivate::releaseKeyCreationOnFail() { +void Connection::releaseKeyCreationOnFail() { if (!_keyCreator) { return; } @@ -2665,8 +2568,5 @@ void ConnectionPrivate::releaseKeyCreationOnFail() { _sessionData->releaseKeyCreationOnFail(); } -void ConnectionPrivate::stop() { -} - } // namespace internal } // namespace MTP diff --git a/Telegram/SourceFiles/mtproto/connection.h b/Telegram/SourceFiles/mtproto/connection.h index fddd75bd7..bd2384b52 100644 --- a/Telegram/SourceFiles/mtproto/connection.h +++ b/Telegram/SourceFiles/mtproto/connection.h @@ -29,72 +29,34 @@ class Instance; namespace internal { class AbstractConnection; -class ConnectionPrivate; class SessionData; class RSAPublicKey; struct ConnectionOptions; -class Connection { +class Connection : public QObject { public: - enum ConnectionType { - TcpConnection, - HttpConnection - }; - - Connection(not_null instance); - ~Connection(); - - void start(std::shared_ptr data, ShiftedDcId shiftedDcId); - - void kill(); - void waitTillFinish(); - - static const int UpdateAlways = 666; - - int32 state() const; - QString transport() const; - -private: - not_null _instance; - std::unique_ptr _thread; - ConnectionPrivate *_private = nullptr; - rpl::lifetime _lifetime; - -}; - -class ConnectionPrivate : public QObject { - Q_OBJECT - -public: - ConnectionPrivate( + Connection( not_null instance, not_null thread, - not_null owner, std::shared_ptr data, ShiftedDcId shiftedDcId); - ~ConnectionPrivate(); + ~Connection(); - void stop(); - - int32 getShiftedDcId() const; + [[nodiscard]] int32 getShiftedDcId() const; void dcOptionsChanged(); + void cdnConfigChanged(); - int32 getState() const; - QString transport() const; - -public slots: - void restartNow(); - - void onPingSendForce(); - - // Sessions signals, when we need to send something - void tryToSend(); + [[nodiscard]] int32 getState() const; + [[nodiscard]] QString transport() const; void updateAuthKey(); - - void onCDNConfigLoaded(); + void restartNow(); + void sendPingForce(); + void tryToSend(); private: + static constexpr auto kUpdateStateAlways = 666; + struct TestConnection { ConnectionPointer data; int priority = 0; @@ -113,7 +75,6 @@ private: void connectingTimedOut(); void doDisconnect(); void restart(); - void finishAndDestroy(); void requestCDNConfig(); void handleError(int errorCode); void onError( @@ -167,7 +128,7 @@ private: void handleMsgsStates(const QVector &ids, const QByteArray &states, QVector &acked); // _sessionDataMutex must be locked for read. - bool setState(int32 state, int32 ifState = Connection::UpdateAlways); + bool setState(int state, int ifState = kUpdateStateAlways); void appendTestConnection( DcOptions::Variants::Protocol protocol, @@ -206,13 +167,12 @@ private: [[nodiscard]] bool realDcTypeChanged(); const not_null _instance; - const not_null _owner; const ShiftedDcId _shiftedDcId = 0; DcType _realDcType = DcType(); DcType _currentDcType = DcType(); mutable QReadWriteLock _stateMutex; - int32 _state = DisconnectedState; + int _state = DisconnectedState; bool _needSessionReset = false; @@ -241,8 +201,6 @@ private: base::Timer _pingSender; base::Timer _checkSentRequestsTimer; - bool _finished = false; - std::shared_ptr _sessionData; std::unique_ptr _connectionOptions; AuthKeyPtr _encryptionKey; diff --git a/Telegram/SourceFiles/mtproto/dc_options.cpp b/Telegram/SourceFiles/mtproto/dc_options.cpp index cbac04e31..34dfd0cbe 100644 --- a/Telegram/SourceFiles/mtproto/dc_options.cpp +++ b/Telegram/SourceFiles/mtproto/dc_options.cpp @@ -68,6 +68,11 @@ public: : _that(that) , _lock(&_that->_useThroughLockers) { } + + void unlock() { + _lock.unlock(); + } + ~WriteLocker() { _that->computeCdnDcIds(); } @@ -84,6 +89,10 @@ public: : _lock(&that->_useThroughLockers) { } + void unlock() { + _lock.unlock(); + } + private: QReadLocker _lock; @@ -517,6 +526,10 @@ rpl::producer DcOptions::changed() const { return _changed.events(); } +rpl::producer<> DcOptions::cdnConfigChanged() const { + return _cdnConfigChanged.events(); +} + std::vector DcOptions::configEnumDcIds() const { auto result = std::vector(); { @@ -553,20 +566,23 @@ DcType DcOptions::dcType(ShiftedDcId shiftedDcId) const { void DcOptions::setCDNConfig(const MTPDcdnConfig &config) { WriteLocker lock(this); _cdnPublicKeys.clear(); - for_const (auto &publicKey, config.vpublic_keys().v) { - Expects(publicKey.type() == mtpc_cdnPublicKey); - const auto &keyData = publicKey.c_cdnPublicKey(); - const auto keyBytes = bytes::make_span(keyData.vpublic_key().v); - auto key = internal::RSAPublicKey(keyBytes); - if (key.valid()) { - _cdnPublicKeys[keyData.vdc_id().v].emplace( - key.fingerprint(), - std::move(key)); - } else { - LOG(("MTP Error: could not read this public RSA key:")); - LOG((qs(keyData.vpublic_key()))); - } + for (const auto &key : config.vpublic_keys().v) { + key.match([&](const MTPDcdnPublicKey &data) { + const auto keyBytes = bytes::make_span(data.vpublic_key().v); + auto key = internal::RSAPublicKey(keyBytes); + if (key.valid()) { + _cdnPublicKeys[data.vdc_id().v].emplace( + key.fingerprint(), + std::move(key)); + } else { + LOG(("MTP Error: could not read this public RSA key:")); + LOG((qs(data.vpublic_key()))); + } + }); } + lock.unlock(); + + _cdnConfigChanged.fire({}); } bool DcOptions::hasCDNKeysForDc(DcId dcId) const { diff --git a/Telegram/SourceFiles/mtproto/dc_options.h b/Telegram/SourceFiles/mtproto/dc_options.h index 4c21e5567..ca0293b9c 100644 --- a/Telegram/SourceFiles/mtproto/dc_options.h +++ b/Telegram/SourceFiles/mtproto/dc_options.h @@ -67,6 +67,7 @@ public: QByteArray serialize() const; [[nodiscard]] rpl::producer changed() const; + [[nodiscard]] rpl::producer<> cdnConfigChanged() const; void setFromList(const MTPVector &options); void addFromList(const MTPVector &options); void addFromOther(DcOptions &&options); @@ -141,6 +142,7 @@ private: mutable QReadWriteLock _useThroughLockers; rpl::event_stream _changed; + rpl::event_stream<> _cdnConfigChanged; // True when we have overriden options from a .tdesktop-endpoints file. bool _immutable = false; diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.cpp b/Telegram/SourceFiles/mtproto/mtp_instance.cpp index 3f4a91e63..3fb10a63b 100644 --- a/Telegram/SourceFiles/mtproto/mtp_instance.cpp +++ b/Telegram/SourceFiles/mtproto/mtp_instance.cpp @@ -71,6 +71,7 @@ public: [[nodiscard]] rpl::producer dcTemporaryKeyChanged() const; [[nodiscard]] AuthKeysList getKeysForWrite() const; void addKeysForDestroy(AuthKeysList &&keys); + [[nodiscard]] rpl::producer<> allKeysDestroyed() const; [[nodiscard]] not_null dcOptions(); @@ -106,10 +107,6 @@ public: void removeDc(ShiftedDcId shiftedDcId); void unpaused(); - void queueQuittingConnection( - std::unique_ptr &&connection); - void connectionFinished(not_null connection); - void sendRequest( mtpRequestId requestId, SerializedRequest &&request, @@ -216,8 +213,6 @@ private: base::flat_map> _sessions; std::vector> _sessionsToDestroy; - std::vector> _connectionsToDestroy; - std::unique_ptr _configLoader; std::unique_ptr _domainResolver; std::unique_ptr _httpUnixtimeLoader; @@ -229,6 +224,8 @@ private: base::flat_map _keysForWrite; base::flat_map _logoutGuestRequestIds; + rpl::event_stream<> _allKeysDestroyed; + // holds dcWithShift for request to this dc or -dc for request to main dc std::map _requestsByDc; mutable QMutex _requestByDcLock; @@ -480,13 +477,10 @@ void Instance::Private::requestCDNConfig() { MTPhelp_GetCdnConfig() ).done([this](const MTPCdnConfig &result) { _cdnConfigLoadRequestId = 0; - - Expects(result.type() == mtpc_cdnConfig); - dcOptions()->setCDNConfig(result.c_cdnConfig()); - + result.match([&](const MTPDcdnConfig &data) { + dcOptions()->setCDNConfig(data); + }); Local::writeSettings(); - - emit _instance->cdnConfigLoaded(); }).send(); } @@ -759,6 +753,10 @@ void Instance::Private::addKeysForDestroy(AuthKeysList &&keys) { } } +rpl::producer<> Instance::Private::allKeysDestroyed() const { + return _allKeysDestroyed.events(); +} + not_null Instance::Private::dcOptions() { return _dcOptions; } @@ -777,22 +775,6 @@ void Instance::Private::unpaused() { } } -void Instance::Private::queueQuittingConnection( - std::unique_ptr &&connection) { - _connectionsToDestroy.push_back(std::move(connection)); -} - -void Instance::Private::connectionFinished( - not_null connection) { - const auto i = ranges::find( - _connectionsToDestroy, - connection.get(), - &std::unique_ptr::get); - if (i != _connectionsToDestroy.end()) { - _connectionsToDestroy.erase(i); - } -} - void Instance::Private::configLoadDone(const MTPConfig &result) { Expects(result.type() == mtpc_config); @@ -1582,7 +1564,7 @@ void Instance::Private::completedKeyDestroy(ShiftedDcId shiftedDcId) { _keysForWrite.erase(shiftedDcId); killSession(shiftedDcId); if (_dcenters.empty()) { - emit _instance->allKeysDestroyed(); + _allKeysDestroyed.fire({}); } } @@ -1674,6 +1656,10 @@ QString Instance::langPackName() const { return Lang::Current().langPackName(); } +rpl::producer<> Instance::allKeysDestroyed() const { + return _private->allKeysDestroyed(); +} + void Instance::requestConfig() { _private->requestConfig(); } @@ -1698,10 +1684,6 @@ void Instance::requestCDNConfig() { _private->requestCDNConfig(); } -void Instance::connectionFinished(not_null connection) { - _private->connectionFinished(connection); -} - void Instance::restart() { _private->restart(); } @@ -1784,11 +1766,6 @@ void Instance::unpaused() { _private->unpaused(); } -void Instance::queueQuittingConnection( - std::unique_ptr &&connection) { - _private->queueQuittingConnection(std::move(connection)); -} - void Instance::setUpdatesHandler(RPCDoneHandlerPtr onDone) { _private->setUpdatesHandler(onDone); } diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.h b/Telegram/SourceFiles/mtproto/mtp_instance.h index f28f635cc..2eb80fb8d 100644 --- a/Telegram/SourceFiles/mtproto/mtp_instance.h +++ b/Telegram/SourceFiles/mtproto/mtp_instance.h @@ -15,7 +15,6 @@ namespace internal { class Dcenter; class Session; -class Connection; [[nodiscard]] int GetNextRequestId(); @@ -61,6 +60,8 @@ public: [[nodiscard]] QString cloudLangCode() const; [[nodiscard]] QString langPackName() const; + [[nodiscard]] rpl::producer<> allKeysDestroyed() const; + // Thread-safe. [[nodiscard]] QString deviceModel() const; [[nodiscard]] QString systemVersion() const; @@ -90,8 +91,6 @@ public: void unpaused(); - void queueQuittingConnection(std::unique_ptr &&connection); - void setUpdatesHandler(RPCDoneHandlerPtr onDone); void setGlobalFailHandler(RPCFailHandlerPtr onFail); void setStateChangedHandler(Fn handler); @@ -126,8 +125,6 @@ public: void syncHttpUnixtime(); - void connectionFinished(not_null connection); - void sendAnything(ShiftedDcId shiftedDcId = 0, crl::time msCanWait = 0); template @@ -199,8 +196,6 @@ public: } signals: - void cdnConfigLoaded(); - void allKeysDestroyed(); void proxyDomainResolved( QString host, QStringList ips, diff --git a/Telegram/SourceFiles/mtproto/session.cpp b/Telegram/SourceFiles/mtproto/session.cpp index 66cf7f31e..ec9e0d58a 100644 --- a/Telegram/SourceFiles/mtproto/session.cpp +++ b/Telegram/SourceFiles/mtproto/session.cpp @@ -148,8 +148,7 @@ Session::Session( not_null instance, ShiftedDcId shiftedDcId, not_null dc) -: QObject() -, _instance(instance) +: _instance(instance) , _shiftedDcId(shiftedDcId) , _dc(dc) , _data(std::make_shared(this)) @@ -157,6 +156,19 @@ Session::Session( _timeouter.callEach(1000); refreshOptions(); watchDcKeyChanges(); + watchDcOptionsChanges(); +} + +Session::~Session() { + Expects(!_connection); + Expects(!_thread); + + if (_myKeyCreation != CreatingKeyType::None) { + releaseKeyCreationOnFail(); + } + for (const auto &thread : _destroyingThreads) { + thread->wait(); + } } void Session::watchDcKeyChanges() { @@ -166,13 +178,59 @@ void Session::watchDcKeyChanges() { }) | rpl::start_with_next([=] { DEBUG_LOG(("AuthKey Info: Session::authKeyCreatedForDC slot, " "emitting authKeyChanged(), dcWithShift %1").arg(_shiftedDcId)); - emit authKeyChanged(); + if (const auto connection = _connection) { + InvokeQueued(connection, [=] { + connection->updateAuthKey(); + }); + } }, _lifetime); } +void Session::watchDcOptionsChanges() { + _instance->dcOptions()->changed( + ) | rpl::filter([=](DcId dcId) { + return (BareDcId(_shiftedDcId) == dcId) && (_connection != nullptr); + }) | rpl::start_with_next([=] { + InvokeQueued(_connection, [connection = _connection] { + connection->dcOptionsChanged(); + }); + }, _lifetime); + + if (_instance->dcOptions()->dcType(_shiftedDcId) == DcType::Cdn) { + _instance->dcOptions()->cdnConfigChanged( + ) | rpl::filter([=] { + return (_connection != nullptr); + }) | rpl::start_with_next([=] { + InvokeQueued(_connection, [connection = _connection] { + connection->cdnConfigChanged(); + }); + }, _lifetime); + } +} + void Session::start() { - _connection = std::make_unique(_instance); - _connection->start(_data, _shiftedDcId); + killConnection(); + + _thread = std::make_unique(); + const auto thread = _thread.get(); + + connect(thread, &QThread::finished, [=] { + InvokeQueued(this, [=] { + const auto i = ranges::find( + _destroyingThreads, + thread, + &std::unique_ptr::get); + if (i != _destroyingThreads.end()) { + _destroyingThreads.erase(i); + } + }); + }); + _connection = new Connection( + _instance, + thread, + _data, + _shiftedDcId); + thread->start(); } bool Session::rpcErrorOccured( @@ -188,7 +246,11 @@ void Session::restart() { return; } refreshOptions(); - emit needToRestart(); + if (const auto connection = _connection) { + InvokeQueued(connection, [=] { + connection->restartNow(); + }); + } } void Session::refreshOptions() { @@ -221,14 +283,11 @@ void Session::reInitConnection() { void Session::stop() { if (_killed) { - DEBUG_LOG(("Session Error: can't kill a killed session")); + DEBUG_LOG(("Session Error: can't stop a killed session")); return; } DEBUG_LOG(("Session Info: stopping session dcWithShift %1").arg(_shiftedDcId)); - if (_connection) { - _connection->kill(); - _instance->queueQuittingConnection(std::move(_connection)); - } + killConnection(); } void Session::kill() { @@ -286,12 +345,15 @@ void Session::needToResumeAndSend() { DEBUG_LOG(("Session Info: resuming session dcWithShift %1").arg(_shiftedDcId)); start(); } - if (_ping) { - _ping = false; - emit needToPing(); - } else { - emit needToSend(); - } + const auto connection = _connection; + const auto ping = base::take(_ping); + InvokeQueued(connection, [=] { + if (ping) { + connection->sendPingForce(); + } else { + connection->tryToSend(); + } + }); } void Session::connectionStateChange(int newState) { @@ -323,7 +385,7 @@ int32 Session::requestState(mtpRequestId requestId) const { bool connected = false; if (_connection) { - int32 s = _connection->state(); + const auto s = _connection->getState(); if (s == ConnectedState) { connected = true; } else if (s == ConnectingState || s == DisconnectedState) { @@ -352,7 +414,7 @@ int32 Session::getState() const { int32 result = -86400000; if (_connection) { - int32 s = _connection->state(); + const auto s = _connection->getState(); if (s == ConnectedState) { return s; } else if (s == ConnectingState || s == DisconnectedState) { @@ -448,7 +510,8 @@ void Session::releaseKeyCreationOnFail() { } void Session::notifyDcConnectionInited() { - DEBUG_LOG(("MTP Info: emitting MTProtoDC::connectionWasInited(), dcWithShift %1").arg(_shiftedDcId)); + DEBUG_LOG(("MTP Info: MTProtoDC::connectionWasInited(), dcWithShift %1" + ).arg(_shiftedDcId)); _dc->setConnectionInited(); } @@ -514,11 +577,19 @@ void Session::tryToReceive() { } } -Session::~Session() { - if (_myKeyCreation != CreatingKeyType::None) { - releaseKeyCreationOnFail(); +void Session::killConnection() { + Expects(!_thread || _connection); + + if (!_connection) { + return; } - Assert(_connection == nullptr); + + base::take(_connection)->deleteLater(); + _destroyingThreads.push_back(base::take(_thread)); + _destroyingThreads.back()->quit(); + + Ensures(_connection == nullptr); + Ensures(_thread == nullptr); } } // namespace internal diff --git a/Telegram/SourceFiles/mtproto/session.h b/Telegram/SourceFiles/mtproto/session.h index ce4ef83a3..6a9ced818 100644 --- a/Telegram/SourceFiles/mtproto/session.h +++ b/Telegram/SourceFiles/mtproto/session.h @@ -56,7 +56,7 @@ struct ConnectionOptions { class Session; class SessionData { public: - SessionData(not_null creator) : _owner(creator) { + explicit SessionData(not_null creator) : _owner(creator) { } void notifyConnectionInited(const ConnectionOptions &options); @@ -92,11 +92,6 @@ public: return _receivedUpdates; } - // Warning! Valid only in constructor, _owner is guaranteed != null. - [[nodiscard]] not_null owner() { - return _owner; - } - // Connection -> Session interface. void queueTryToReceive(); void queueNeedToResumeAndSend(); @@ -139,8 +134,6 @@ private: }; class Session : public QObject { - Q_OBJECT - public: // Main thread. Session( @@ -180,8 +173,8 @@ public: void ping(); void cancel(mtpRequestId requestId, mtpMsgId msgId); - int32 requestState(mtpRequestId requestId) const; - int32 getState() const; + int requestState(mtpRequestId requestId) const; + int getState() const; QString transport() const; void tryToReceive(); @@ -190,23 +183,26 @@ public: void resetDone(); void sendAnything(crl::time msCanWait = 0); -signals: - void authKeyChanged(); - void needToSend(); - void needToPing(); - void needToRestart(); - private: void watchDcKeyChanges(); + void watchDcOptionsChanges(); - bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err); + void killConnection(); + + bool rpcErrorOccured( + mtpRequestId requestId, + const RPCFailHandlerPtr &onFail, + const RPCError &err); const not_null _instance; const ShiftedDcId _shiftedDcId = 0; const not_null _dc; const std::shared_ptr _data; - std::unique_ptr _connection; + std::unique_ptr _thread; + std::vector> _destroyingThreads; + + Connection *_connection = nullptr; bool _killed = false; bool _needToReceive = false;