diff --git a/Telegram/SourceFiles/api/api_statistics.cpp b/Telegram/SourceFiles/api/api_statistics.cpp index 12c7a313e..09b5c5c06 100644 --- a/Telegram/SourceFiles/api/api_statistics.cpp +++ b/Telegram/SourceFiles/api/api_statistics.cpp @@ -18,6 +18,8 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL namespace Api { namespace { +constexpr auto kCheckRequestsTimer = 10 * crl::time(1000); + [[nodiscard]] Data::StatisticalGraph StatisticalGraphFromTL( const MTPStatsGraph &tl) { return tl.match([&](const MTPDstatsGraph &d) { @@ -188,36 +190,91 @@ namespace { } // namespace Statistics::Statistics(not_null channel) +: StatisticsRequestSender(channel) { +} + +StatisticsRequestSender::StatisticsRequestSender(not_null channel) : _channel(channel) -, _api(&channel->session().api().instance()) { +, _api(&_channel->session().api().instance()) +, _timer([=] { checkRequests(); }) { +} + +StatisticsRequestSender::~StatisticsRequestSender() { + for (const auto &[dcId, ids] : _requests) { + for (const auto id : ids) { + _channel->session().api().unregisterStatsRequest(dcId, id); + } + } +} + +void StatisticsRequestSender::checkRequests() { + const auto api = &_channel->session().api(); + for (auto i = begin(_requests); i != end(_requests);) { + for (auto j = begin(i->second); j != end(i->second);) { + if (_api.pending(*j)) { + ++j; + } else { + _channel->session().api().unregisterStatsRequest( + i->first, + *j); + j = i->second.erase(j); + } + } + if (i->second.empty()) { + i = _requests.erase(i); + } else { + ++i; + } + } + if (_requests.empty()) { + _timer.cancel(); + } +} + +template +auto StatisticsRequestSender::makeRequest(Request &&request) { + const auto id = _api.allocateRequestId(); + const auto dcId = _channel->owner().statsDcId(_channel); + if (dcId) { + _channel->session().api().registerStatsRequest(dcId, id); + _requests[dcId].emplace(id); + if (!_timer.isActive()) { + _timer.callEach(kCheckRequestsTimer); + } + } + return std::move(_api.request( + std::forward(request) + ).toDC( + dcId ? MTP::ShiftDcId(dcId, MTP::kStatsDcShift) : 0 + ).overrideId(id)); } rpl::producer Statistics::request() { return [=](auto consumer) { auto lifetime = rpl::lifetime(); - const auto dcId = _channel->owner().statsDcId(_channel); - if (!_channel->isMegagroup()) { - _api.request(MTPstats_GetBroadcastStats( + if (!channel()->isMegagroup()) { + makeRequest(MTPstats_GetBroadcastStats( MTP_flags(MTPstats_GetBroadcastStats::Flags(0)), - _channel->inputChannel + channel()->inputChannel )).done([=](const MTPstats_BroadcastStats &result) { _channelStats = ChannelStatisticsFromTL(result.data()); consumer.put_done(); }).fail([=](const MTP::Error &error) { consumer.put_error_copy(error.type()); - }).toDC(MTP::ShiftDcId(dcId, MTP::kStatsDcShift)).send(); + }).send(); } else { - _api.request(MTPstats_GetMegagroupStats( + makeRequest(MTPstats_GetMegagroupStats( MTP_flags(MTPstats_GetMegagroupStats::Flags(0)), - _channel->inputChannel + channel()->inputChannel )).done([=](const MTPstats_MegagroupStats &result) { - _supergroupStats = SupergroupStatisticsFromTL(result.data()); - _channel->owner().processUsers(result.data().vusers()); + const auto &data = result.data(); + _supergroupStats = SupergroupStatisticsFromTL(data); + channel()->owner().processUsers(data.vusers()); consumer.put_done(); }).fail([=](const MTP::Error &error) { consumer.put_error_copy(error.type()); - }).toDC(MTP::ShiftDcId(dcId, MTP::kStatsDcShift)).send(); + }).send(); } return lifetime; @@ -229,10 +286,9 @@ Statistics::GraphResult Statistics::requestZoom( float64 x) { return [=](auto consumer) { auto lifetime = rpl::lifetime(); - const auto dcId = _channel->owner().statsDcId(_channel); const auto wasEmpty = _zoomDeque.empty(); _zoomDeque.push_back([=] { - _api.request(MTPstats_LoadAsyncGraph( + makeRequest(MTPstats_LoadAsyncGraph( MTP_flags(x ? MTPstats_LoadAsyncGraph::Flag::f_x : MTPstats_LoadAsyncGraph::Flag(0)), @@ -249,7 +305,7 @@ Statistics::GraphResult Statistics::requestZoom( } }).fail([=](const MTP::Error &error) { consumer.put_error_copy(error.type()); - }).toDC(MTP::ShiftDcId(dcId, MTP::kStatsDcShift)).send(); + }).send(); }); if (wasEmpty) { _zoomDeque.front()(); @@ -270,9 +326,8 @@ Data::SupergroupStatistics Statistics::supergroupStats() const { PublicForwards::PublicForwards( not_null channel, FullMsgId fullId) -: _channel(channel) -, _fullId(fullId) -, _api(&channel->session().api().instance()) { +: StatisticsRequestSender(channel) +, _fullId(fullId) { } void PublicForwards::request( @@ -281,20 +336,19 @@ void PublicForwards::request( if (_requestId) { return; } - const auto dcId = _channel->owner().statsDcId(_channel); - const auto offsetPeer = _channel->owner().peer(token.fullId.peer); + const auto offsetPeer = channel()->owner().peer(token.fullId.peer); const auto tlOffsetPeer = offsetPeer ? offsetPeer->input : MTP_inputPeerEmpty(); constexpr auto kLimit = tl::make_int(100); - _requestId = _api.request(MTPstats_GetMessagePublicForwards( - _channel->inputChannel, + _requestId = makeRequest(MTPstats_GetMessagePublicForwards( + channel()->inputChannel, MTP_int(_fullId.msg), MTP_int(token.rate), tlOffsetPeer, MTP_int(token.fullId.msg), kLimit - )).done([=, channel = _channel](const MTPmessages_Messages &result) { + )).done([=, channel = channel()](const MTPmessages_Messages &result) { using Messages = QVector; _requestId = 0; @@ -364,16 +418,15 @@ void PublicForwards::request( }); }).fail([=] { _requestId = 0; - }).toDC(MTP::ShiftDcId(dcId, MTP::kStatsDcShift)).send(); + }).send(); } MessageStatistics::MessageStatistics( not_null channel, FullMsgId fullId) -: _publicForwards(channel, fullId) -, _channel(channel) -, _fullId(fullId) -, _api(&channel->session().api().instance()) { +: StatisticsRequestSender(channel) +, _publicForwards(channel, fullId) +, _fullId(fullId) { } Data::PublicForwardsSlice MessageStatistics::firstSlice() const { @@ -381,11 +434,9 @@ Data::PublicForwardsSlice MessageStatistics::firstSlice() const { } void MessageStatistics::request(Fn done) { - if (_channel->isMegagroup()) { + if (channel()->isMegagroup()) { return; } - const auto dcId = _channel->owner().statsDcId(_channel); - const auto requestFirstPublicForwards = [=]( const Data::StatisticalGraph &messageGraph, const Data::StatisticsMessageInteractionInfo &info) { @@ -403,8 +454,8 @@ void MessageStatistics::request(Fn done) { const auto requestPrivateForwards = [=]( const Data::StatisticalGraph &messageGraph) { - _api.request(MTPchannels_GetMessages( - _channel->inputChannel, + api().request(MTPchannels_GetMessages( + channel()->inputChannel, MTP_vector( 1, MTP_inputMessageID(MTP_int(_fullId.msg)))) @@ -444,17 +495,16 @@ void MessageStatistics::request(Fn done) { }).send(); }; - _api.request(MTPstats_GetMessageStats( + makeRequest(MTPstats_GetMessageStats( MTP_flags(MTPstats_GetMessageStats::Flags(0)), - _channel->inputChannel, + channel()->inputChannel, MTP_int(_fullId.msg.bare) )).done([=](const MTPstats_MessageStats &result) { requestPrivateForwards( StatisticalGraphFromTL(result.data().vviews_graph())); }).fail([=](const MTP::Error &error) { requestPrivateForwards({}); - }).toDC(MTP::ShiftDcId(dcId, MTP::kStatsDcShift)).send(); - + }).send(); } Boosts::Boosts(not_null peer) diff --git a/Telegram/SourceFiles/api/api_statistics.h b/Telegram/SourceFiles/api/api_statistics.h index 14ec1ccf8..dddf3654c 100644 --- a/Telegram/SourceFiles/api/api_statistics.h +++ b/Telegram/SourceFiles/api/api_statistics.h @@ -7,6 +7,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL */ #pragma once +#include "base/timer.h" #include "data/data_boosts.h" #include "data/data_statistics.h" #include "mtproto/sender.h" @@ -16,7 +17,35 @@ class PeerData; namespace Api { -class Statistics final { +class StatisticsRequestSender { +protected: + explicit StatisticsRequestSender(not_null channel); + ~StatisticsRequestSender(); + + template < + typename Request, + typename = std::enable_if_t>, + typename = typename Request::Unboxed> + [[nodiscard]] auto makeRequest(Request &&request); + + [[nodiscard]] MTP::Sender &api() { + return _api; + } + [[nodiscard]] not_null channel() { + return _channel; + } + +private: + void checkRequests(); + + const not_null _channel; + MTP::Sender _api; + base::Timer _timer; + base::flat_map> _requests; + +}; + +class Statistics final : public StatisticsRequestSender { public: explicit Statistics(not_null channel); @@ -30,34 +59,29 @@ public: [[nodiscard]] Data::SupergroupStatistics supergroupStats() const; private: - const not_null _channel; Data::ChannelStatistics _channelStats; Data::SupergroupStatistics _supergroupStats; - MTP::Sender _api; std::deque> _zoomDeque; }; -class PublicForwards final { +class PublicForwards final : public StatisticsRequestSender { public: - explicit PublicForwards(not_null channel, FullMsgId fullId); + PublicForwards(not_null channel, FullMsgId fullId); void request( const Data::PublicForwardsSlice::OffsetToken &token, Fn done); private: - const not_null _channel; const FullMsgId _fullId; mtpRequestId _requestId = 0; int _lastTotal = 0; - MTP::Sender _api; - }; -class MessageStatistics final { +class MessageStatistics final : public StatisticsRequestSender { public: explicit MessageStatistics( not_null channel, @@ -69,13 +93,11 @@ public: private: PublicForwards _publicForwards; - const not_null _channel; const FullMsgId _fullId; Data::PublicForwardsSlice _firstSlice; mtpRequestId _requestId = 0; - MTP::Sender _api; }; diff --git a/Telegram/SourceFiles/apiwrap.cpp b/Telegram/SourceFiles/apiwrap.cpp index c3024e991..ea150bf9b 100644 --- a/Telegram/SourceFiles/apiwrap.cpp +++ b/Telegram/SourceFiles/apiwrap.cpp @@ -114,6 +114,7 @@ constexpr auto kStickersByEmojiInvalidateTimeout = crl::time(6 * 1000); constexpr auto kNotifySettingSaveTimeout = crl::time(1000); constexpr auto kDialogsFirstLoad = 20; constexpr auto kDialogsPerPage = 500; +constexpr auto kStatsSessionKillTimeout = 10 * crl::time(1000); using PhotoFileLocationId = Data::PhotoFileLocationId; using DocumentFileLocationId = Data::DocumentFileLocationId; @@ -159,6 +160,7 @@ ApiWrap::ApiWrap(not_null session) , _fileLoader(std::make_unique(kFileLoaderQueueStopTimeout)) , _topPromotionTimer([=] { refreshTopPromotion(); }) , _updateNotifyTimer([=] { sendNotifySettingsUpdates(); }) +, _statsSessionKillTimer([=] { checkStatsSessions(); }) , _authorizations(std::make_unique(this)) , _attachedStickers(std::make_unique(this)) , _blockedPeers(std::make_unique(this)) @@ -4287,6 +4289,32 @@ void ApiWrap::saveSelfBio(const QString &text) { }).send(); } +void ApiWrap::registerStatsRequest(MTP::DcId dcId, mtpRequestId id) { + _statsRequests[dcId].emplace(id); +} + +void ApiWrap::unregisterStatsRequest(MTP::DcId dcId, mtpRequestId id) { + const auto i = _statsRequests.find(dcId); + Assert(i != end(_statsRequests)); + const auto removed = i->second.remove(id); + Assert(removed); + if (i->second.empty()) { + _statsSessionKillTimer.callOnce(kStatsSessionKillTimeout); + } +} + +void ApiWrap::checkStatsSessions() { + for (auto i = begin(_statsRequests); i != end(_statsRequests);) { + if (i->second.empty()) { + instance().killSession( + MTP::ShiftDcId(i->first, MTP::kStatsDcShift)); + i = _statsRequests.erase(i); + } else { + ++i; + } + } +} + Api::Authorizations &ApiWrap::authorizations() { return *_authorizations; } diff --git a/Telegram/SourceFiles/apiwrap.h b/Telegram/SourceFiles/apiwrap.h index f61eb0957..ecb0f968a 100644 --- a/Telegram/SourceFiles/apiwrap.h +++ b/Telegram/SourceFiles/apiwrap.h @@ -369,6 +369,9 @@ public: void saveSelfBio(const QString &text); + void registerStatsRequest(MTP::DcId dcId, mtpRequestId id); + void unregisterStatsRequest(MTP::DcId dcId, mtpRequestId id); + [[nodiscard]] Api::Authorizations &authorizations(); [[nodiscard]] Api::AttachedStickers &attachedStickers(); [[nodiscard]] Api::BlockedPeers &blockedPeers(); @@ -547,6 +550,8 @@ private: not_null channel); void migrateFail(not_null peer, const QString &error); + void checkStatsSessions(); + const not_null _session; base::flat_map _modifyRequests; @@ -683,6 +688,9 @@ private: QString requestedText; } _bio; + base::flat_map> _statsRequests; + base::Timer _statsSessionKillTimer; + const std::unique_ptr _authorizations; const std::unique_ptr _attachedStickers; const std::unique_ptr _blockedPeers; diff --git a/Telegram/SourceFiles/data/data_session.cpp b/Telegram/SourceFiles/data/data_session.cpp index bfe8019de..bc124d71f 100644 --- a/Telegram/SourceFiles/data/data_session.cpp +++ b/Telegram/SourceFiles/data/data_session.cpp @@ -4537,8 +4537,12 @@ MTP::DcId Session::statsDcId(not_null channel) { return (it == end(_channelStatsDcIds)) ? MTP::DcId(0) : it->second; } -void Session::applyStatsDcId(not_null channel, MTP::DcId dcId) { - _channelStatsDcIds[channel] = dcId; +void Session::applyStatsDcId( + not_null channel, + MTP::DcId dcId) { + if (dcId != channel->session().mainDcId()) { + _channelStatsDcIds[channel] = dcId; + } } void Session::webViewResultSent(WebViewResultSent &&sent) { diff --git a/Telegram/SourceFiles/mtproto/core_types.h b/Telegram/SourceFiles/mtproto/core_types.h index fba98aeee..75287ceba 100644 --- a/Telegram/SourceFiles/mtproto/core_types.h +++ b/Telegram/SourceFiles/mtproto/core_types.h @@ -47,7 +47,7 @@ constexpr auto kUpdaterDcShift = 0x03; constexpr auto kExportDcShift = 0x04; constexpr auto kExportMediaDcShift = 0x05; constexpr auto kGroupCallStreamDcShift = 0x06; -constexpr auto kStatsDcShift = 0x06; +constexpr auto kStatsDcShift = 0x07; constexpr auto kMaxMediaDcCount = 0x10; constexpr auto kBaseDownloadDcShift = 0x10; constexpr auto kBaseUploadDcShift = 0x20; diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.h b/Telegram/SourceFiles/mtproto/mtp_instance.h index 2a3bc3d4e..7dc5be49b 100644 --- a/Telegram/SourceFiles/mtproto/mtp_instance.h +++ b/Telegram/SourceFiles/mtproto/mtp_instance.h @@ -150,8 +150,11 @@ public: ResponseHandler &&callbacks = {}, ShiftedDcId shiftedDcId = 0, crl::time msCanWait = 0, - mtpRequestId afterRequestId = 0) { - const auto requestId = details::GetNextRequestId(); + mtpRequestId afterRequestId = 0, + mtpRequestId overrideRequestId = 0) { + const auto requestId = overrideRequestId + ? overrideRequestId + : details::GetNextRequestId(); sendSerialized( requestId, details::SerializedRequest::Serialize(request), @@ -169,13 +172,15 @@ public: FailHandler &&onFail = nullptr, ShiftedDcId shiftedDcId = 0, crl::time msCanWait = 0, - mtpRequestId afterRequestId = 0) { + mtpRequestId afterRequestId = 0, + mtpRequestId overrideRequestId = 0) { return send( request, ResponseHandler{ std::move(onDone), std::move(onFail) }, shiftedDcId, msCanWait, - afterRequestId); + afterRequestId, + overrideRequestId); } template diff --git a/Telegram/SourceFiles/mtproto/sender.h b/Telegram/SourceFiles/mtproto/sender.h index 632f817f4..5330a0559 100644 --- a/Telegram/SourceFiles/mtproto/sender.h +++ b/Telegram/SourceFiles/mtproto/sender.h @@ -130,6 +130,9 @@ class Sender { void setToDC(ShiftedDcId dcId) noexcept { _dcId = dcId; } + void setOverrideRequestId(mtpRequestId id) noexcept { + _overrideRequestId = id; + } void setCanWait(crl::time ms) noexcept { _canWait = ms; } @@ -147,16 +150,16 @@ class Sender { _afterRequestId = requestId; } - ShiftedDcId takeDcId() const noexcept { + [[nodiscard]] ShiftedDcId takeDcId() const noexcept { return _dcId; } - crl::time takeCanWait() const noexcept { + [[nodiscard]] crl::time takeCanWait() const noexcept { return _canWait; } - DoneHandler takeOnDone() noexcept { + [[nodiscard]] DoneHandler takeOnDone() noexcept { return std::move(_done); } - FailHandler takeOnFail() { + [[nodiscard]] FailHandler takeOnFail() { return v::match(_fail, [&](auto &value) { return MakeFailHandler( _sender, @@ -164,11 +167,14 @@ class Sender { _failSkipPolicy); }); } - mtpRequestId takeAfter() const noexcept { + [[nodiscard]] mtpRequestId takeAfter() const noexcept { return _afterRequestId; } + [[nodiscard]] mtpRequestId takeOverrideRequestId() const noexcept { + return _overrideRequestId; + } - not_null sender() const noexcept { + [[nodiscard]] not_null sender() const noexcept { return _sender; } void registerRequest(mtpRequestId requestId) { @@ -187,6 +193,7 @@ class Sender { FailFullHandler> _fail; FailSkipPolicy _failSkipPolicy = FailSkipPolicy::Simple; mtpRequestId _afterRequestId = 0; + mtpRequestId _overrideRequestId = 0; }; @@ -207,9 +214,10 @@ public: : RequestBuilder(sender) , _request(std::move(request)) { } - SpecificRequestBuilder(SpecificRequestBuilder &&other) = default; public: + SpecificRequestBuilder(SpecificRequestBuilder &&other) = default; + [[nodiscard]] SpecificRequestBuilder &toDC(ShiftedDcId dcId) noexcept { setToDC(dcId); return *this; @@ -218,6 +226,10 @@ public: setCanWait(ms); return *this; } + [[nodiscard]] SpecificRequestBuilder &overrideId(mtpRequestId id) noexcept { + setOverrideRequestId(id); + return *this; + } using Result = typename Request::ResponseType; [[nodiscard]] SpecificRequestBuilder &done( @@ -295,7 +307,8 @@ public: takeOnFail(), takeDcId(), takeCanWait(), - takeAfter()); + takeAfter(), + takeOverrideRequestId()); registerRequest(id); return id; } @@ -347,6 +360,13 @@ public: } } + [[nodiscard]] mtpRequestId allocateRequestId() noexcept { + return details::GetNextRequestId(); + } + [[nodiscard]] bool pending(mtpRequestId requestId) noexcept { + return _requests.contains(requestId); + } + private: class RequestWrap { public: