From 33a15e69bb3bbe083a082d073a9d8ca1391f20e4 Mon Sep 17 00:00:00 2001 From: John Preston Date: Wed, 26 Mar 2025 11:24:27 +0400 Subject: [PATCH] Move blocks order checking to tde2e_api module. --- .../calls/group/calls_group_call.cpp | 52 ++----- .../calls/group/calls_group_call.h | 5 +- Telegram/SourceFiles/tde2e/tde2e_api.cpp | 144 +++++++++++++++++- Telegram/SourceFiles/tde2e/tde2e_api.h | 49 +++++- 4 files changed, 198 insertions(+), 52 deletions(-) diff --git a/Telegram/SourceFiles/calls/group/calls_group_call.cpp b/Telegram/SourceFiles/calls/group/calls_group_call.cpp index f086e5416c..ce0fddd314 100644 --- a/Telegram/SourceFiles/calls/group/calls_group_call.cpp +++ b/Telegram/SourceFiles/calls/group/calls_group_call.cpp @@ -54,10 +54,7 @@ constexpr auto kFixManualLargeVideoDuration = 5 * crl::time(1000); constexpr auto kFixSpeakingLargeVideoDuration = 3 * crl::time(1000); constexpr auto kFullAsMediumsCount = 4; // 1 Full is like 4 Mediums. constexpr auto kMaxMediumQualities = 16; // 4 Fulls or 16 Mediums. -constexpr auto kShortPollChainBlocksTimeout = 5 * crl::time(1000); constexpr auto kShortPollChainBlocksPerRequest = 50; -constexpr auto kSubChain0 = 0; -constexpr auto kSubChain1 = 1; [[nodiscard]] const Data::GroupCallParticipant *LookupParticipant( not_null peer, @@ -651,11 +648,10 @@ GroupCall::GroupCall( _e2e = std::make_shared( TdE2E::MakeUserId(_peer->session().user())); } - for (auto i = 0; i != kSubChainsCount; ++i) { - _subchains[i].timer.setCallback([=] { - checkChainBlocksRequest(i); - }); - } + _e2e->subchainRequests( + ) | rpl::start_with_next([=](TdE2E::Call::SubchainRequest request) { + requestSubchainBlocks(request.subchain, request.height); + }, _lifetime); } _muted.value( @@ -1478,8 +1474,8 @@ void GroupCall::rejoin(not_null as) { real->reloadIfStale(); } } - checkChainBlocksRequest(kSubChain0); - checkChainBlocksRequest(kSubChain1); + requestSubchainBlocks(0, 0); + requestSubchainBlocks(1, 0); }).fail([=](const MTP::Error &error) { _joinState.finish(); @@ -1500,37 +1496,25 @@ void GroupCall::rejoin(not_null as) { }); } -void GroupCall::checkChainBlocksRequest(int subchain) { +void GroupCall::requestSubchainBlocks(int subchain, int height) { Expects(subchain >= 0 && subchain < kSubChainsCount); auto &state = _subchains[subchain]; - if (state.requestId) { - return; - } - const auto now = crl::now(); - const auto left = state.lastUpdate + kShortPollChainBlocksTimeout - now; - if (left > 0) { - if (!state.timer.isActive()) { - state.timer.callOnce(left); - } - return; - } + _api.request(base::take(state.requestId)).cancel(); state.requestId = _api.request(MTPphone_GetGroupCallChainBlocks( inputCall(), MTP_int(subchain), - MTP_int(state.height), + MTP_int(height), MTP_int(kShortPollChainBlocksPerRequest) )).done([=](const MTPUpdates &result) { auto &state = _subchains[subchain]; - state.lastUpdate = crl::now(); - state.requestId = 0; _peer->session().api().applyUpdates(result); - state.timer.callOnce(kShortPollChainBlocksTimeout + 1); + state.requestId = 0; + _e2e->subchainBlocksRequestFinished(subchain); }).fail([=](const MTP::Error &error) { auto &state = _subchains[subchain]; - state.lastUpdate = crl::now(); state.requestId = 0; - state.timer.callOnce(kShortPollChainBlocksTimeout + 1); + _e2e->subchainBlocksRequestFinished(subchain); }).send(); } @@ -2123,15 +2107,11 @@ void GroupCall::handleUpdate(const MTPDupdateGroupCallChainBlocks &data) { return; } auto &entry = _subchains[subchain]; - entry.lastUpdate = crl::now(); - entry.height = data.vnext_offset().v; - entry.timer.callOnce(kShortPollChainBlocksTimeout + 1); + const auto inpoll = entry.requestId != 0; + const auto next = data.vnext_offset().v; + auto now = next - int(data.vblocks().v.size()); for (const auto &block : data.vblocks().v) { - const auto result = _e2e->apply({ block.v }); - if (result == TdE2E::Call::ApplyResult::BlockSkipped) { - AssertIsDebug(); - return; - } + _e2e->apply(subchain, now++, { block.v }, inpoll); } } diff --git a/Telegram/SourceFiles/calls/group/calls_group_call.h b/Telegram/SourceFiles/calls/group/calls_group_call.h index 148b760acb..e839eeaff7 100644 --- a/Telegram/SourceFiles/calls/group/calls_group_call.h +++ b/Telegram/SourceFiles/calls/group/calls_group_call.h @@ -477,9 +477,6 @@ private: } }; struct SubChainState { - crl::time lastUpdate = 0; - base::Timer timer; - int height = 0; mtpRequestId requestId = 0; }; @@ -543,7 +540,7 @@ private: void rejoinPresentation(); void leavePresentation(); void checkNextJoinAction(); - void checkChainBlocksRequest(int subchain); + void requestSubchainBlocks(int subchain, int height); void audioLevelsUpdated(const tgcalls::GroupLevelsUpdate &data); void setInstanceConnected(tgcalls::GroupNetworkState networkState); diff --git a/Telegram/SourceFiles/tde2e/tde2e_api.cpp b/Telegram/SourceFiles/tde2e/tde2e_api.cpp index 4c01426925..23bdc6bc08 100644 --- a/Telegram/SourceFiles/tde2e/tde2e_api.cpp +++ b/Telegram/SourceFiles/tde2e/tde2e_api.cpp @@ -8,14 +8,24 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "tde2e/tde2e_api.h" #include "base/assertion.h" +#include "base/debug_log.h" #include +#define LOG_ERROR(error) \ + LOG(("TdE2E Error %1: %2").arg(int(error.code)).arg(error.message.c_str())) + +#define LOG_AND_FAIL(error, reason) \ + LOG_ERROR(error); \ + fail(reason) + namespace TdE2E { namespace { constexpr auto kPermissionAdd = 1; constexpr auto kPermissionRemove = 2; +constexpr auto kShortPollChainBlocksTimeout = 5 * crl::time(1000); +constexpr auto kShortPollChainBlocksWaitFor = crl::time(1000); [[nodiscard]] tde2e_api::Slice Slice(const QByteArray &data) { return { @@ -38,6 +48,11 @@ Call::Call(UserId myUserId) memcpy(&_myKey, key.value().data(), sizeof(_myKey)); } +void Call::fail(CallFailure reason) { + _failure = reason; + _failures.fire_copy(reason); +} + PublicKey Call::myKey() const { return _myKey; } @@ -67,22 +82,135 @@ Block Call::makeZeroBlock() const { } void Call::create(const Block &last) { - tde2e_api::call_create(std::int64_t(_myKeyId.v), Slice(last.data)); + const auto id = tde2e_api::call_create( + std::int64_t(_myKeyId.v), + Slice(last.data)); + if (!id.is_ok()) { + LOG_AND_FAIL(id.error(), CallFailure::Unknown); + return; + } + + for (auto i = 0; i != kSubChainsCount; ++i) { + auto &entry = _subchains[i]; + entry.waitingTimer.setCallback([=] { + checkWaitingBlocks(i, true); + }); + entry.shortPollTimer.setCallback([=] { + shortPoll(i); + }); + entry.shortPollTimer.callOnce(kShortPollChainBlocksTimeout); + } } -Call::ApplyResult Call::apply(const Block &block) { +void Call::apply( + int subchain, + int index, + const Block &block, + bool fromShortPoll) { + Expects(subchain >= 0 && subchain < kSubChainsCount); + + if (!subchain && !_id.v) { + create(block); + } + if (failed()) { + return; + } + + auto &entry = _subchains[subchain]; + if (!fromShortPoll) { + entry.lastUpdate = crl::now(); + if (index > entry.height + 1) { + entry.waiting.emplace(index, block); + checkWaitingBlocks(subchain); + return; + } + } + const auto result = tde2e_api::call_apply_block( std::int64_t(_id.v), Slice(block.data)); - if (!result.is_ok()) { - const auto error = result.error(); - (void)error; + LOG_AND_FAIL(result.error(), CallFailure::Unknown); + return; } - return result.is_ok() - ? ApplyResult::Success - : ApplyResult::BlockSkipped; + entry.height = std::max(entry.height, index); + checkWaitingBlocks(subchain); +} + +void Call::checkWaitingBlocks(int subchain, bool waited) { + Expects(subchain >= 0 && subchain < kSubChainsCount); + + if (failed()) { + return; + } + + auto &entry = _subchains[subchain]; + if (entry.shortPolling) { + return; + } + auto &waiting = entry.waiting; + entry.shortPollTimer.cancel(); + while (!waiting.empty()) { + const auto level = waiting.begin()->first; + if (level > entry.height + 1) { + if (waited) { + shortPoll(subchain); + } else { + entry.waitingTimer.callOnce(kShortPollChainBlocksWaitFor); + } + return; + } else if (level == entry.height + 1) { + const auto result = tde2e_api::call_apply_block( + std::int64_t(_id.v), + Slice(waiting.begin()->second.data)); + if (!result.is_ok()) { + LOG_AND_FAIL(result.error(), CallFailure::Unknown); + return; + } + entry.height = level; + } + waiting.erase(waiting.begin()); + } + entry.waitingTimer.cancel(); + entry.shortPollTimer.callOnce(kShortPollChainBlocksTimeout); +} + +void Call::shortPoll(int subchain) { + Expects(subchain >= 0 && subchain < kSubChainsCount); + + auto &entry = _subchains[subchain]; + entry.waitingTimer.cancel(); + entry.shortPollTimer.cancel(); + entry.shortPolling = true; + _subchainRequests.fire({ subchain, entry.height }); +} + +rpl::producer Call::subchainRequests() const { + return _subchainRequests.events(); +} + +void Call::subchainBlocksRequestFinished(int subchain) { + Expects(subchain >= 0 && subchain < kSubChainsCount); + + if (failed()) { + return; + } + + auto &entry = _subchains[subchain]; + entry.shortPolling = false; + checkWaitingBlocks(subchain); +} + +std::optional Call::failed() const { + return _failure; +} + +rpl::producer Call::failures() const { + if (_failure) { + return rpl::single(*_failure); + } + return _failures.events(); } std::vector Call::encrypt(const std::vector &data) const { diff --git a/Telegram/SourceFiles/tde2e/tde2e_api.h b/Telegram/SourceFiles/tde2e/tde2e_api.h index ae8d70279e..17634d9835 100644 --- a/Telegram/SourceFiles/tde2e/tde2e_api.h +++ b/Telegram/SourceFiles/tde2e/tde2e_api.h @@ -8,6 +8,12 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #pragma once #include "base/basic_types.h" +#include "base/timer.h" + +#include +#include + +#include namespace TdE2E { @@ -39,6 +45,10 @@ struct Block { QByteArray data; }; +enum class CallFailure { + Unknown, +}; + class Call final { public: explicit Call(UserId myUserId); @@ -49,11 +59,21 @@ public: void create(const Block &last); - enum class ApplyResult { - Success, - BlockSkipped + void apply( + int subchain, + int index, + const Block &block, + bool fromShortPoll); + + struct SubchainRequest { + int subchain = 0; + int height = 0; }; - [[nodiscard]] ApplyResult apply(const Block &block); + [[nodiscard]] rpl::producer subchainRequests() const; + void subchainBlocksRequestFinished(int subchain); + + [[nodiscard]] std::optional failed() const; + [[nodiscard]] rpl::producer failures() const; [[nodiscard]] std::vector encrypt( const std::vector &data) const; @@ -61,10 +81,31 @@ public: const std::vector &data) const; private: + static constexpr int kSubChainsCount = 2; + + struct SubChainState { + base::Timer shortPollTimer; + base::Timer waitingTimer; + crl::time lastUpdate = 0; + base::flat_map waiting; + bool shortPolling = true; + int height = 0; + }; + + void fail(CallFailure reason); + + void checkWaitingBlocks(int subchain, bool waited = false); + void shortPoll(int subchain); + CallId _id; UserId _myUserId; PrivateKeyId _myKeyId; PublicKey _myKey; + std::optional _failure; + rpl::event_stream _failures; + + SubChainState _subchains[kSubChainsCount]; + rpl::event_stream _subchainRequests; };