Resend non-authorization requests on main DC change.

This commit is contained in:
John Preston 2021-08-30 20:32:30 +03:00
parent ae40ea9336
commit 2d0bcf7dca
7 changed files with 56 additions and 39 deletions

View file

@ -153,15 +153,11 @@ Manager::Manager(not_null<Main::Domain*> domain)
if (!account) { if (!account) {
_api.reset(); _api.reset();
} }
return rpl::combine( return account
account ? account->mtpMainSessionValue()
? account->appConfig().refreshed() | rpl::map_to(true) : rpl::never<not_null<MTP::Instance*>>();
: rpl::never<>() | rpl::map_to(false),
account
? account->mtpValue()
: rpl::never<not_null<MTP::Instance*>>());
}) | rpl::flatten_latest( }) | rpl::flatten_latest(
) | rpl::start_with_next([=](bool, not_null<MTP::Instance*> instance) { ) | rpl::start_with_next([=](not_null<MTP::Instance*> instance) {
_api.emplace(instance); _api.emplace(instance);
request(); request();
}, _lifetime); }, _lifetime);
@ -193,7 +189,7 @@ void Manager::write() const {
} }
void Manager::request() { void Manager::request() {
Expects(_api); Expects(_api.has_value());
const auto convertMTP = [](const auto &vector, bool force = false) { const auto convertMTP = [](const auto &vector, bool force = false) {
if (!vector) { if (!vector) {
@ -202,7 +198,7 @@ void Manager::request() {
return ranges::views::all( return ranges::views::all(
vector->v vector->v
) | ranges::views::transform([](const MTPstring &s) -> QString { ) | ranges::views::transform([](const MTPstring &s) -> QString {
return s.v; return qs(s);
}) | ranges::to_vector; }) | ranges::to_vector;
}; };

View file

@ -164,7 +164,7 @@ CloudManager::CloudManager(Instance &langpack)
_api.reset(); _api.reset();
} }
return account return account
? account->mtpValue() ? account->mtpMainSessionValue()
: rpl::never<not_null<MTP::Instance*>>(); : rpl::never<not_null<MTP::Instance*>>();
}) | rpl::flatten_latest( }) | rpl::flatten_latest(
) | rpl::start_with_next([=](not_null<MTP::Instance*> instance) { ) | rpl::start_with_next([=](not_null<MTP::Instance*> instance) {

View file

@ -241,6 +241,12 @@ rpl::producer<not_null<MTP::Instance*>> Account::mtpValue() const {
}); });
} }
rpl::producer<not_null<MTP::Instance*>> Account::mtpMainSessionValue() const {
return mtpValue() | rpl::map([=](not_null<MTP::Instance*> instance) {
return instance->mainDcIdValue() | rpl::map_to(instance);
}) | rpl::flatten_latest();
}
rpl::producer<MTPUpdates> Account::mtpUpdates() const { rpl::producer<MTPUpdates> Account::mtpUpdates() const {
return _mtpUpdates.events(); return _mtpUpdates.events();
} }

View file

@ -83,6 +83,13 @@ public:
} }
[[nodiscard]] rpl::producer<not_null<MTP::Instance*>> mtpValue() const; [[nodiscard]] rpl::producer<not_null<MTP::Instance*>> mtpValue() const;
// Each time the main session changes a new copy of the pointer is fired.
// This allows to resend the requests that were not requiring auth, and
// which could be forgotten without calling .done() or .fail() because
// of the main dc changing.
[[nodiscard]] auto mtpMainSessionValue() const
-> rpl::producer<not_null<MTP::Instance*>>;
// Set from legacy storage. // Set from legacy storage.
void setLegacyMtpKey(std::shared_ptr<MTP::AuthKey> key); void setLegacyMtpKey(std::shared_ptr<MTP::AuthKey> key);

View file

@ -19,7 +19,7 @@ constexpr auto kRefreshTimeout = 3600 * crl::time(1000);
} // namespace } // namespace
AppConfig::AppConfig(not_null<Account*> account) : _account(account) { AppConfig::AppConfig(not_null<Account*> account) : _account(account) {
account->mtpValue( account->mtpMainSessionValue(
) | rpl::start_with_next([=](not_null<MTP::Instance*> instance) { ) | rpl::start_with_next([=](not_null<MTP::Instance*> instance) {
_api.emplace(instance); _api.emplace(instance);
refresh(); refresh();

View file

@ -72,6 +72,7 @@ public:
void suggestMainDcId(DcId mainDcId); void suggestMainDcId(DcId mainDcId);
void setMainDcId(DcId mainDcId); void setMainDcId(DcId mainDcId);
[[nodiscard]] DcId mainDcId() const; [[nodiscard]] DcId mainDcId() const;
[[nodiscard]] rpl::producer<DcId> mainDcIdValue() const;
[[nodiscard]] rpl::producer<> writeKeysRequests() const; [[nodiscard]] rpl::producer<> writeKeysRequests() const;
@ -191,7 +192,7 @@ private:
Session *findSession(ShiftedDcId shiftedDcId); Session *findSession(ShiftedDcId shiftedDcId);
not_null<Session*> startSession(ShiftedDcId shiftedDcId); not_null<Session*> startSession(ShiftedDcId shiftedDcId);
Session *removeSession(ShiftedDcId shiftedDcId); void scheduleSessionDestroy(ShiftedDcId shiftedDcId);
[[nodiscard]] not_null<QThread*> getThreadForDc(ShiftedDcId shiftedDcId); [[nodiscard]] not_null<QThread*> getThreadForDc(ShiftedDcId shiftedDcId);
void applyDomainIps( void applyDomainIps(
@ -225,7 +226,7 @@ private:
QString _deviceModel; QString _deviceModel;
QString _systemVersion; QString _systemVersion;
DcId _mainDcId = Fields::kDefaultMainDc; rpl::variable<DcId> _mainDcId = Fields::kDefaultMainDc;
bool _mainDcIdForced = false; bool _mainDcIdForced = false;
base::flat_map<DcId, std::unique_ptr<Dcenter>> _dcenters; base::flat_map<DcId, std::unique_ptr<Dcenter>> _dcenters;
std::vector<std::unique_ptr<Dcenter>> _dcentersToDestroy; std::vector<std::unique_ptr<Dcenter>> _dcentersToDestroy;
@ -356,13 +357,13 @@ void Instance::Private::start() {
for (const auto &[shiftedDcId, dc] : _dcenters) { for (const auto &[shiftedDcId, dc] : _dcenters) {
startSession(shiftedDcId); startSession(shiftedDcId);
} }
} else if (_mainDcId != Fields::kNoneMainDc) { } else if (mainDcId() != Fields::kNoneMainDc) {
_mainSession = startSession(_mainDcId); _mainSession = startSession(mainDcId());
} }
_checkDelayedTimer.setCallback([this] { checkDelayedRequests(); }); _checkDelayedTimer.setCallback([this] { checkDelayedRequests(); });
Assert((_mainDcId == Fields::kNoneMainDc) == isKeysDestroyer()); Assert((mainDcId() == Fields::kNoneMainDc) == isKeysDestroyer());
requestConfig(); requestConfig();
} }
@ -463,18 +464,26 @@ void Instance::Private::setMainDcId(DcId mainDcId) {
} }
_mainDcIdForced = true; _mainDcIdForced = true;
auto oldMainDcId = _mainSession->getDcWithShift(); const auto oldMainDcId = _mainSession->getDcWithShift();
_mainDcId = mainDcId; if (oldMainDcId != mainDcId) {
if (oldMainDcId != _mainDcId) { scheduleSessionDestroy(oldMainDcId);
killSession(oldMainDcId); scheduleSessionDestroy(mainDcId);
_mainSession = startSession(mainDcId);
} }
_mainDcId = mainDcId;
_writeKeysRequests.fire({}); _writeKeysRequests.fire({});
} }
DcId Instance::Private::mainDcId() const { DcId Instance::Private::mainDcId() const {
Expects(_mainDcId != Fields::kNoneMainDc); Expects(_mainDcId.current() != Fields::kNoneMainDc);
return _mainDcId; return _mainDcId.current();
}
rpl::producer<DcId> Instance::Private::mainDcIdValue() const {
Expects(_mainDcId.current() != Fields::kNoneMainDc);
return _mainDcId.value();
} }
void Instance::Private::requestConfig() { void Instance::Private::requestConfig() {
@ -548,7 +557,7 @@ void Instance::Private::requestConfigIfExpired() {
} }
void Instance::Private::requestCDNConfig() { void Instance::Private::requestCDNConfig() {
if (_cdnConfigLoadRequestId || _mainDcId == Fields::kNoneMainDc) { if (_cdnConfigLoadRequestId || mainDcId() == Fields::kNoneMainDc) {
return; return;
} }
_cdnConfigLoadRequestId = request( _cdnConfigLoadRequestId = request(
@ -652,19 +661,11 @@ int32 Instance::Private::state(mtpRequestId requestId) {
} }
void Instance::Private::killSession(ShiftedDcId shiftedDcId) { void Instance::Private::killSession(ShiftedDcId shiftedDcId) {
const auto checkIfMainAndKill = [&](ShiftedDcId shiftedDcId) { const auto i = _sessions.find(shiftedDcId);
if (const auto removed = removeSession(shiftedDcId)) { if (i != _sessions.cend()) {
return (removed == _mainSession); Assert(i->second.get() != _mainSession);
}
return false;
};
if (checkIfMainAndKill(shiftedDcId)) {
checkIfMainAndKill(_mainDcId);
_mainSession = startSession(_mainDcId);
} }
InvokeQueued(_instance, [=] { scheduleSessionDestroy(shiftedDcId);
_sessionsToDestroy.clear();
});
} }
void Instance::Private::stopSession(ShiftedDcId shiftedDcId) { void Instance::Private::stopSession(ShiftedDcId shiftedDcId) {
@ -1563,15 +1564,17 @@ not_null<Session*> Instance::Private::startSession(ShiftedDcId shiftedDcId) {
return result; return result;
} }
Session *Instance::Private::removeSession(ShiftedDcId shiftedDcId) { void Instance::Private::scheduleSessionDestroy(ShiftedDcId shiftedDcId) {
const auto i = _sessions.find(shiftedDcId); const auto i = _sessions.find(shiftedDcId);
if (i == _sessions.cend()) { if (i == _sessions.cend()) {
return nullptr; return;
} }
i->second->kill(); i->second->kill();
_sessionsToDestroy.push_back(std::move(i->second)); _sessionsToDestroy.push_back(std::move(i->second));
_sessions.erase(i); _sessions.erase(i);
return _sessionsToDestroy.back().get(); InvokeQueued(_instance, [=] {
_sessionsToDestroy.clear();
});
} }
@ -1790,6 +1793,10 @@ DcId Instance::mainDcId() const {
return _private->mainDcId(); return _private->mainDcId();
} }
rpl::producer<DcId> Instance::mainDcIdValue() const {
return _private->mainDcIdValue();
}
QString Instance::systemLangCode() const { QString Instance::systemLangCode() const {
return Lang::GetInstance().systemLangCode(); return Lang::GetInstance().systemLangCode();
} }

View file

@ -65,6 +65,7 @@ public:
void suggestMainDcId(DcId mainDcId); void suggestMainDcId(DcId mainDcId);
void setMainDcId(DcId mainDcId); void setMainDcId(DcId mainDcId);
[[nodiscard]] DcId mainDcId() const; [[nodiscard]] DcId mainDcId() const;
[[nodiscard]] rpl::producer<DcId> mainDcIdValue() const;
[[nodiscard]] QString systemLangCode() const; [[nodiscard]] QString systemLangCode() const;
[[nodiscard]] QString cloudLangCode() const; [[nodiscard]] QString cloudLangCode() const;
[[nodiscard]] QString langPackName() const; [[nodiscard]] QString langPackName() const;