Move blocks order checking to tde2e_api module.

This commit is contained in:
John Preston 2025-03-26 11:24:27 +04:00
parent ebe11fdb1e
commit 33a15e69bb
4 changed files with 198 additions and 52 deletions

View file

@ -54,10 +54,7 @@ constexpr auto kFixManualLargeVideoDuration = 5 * crl::time(1000);
constexpr auto kFixSpeakingLargeVideoDuration = 3 * crl::time(1000);
constexpr auto kFullAsMediumsCount = 4; // 1 Full is like 4 Mediums.
constexpr auto kMaxMediumQualities = 16; // 4 Fulls or 16 Mediums.
constexpr auto kShortPollChainBlocksTimeout = 5 * crl::time(1000);
constexpr auto kShortPollChainBlocksPerRequest = 50;
constexpr auto kSubChain0 = 0;
constexpr auto kSubChain1 = 1;
[[nodiscard]] const Data::GroupCallParticipant *LookupParticipant(
not_null<PeerData*> peer,
@ -651,11 +648,10 @@ GroupCall::GroupCall(
_e2e = std::make_shared<TdE2E::Call>(
TdE2E::MakeUserId(_peer->session().user()));
}
for (auto i = 0; i != kSubChainsCount; ++i) {
_subchains[i].timer.setCallback([=] {
checkChainBlocksRequest(i);
});
}
_e2e->subchainRequests(
) | rpl::start_with_next([=](TdE2E::Call::SubchainRequest request) {
requestSubchainBlocks(request.subchain, request.height);
}, _lifetime);
}
_muted.value(
@ -1478,8 +1474,8 @@ void GroupCall::rejoin(not_null<PeerData*> as) {
real->reloadIfStale();
}
}
checkChainBlocksRequest(kSubChain0);
checkChainBlocksRequest(kSubChain1);
requestSubchainBlocks(0, 0);
requestSubchainBlocks(1, 0);
}).fail([=](const MTP::Error &error) {
_joinState.finish();
@ -1500,37 +1496,25 @@ void GroupCall::rejoin(not_null<PeerData*> as) {
});
}
void GroupCall::checkChainBlocksRequest(int subchain) {
void GroupCall::requestSubchainBlocks(int subchain, int height) {
Expects(subchain >= 0 && subchain < kSubChainsCount);
auto &state = _subchains[subchain];
if (state.requestId) {
return;
}
const auto now = crl::now();
const auto left = state.lastUpdate + kShortPollChainBlocksTimeout - now;
if (left > 0) {
if (!state.timer.isActive()) {
state.timer.callOnce(left);
}
return;
}
_api.request(base::take(state.requestId)).cancel();
state.requestId = _api.request(MTPphone_GetGroupCallChainBlocks(
inputCall(),
MTP_int(subchain),
MTP_int(state.height),
MTP_int(height),
MTP_int(kShortPollChainBlocksPerRequest)
)).done([=](const MTPUpdates &result) {
auto &state = _subchains[subchain];
state.lastUpdate = crl::now();
state.requestId = 0;
_peer->session().api().applyUpdates(result);
state.timer.callOnce(kShortPollChainBlocksTimeout + 1);
state.requestId = 0;
_e2e->subchainBlocksRequestFinished(subchain);
}).fail([=](const MTP::Error &error) {
auto &state = _subchains[subchain];
state.lastUpdate = crl::now();
state.requestId = 0;
state.timer.callOnce(kShortPollChainBlocksTimeout + 1);
_e2e->subchainBlocksRequestFinished(subchain);
}).send();
}
@ -2123,15 +2107,11 @@ void GroupCall::handleUpdate(const MTPDupdateGroupCallChainBlocks &data) {
return;
}
auto &entry = _subchains[subchain];
entry.lastUpdate = crl::now();
entry.height = data.vnext_offset().v;
entry.timer.callOnce(kShortPollChainBlocksTimeout + 1);
const auto inpoll = entry.requestId != 0;
const auto next = data.vnext_offset().v;
auto now = next - int(data.vblocks().v.size());
for (const auto &block : data.vblocks().v) {
const auto result = _e2e->apply({ block.v });
if (result == TdE2E::Call::ApplyResult::BlockSkipped) {
AssertIsDebug();
return;
}
_e2e->apply(subchain, now++, { block.v }, inpoll);
}
}

View file

@ -477,9 +477,6 @@ private:
}
};
struct SubChainState {
crl::time lastUpdate = 0;
base::Timer timer;
int height = 0;
mtpRequestId requestId = 0;
};
@ -543,7 +540,7 @@ private:
void rejoinPresentation();
void leavePresentation();
void checkNextJoinAction();
void checkChainBlocksRequest(int subchain);
void requestSubchainBlocks(int subchain, int height);
void audioLevelsUpdated(const tgcalls::GroupLevelsUpdate &data);
void setInstanceConnected(tgcalls::GroupNetworkState networkState);

View file

@ -8,14 +8,24 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "tde2e/tde2e_api.h"
#include "base/assertion.h"
#include "base/debug_log.h"
#include <tde2e/td/e2e/e2e_api.h>
#define LOG_ERROR(error) \
LOG(("TdE2E Error %1: %2").arg(int(error.code)).arg(error.message.c_str()))
#define LOG_AND_FAIL(error, reason) \
LOG_ERROR(error); \
fail(reason)
namespace TdE2E {
namespace {
constexpr auto kPermissionAdd = 1;
constexpr auto kPermissionRemove = 2;
constexpr auto kShortPollChainBlocksTimeout = 5 * crl::time(1000);
constexpr auto kShortPollChainBlocksWaitFor = crl::time(1000);
[[nodiscard]] tde2e_api::Slice Slice(const QByteArray &data) {
return {
@ -38,6 +48,11 @@ Call::Call(UserId myUserId)
memcpy(&_myKey, key.value().data(), sizeof(_myKey));
}
void Call::fail(CallFailure reason) {
_failure = reason;
_failures.fire_copy(reason);
}
PublicKey Call::myKey() const {
return _myKey;
}
@ -67,22 +82,135 @@ Block Call::makeZeroBlock() const {
}
void Call::create(const Block &last) {
tde2e_api::call_create(std::int64_t(_myKeyId.v), Slice(last.data));
const auto id = tde2e_api::call_create(
std::int64_t(_myKeyId.v),
Slice(last.data));
if (!id.is_ok()) {
LOG_AND_FAIL(id.error(), CallFailure::Unknown);
return;
}
for (auto i = 0; i != kSubChainsCount; ++i) {
auto &entry = _subchains[i];
entry.waitingTimer.setCallback([=] {
checkWaitingBlocks(i, true);
});
entry.shortPollTimer.setCallback([=] {
shortPoll(i);
});
entry.shortPollTimer.callOnce(kShortPollChainBlocksTimeout);
}
}
Call::ApplyResult Call::apply(const Block &block) {
void Call::apply(
int subchain,
int index,
const Block &block,
bool fromShortPoll) {
Expects(subchain >= 0 && subchain < kSubChainsCount);
if (!subchain && !_id.v) {
create(block);
}
if (failed()) {
return;
}
auto &entry = _subchains[subchain];
if (!fromShortPoll) {
entry.lastUpdate = crl::now();
if (index > entry.height + 1) {
entry.waiting.emplace(index, block);
checkWaitingBlocks(subchain);
return;
}
}
const auto result = tde2e_api::call_apply_block(
std::int64_t(_id.v),
Slice(block.data));
if (!result.is_ok()) {
const auto error = result.error();
(void)error;
LOG_AND_FAIL(result.error(), CallFailure::Unknown);
return;
}
return result.is_ok()
? ApplyResult::Success
: ApplyResult::BlockSkipped;
entry.height = std::max(entry.height, index);
checkWaitingBlocks(subchain);
}
void Call::checkWaitingBlocks(int subchain, bool waited) {
Expects(subchain >= 0 && subchain < kSubChainsCount);
if (failed()) {
return;
}
auto &entry = _subchains[subchain];
if (entry.shortPolling) {
return;
}
auto &waiting = entry.waiting;
entry.shortPollTimer.cancel();
while (!waiting.empty()) {
const auto level = waiting.begin()->first;
if (level > entry.height + 1) {
if (waited) {
shortPoll(subchain);
} else {
entry.waitingTimer.callOnce(kShortPollChainBlocksWaitFor);
}
return;
} else if (level == entry.height + 1) {
const auto result = tde2e_api::call_apply_block(
std::int64_t(_id.v),
Slice(waiting.begin()->second.data));
if (!result.is_ok()) {
LOG_AND_FAIL(result.error(), CallFailure::Unknown);
return;
}
entry.height = level;
}
waiting.erase(waiting.begin());
}
entry.waitingTimer.cancel();
entry.shortPollTimer.callOnce(kShortPollChainBlocksTimeout);
}
void Call::shortPoll(int subchain) {
Expects(subchain >= 0 && subchain < kSubChainsCount);
auto &entry = _subchains[subchain];
entry.waitingTimer.cancel();
entry.shortPollTimer.cancel();
entry.shortPolling = true;
_subchainRequests.fire({ subchain, entry.height });
}
rpl::producer<Call::SubchainRequest> Call::subchainRequests() const {
return _subchainRequests.events();
}
void Call::subchainBlocksRequestFinished(int subchain) {
Expects(subchain >= 0 && subchain < kSubChainsCount);
if (failed()) {
return;
}
auto &entry = _subchains[subchain];
entry.shortPolling = false;
checkWaitingBlocks(subchain);
}
std::optional<CallFailure> Call::failed() const {
return _failure;
}
rpl::producer<CallFailure> Call::failures() const {
if (_failure) {
return rpl::single(*_failure);
}
return _failures.events();
}
std::vector<uint8_t> Call::encrypt(const std::vector<uint8_t> &data) const {

View file

@ -8,6 +8,12 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#pragma once
#include "base/basic_types.h"
#include "base/timer.h"
#include <rpl/producer.h>
#include <rpl/event_stream.h>
#include <crl/crl_time.h>
namespace TdE2E {
@ -39,6 +45,10 @@ struct Block {
QByteArray data;
};
enum class CallFailure {
Unknown,
};
class Call final {
public:
explicit Call(UserId myUserId);
@ -49,11 +59,21 @@ public:
void create(const Block &last);
enum class ApplyResult {
Success,
BlockSkipped
void apply(
int subchain,
int index,
const Block &block,
bool fromShortPoll);
struct SubchainRequest {
int subchain = 0;
int height = 0;
};
[[nodiscard]] ApplyResult apply(const Block &block);
[[nodiscard]] rpl::producer<SubchainRequest> subchainRequests() const;
void subchainBlocksRequestFinished(int subchain);
[[nodiscard]] std::optional<CallFailure> failed() const;
[[nodiscard]] rpl::producer<CallFailure> failures() const;
[[nodiscard]] std::vector<uint8_t> encrypt(
const std::vector<uint8_t> &data) const;
@ -61,10 +81,31 @@ public:
const std::vector<uint8_t> &data) const;
private:
static constexpr int kSubChainsCount = 2;
struct SubChainState {
base::Timer shortPollTimer;
base::Timer waitingTimer;
crl::time lastUpdate = 0;
base::flat_map<int, Block> waiting;
bool shortPolling = true;
int height = 0;
};
void fail(CallFailure reason);
void checkWaitingBlocks(int subchain, bool waited = false);
void shortPoll(int subchain);
CallId _id;
UserId _myUserId;
PrivateKeyId _myKeyId;
PublicKey _myKey;
std::optional<CallFailure> _failure;
rpl::event_stream<CallFailure> _failures;
SubChainState _subchains[kSubChainsCount];
rpl::event_stream<SubchainRequest> _subchainRequests;
};