diff --git a/Telegram/SourceFiles/calls/group/calls_group_call.cpp b/Telegram/SourceFiles/calls/group/calls_group_call.cpp index 066fdddcf6..14125ca7c0 100644 --- a/Telegram/SourceFiles/calls/group/calls_group_call.cpp +++ b/Telegram/SourceFiles/calls/group/calls_group_call.cpp @@ -421,21 +421,6 @@ std::shared_ptr ParseVideoParams( return data; } -std::shared_ptr ParseParticipantState( - const MTPDgroupCallParticipant &data) { - if (!data.vpublic_key() || data.vpeer().type() != mtpc_peerUser) { - return nullptr; - } - const auto &v = *data.vpublic_key(); - const auto userId = data.vpeer().c_peerUser().vuser_id().v; - using State = TdE2E::ParticipantState; - return std::make_shared(State{ - .id = uint64(userId), - .key = { .a = v.h.h, .b = v.h.l, .c = v.l.h, .d = v.l.l }, - }); - -} - GroupCall::LoadPartTask::LoadPartTask( base::weak_ptr call, int64 time, @@ -643,17 +628,6 @@ GroupCall::GroupCall( , _listenersHidden(join.rtmp) , _rtmp(join.rtmp) , _rtmpVolume(Group::kDefaultVolume) { - if (_conferenceCall) { - if (!_e2e) { - _e2e = std::make_shared( - TdE2E::MakeUserId(_peer->session().user())); - } - _e2e->subchainRequests( - ) | rpl::start_with_next([=](TdE2E::Call::SubchainRequest request) { - requestSubchainBlocks(request.subchain, request.height); - }, _lifetime); - } - _muted.value( ) | rpl::combine_previous( ) | rpl::start_with_next([=](MuteState previous, MuteState state) { @@ -705,6 +679,9 @@ GroupCall::GroupCall( setupMediaDevices(); setupOutgoingVideo(); + if (_conferenceCall) { + setupConferenceCall(); + } if (_id) { this->join(inputCall); @@ -724,6 +701,62 @@ GroupCall::~GroupCall() { } } +void GroupCall::setupConferenceCall() { + if (!_e2e) { + _e2e = std::make_shared( + TdE2E::MakeUserId(_peer->session().user())); + } + _e2e->subchainRequests( + ) | rpl::start_with_next([=](TdE2E::Call::SubchainRequest request) { + requestSubchainBlocks(request.subchain, request.height); + }, _lifetime); + _e2e->sendOutboundBlock( + ) | rpl::start_with_next([=](QByteArray &&block) { + sendOutboundBlock(std::move(block)); + }, _lifetime); + + _conferenceCall->staleParticipantId( + ) | rpl::start_with_next([=](UserId staleId) { + removeConferenceParticipant(staleId); + }, _lifetime); + _e2e->participantsSetValue( + ) | rpl::start_with_next([=](const TdE2E::ParticipantsSet &set) { + auto users = base::flat_set(); + users.reserve(set.list.size()); + auto ids = QStringList(); + for (const auto &id : set.list) { + users.emplace(UserId(id.v)); + ids.push_back('"' + _peer->owner().user(UserId(id.v))->name() + '"'); + } + LOG(("ACCESS: ") + ids.join(", ")); + _conferenceCall->setParticipantsWithAccess(std::move(users)); + }, _lifetime); +} + +void GroupCall::removeConferenceParticipant(UserId id) { + Expects(_e2e != nullptr); + + const auto block = _e2e->makeRemoveBlock(TdE2E::MakeUserId(id)); + if (block.data.isEmpty()) { + return; + } + _api.request(MTPphone_DeleteConferenceCallParticipant( + inputCall(), + _peer->owner().user(id)->input, + MTP_bytes(block.data) + )).done([=](const MTPUpdates &result) { + _peer->session().api().applyUpdates(result); + }).fail([=](const MTP::Error &error) { + const auto type = error.type(); + if (type == u"GROUPCALL_FORBIDDEN"_q) { + setState(State::Joining); + rejoin(); + } else { + LOG(("NOTREMOVED: %1").arg(type)); + } + }).send(); +} + bool GroupCall::isSharingScreen() const { return _isSharingScreen.current(); } @@ -1492,11 +1525,15 @@ void GroupCall::sendJoinRequest() { } if (_e2e) { _e2e->joined(); + if (!_pendingOutboundBlock.isEmpty()) { + sendOutboundBlock(base::take(_pendingOutboundBlock)); + } } }).fail([=](const MTP::Error &error) { const auto type = error.type(); if (_e2e) { - if (type == u"CONF_WRITE_CHAIN_INVALID"_q) { + if (type == u"BLOCK_INVALID"_q + || type.startsWith(u"CONF_WRITE_CHAIN_INVALID"_q)) { refreshLastBlockAndJoin(); return; } @@ -1583,6 +1620,29 @@ void GroupCall::requestSubchainBlocks(int subchain, int height) { }).send(); } +void GroupCall::sendOutboundBlock(QByteArray block) { + _pendingOutboundBlock = QByteArray(); + _api.request(MTPphone_SendConferenceCallBroadcast( + inputCall(), + MTP_bytes(block) + )).done([=](const MTPUpdates &result) { + _peer->session().api().applyUpdates(result); + }).fail([=](const MTP::Error &error) { + const auto type = error.type(); + if (type == u"GROUPCALL_FORBIDDEN"_q) { + _pendingOutboundBlock = block; + setState(State::Joining); + rejoin(); + } else if (type == u"BLOCK_INVALID"_q + || type.startsWith(u"CONF_WRITE_CHAIN_INVALID"_q)) { + LOG(("Call Error: Could not broadcast block: %1").arg(type)); + } else { + LOG(("HMM")); + sendOutboundBlock(block); + } + }).send(); +} + void GroupCall::checkNextJoinAction() { if (_joinState.action != JoinAction::None) { return; @@ -1741,8 +1801,7 @@ void GroupCall::applyMeInCallLocally() { | Flag::f_volume // Without flag the volume is reset to 100%. | Flag::f_volume_by_admin // Self volume can only be set by admin. | ((muted() != MuteState::Active) ? Flag::f_muted : Flag(0)) - | (raisedHandRating > 0 ? Flag::f_raise_hand_rating : Flag(0)) - | (_e2e ? Flag::f_public_key : Flag(0)); + | (raisedHandRating > 0 ? Flag::f_raise_hand_rating : Flag(0)); real->applyLocalUpdate( MTP_updateGroupCallParticipants( inputCall(), @@ -1758,10 +1817,7 @@ void GroupCall::applyMeInCallLocally() { MTPstring(), // Don't update about text in local updates. MTP_long(raisedHandRating), MTPGroupCallParticipantVideo(), - MTPGroupCallParticipantVideo(), - (_e2e - ? TdE2E::PublicKeyToMTP(_e2e->myKey()) - : MTPint256()))), + MTPGroupCallParticipantVideo())), MTP_int(0)).c_updateGroupCallParticipants()); } @@ -1792,11 +1848,7 @@ void GroupCall::applyParticipantLocally( | (participantPeer == joinAs() ? Flag::f_self : Flag(0)) | (participant->raisedHandRating ? Flag::f_raise_hand_rating - : Flag(0)) - | (participant->e2eState ? Flag::f_public_key : Flag(0)); - const auto publicKey = participant->e2eState - ? participant->e2eState->key - : TdE2E::PublicKey(); + : Flag(0)); _peer->groupCall()->applyLocalUpdate( MTP_updateGroupCallParticipants( inputCall(), @@ -1812,10 +1864,7 @@ void GroupCall::applyParticipantLocally( MTPstring(), // Don't update about text in local updates. MTP_long(participant->raisedHandRating), MTPGroupCallParticipantVideo(), - MTPGroupCallParticipantVideo(), - MTP_int256( - MTP_int128(publicKey.a, publicKey.b), - MTP_int128(publicKey.c, publicKey.d)))), + MTPGroupCallParticipantVideo())), MTP_int(0)).c_updateGroupCallParticipants()); } diff --git a/Telegram/SourceFiles/calls/group/calls_group_call.h b/Telegram/SourceFiles/calls/group/calls_group_call.h index 067351761f..71c124f408 100644 --- a/Telegram/SourceFiles/calls/group/calls_group_call.h +++ b/Telegram/SourceFiles/calls/group/calls_group_call.h @@ -43,7 +43,6 @@ class GroupCall; } // namespace Data namespace TdE2E { -struct ParticipantState; class Call; } // namespace TdE2E @@ -170,8 +169,6 @@ struct ParticipantVideoParams; const tl::conditional &camera, const tl::conditional &screen, const std::shared_ptr &existing); -[[nodiscard]] std::shared_ptr ParseParticipantState( - const MTPDgroupCallParticipant &data); [[nodiscard]] const std::string &GetCameraEndpoint( const std::shared_ptr ¶ms); @@ -282,6 +279,7 @@ public: void startScheduledNow(); void toggleScheduleStartSubscribed(bool subscribed); void setNoiseSuppression(bool enabled); + void removeConferenceParticipant(UserId userId); bool emitShareScreenError(); bool emitShareCameraError(); @@ -548,6 +546,7 @@ private: void sendJoinRequest(); void refreshLastBlockAndJoin(); void requestSubchainBlocks(int subchain, int height); + void sendOutboundBlock(QByteArray block); void audioLevelsUpdated(const tgcalls::GroupLevelsUpdate &data); void setInstanceConnected(tgcalls::GroupNetworkState networkState); @@ -588,6 +587,7 @@ private: void setupMediaDevices(); void setupOutgoingVideo(); + void setupConferenceCall(); void setScreenEndpoint(std::string endpoint); void setCameraEndpoint(std::string endpoint); void addVideoOutput(const std::string &endpoint, SinkPointer sink); @@ -607,6 +607,7 @@ private: const not_null _delegate; const std::shared_ptr _conferenceCall; std::shared_ptr _e2e; + QByteArray _pendingOutboundBlock; not_null _peer; // Can change in legacy group migration. rpl::event_stream _peerStream; diff --git a/Telegram/SourceFiles/data/data_group_call.cpp b/Telegram/SourceFiles/data/data_group_call.cpp index 6a1663a5ee..bab56f63aa 100644 --- a/Telegram/SourceFiles/data/data_group_call.cpp +++ b/Telegram/SourceFiles/data/data_group_call.cpp @@ -27,6 +27,7 @@ constexpr auto kSpeakingAfterActive = crl::time(6000); constexpr auto kActiveAfterJoined = crl::time(1000); constexpr auto kWaitForUpdatesTimeout = 3 * crl::time(1000); constexpr auto kReloadStaleTimeout = 16 * crl::time(1000); +constexpr auto kMaxConferenceMembers = 50; [[nodiscard]] QString ExtractNextOffset(const MTPphone_GroupCall &call) { return call.match([&](const MTPDphone_groupCall &data) { @@ -75,6 +76,30 @@ GroupCall::GroupCall( , _listenersHidden(rtmp) { if (_conference) { session().data().registerGroupCall(this); + + _participantUpdates.events( + ) | rpl::filter([=](const ParticipantUpdate &update) { + return !update.now + && !update.was->peer->isSelf() + && !_participantsWithAccess.current().empty(); + }) | rpl::start_with_next([=](const ParticipantUpdate &update) { + if (const auto id = peerToUser(update.was->peer->id)) { + if (_participantsWithAccess.current().contains(id)) { + _staleParticipantId.fire_copy(id); + } + } + }, _checkStaleLifetime); + + _participantsWithAccess.changes( + ) | rpl::filter([=](const base::flat_set &list) { + return !list.empty(); + }) | rpl::start_with_next([=] { + if (_allParticipantsLoaded) { + checkStaleParticipants(); + } else { + requestParticipants(); + } + }, _checkStaleLifetime); } } @@ -84,6 +109,7 @@ GroupCall::~GroupCall() { } api().request(_unknownParticipantPeersRequestId).cancel(); api().request(_participantsRequestId).cancel(); + api().request(_checkStaleRequestId).cancel(); api().request(_reloadRequestId).cancel(); } @@ -158,7 +184,7 @@ void GroupCall::requestParticipants() { : ApplySliceSource::SliceLoaded)); setServerParticipantsCount(data.vcount().v); if (data.vparticipants().v.isEmpty()) { - _allParticipantsLoaded = true; + setParticipantsLoaded(); } finishParticipantsSliceRequest(); if (reloaded) { @@ -169,7 +195,7 @@ void GroupCall::requestParticipants() { _participantsRequestId = 0; const auto reloaded = processSavedFullCall(); setServerParticipantsCount(_participants.size()); - _allParticipantsLoaded = true; + setParticipantsLoaded(); finishParticipantsSliceRequest(); if (reloaded) { _participantsReloaded.fire({}); @@ -177,6 +203,76 @@ void GroupCall::requestParticipants() { }).send(); } +void GroupCall::setParticipantsLoaded() { + _allParticipantsLoaded = true; + checkStaleParticipants(); +} + +void GroupCall::checkStaleParticipants() { + if (_checkStaleRequestId) { + return; + } + const auto &list = _participantsWithAccess.current(); + if (list.empty()) { + return; + } + auto existing = base::flat_set(); + existing.reserve(_participants.size() + 1); + existing.emplace(session().userId()); + for (const auto &participant : _participants) { + if (const auto id = peerToUser(participant.peer->id)) { + existing.emplace(id); + } + } + if (list.size() > existing.size()) { + checkStaleRequest(); + return; + } + for (const auto &id : list) { + if (!existing.contains(id)) { + checkStaleRequest(); + return; + } + } +} + +void GroupCall::checkStaleRequest() { + if (_checkStaleRequestId) { + return; + } + _checkStaleRequestId = api().request(MTPphone_GetGroupParticipants( + input(), + MTP_vector(), // ids + MTP_vector(), // ssrcs + MTP_string(QString()), + MTP_int(kMaxConferenceMembers) + )).done([=](const MTPphone_GroupParticipants &result) { + _checkStaleRequestId = 0; + const auto &list = _participantsWithAccess.current(); + if (list.empty()) { + return; + } + auto existing = base::flat_set(); + const auto &data = result.data(); + existing.reserve(data.vparticipants().v.size() + 1); + existing.emplace(session().userId()); + for (const auto &participant : data.vparticipants().v) { + const auto peerId = peerFromMTP(participant.data().vpeer()); + if (const auto id = peerToUser(peerId)) { + existing.emplace(id); + } + } + for (const auto &id : list) { + if (!existing.contains(id)) { + _staleParticipantId.fire_copy(id); + return; + } + } + }).fail([=] { + _checkStaleRequestId = 0; + }).send(); +} + bool GroupCall::processSavedFullCall() { if (!_savedFull) { return false; @@ -287,6 +383,29 @@ auto GroupCall::participantSpeaking() const return _participantSpeaking.events(); } +void GroupCall::setParticipantsWithAccess(base::flat_set list) { + _participantsWithAccess = std::move(list); + if (_allParticipantsLoaded) { + checkStaleParticipants(); + } else { + requestParticipants(); + } +} + +auto GroupCall::participantsWithAccessCurrent() const +-> const base::flat_set & { + return _participantsWithAccess.current(); +} + +auto GroupCall::participantsWithAccessValue() const +-> rpl::producer> { + return _participantsWithAccess.value(); +} + +rpl::producer GroupCall::staleParticipantId() const { + return _staleParticipantId.events(); +} + void GroupCall::enqueueUpdate(const MTPUpdate &update) { update.match([&](const MTPDupdateGroupCall &updateData) { updateData.vcall().match([&](const MTPDgroupCall &data) { @@ -471,9 +590,7 @@ void GroupCall::applyEnqueuedUpdate(const MTPUpdate &update) { }, [](const auto &) { Unexpected("Type in GroupCall::applyEnqueuedUpdate."); }); - Core::App().calls().applyGroupCallUpdateChecked( - &_peer->session(), - update); + Core::App().calls().applyGroupCallUpdateChecked(&session(), update); } void GroupCall::processQueuedUpdates() { @@ -634,22 +751,15 @@ void GroupCall::applyParticipantsSlice( const auto existingVideoParams = (i != end(_participants)) ? i->videoParams : nullptr; - const auto existingState = (i != end(_participants)) - ? i->e2eState - : nullptr; auto videoParams = localUpdate ? existingVideoParams : Calls::ParseVideoParams( data.vvideo(), data.vpresentation(), existingVideoParams); - auto e2eState = localUpdate - ? existingState - : Calls::ParseParticipantState(data); const auto value = Participant{ .peer = participantPeer, .videoParams = std::move(videoParams), - .e2eState = std::move(e2eState), .date = data.vdate().v, .lastActive = lastActive, .raisedHandRating = raisedHandRating, @@ -1003,7 +1113,7 @@ bool GroupCall::joinedToTop() const { } ApiWrap &GroupCall::api() const { - return _peer->session().api(); + return session().api(); } } // namespace Data diff --git a/Telegram/SourceFiles/data/data_group_call.h b/Telegram/SourceFiles/data/data_group_call.h index 104ee83b21..0c07a9f11d 100644 --- a/Telegram/SourceFiles/data/data_group_call.h +++ b/Telegram/SourceFiles/data/data_group_call.h @@ -38,7 +38,6 @@ struct LastSpokeTimes { struct GroupCallParticipant { not_null peer; std::shared_ptr videoParams; - std::shared_ptr e2eState; TimeId date = 0; TimeId lastActive = 0; uint64 raisedHandRating = 0; @@ -146,6 +145,16 @@ public: [[nodiscard]] auto participantSpeaking() const -> rpl::producer>; + void setParticipantsWithAccess(base::flat_set list); + [[nodiscard]] auto participantsWithAccessCurrent() const + -> const base::flat_set &; + [[nodiscard]] auto participantsWithAccessValue() const + -> rpl::producer>; + [[nodiscard]] rpl::producer staleParticipantId() const; + void setParticipantsLoaded(); + void checkStaleParticipants(); + void checkStaleRequest(); + void enqueueUpdate(const MTPUpdate &update); void applyLocalUpdate( const MTPDupdateGroupCallParticipants &update); @@ -254,6 +263,11 @@ private: rpl::event_stream> _participantSpeaking; rpl::event_stream<> _participantsReloaded; + rpl::variable> _participantsWithAccess; + rpl::event_stream _staleParticipantId; + mtpRequestId _checkStaleRequestId = 0; + rpl::lifetime _checkStaleLifetime; + bool _joinMuted : 1 = false; bool _canChangeJoinMuted : 1 = true; bool _allParticipantsLoaded : 1 = false; diff --git a/Telegram/SourceFiles/mtproto/scheme/api.tl b/Telegram/SourceFiles/mtproto/scheme/api.tl index 5857a15726..5da6c9022e 100644 --- a/Telegram/SourceFiles/mtproto/scheme/api.tl +++ b/Telegram/SourceFiles/mtproto/scheme/api.tl @@ -1347,7 +1347,7 @@ groupCall#d597650c flags:# join_muted:flags.1?true can_change_join_muted:flags.2 inputGroupCall#d8aa840f id:long access_hash:long = InputGroupCall; inputGroupCallSlug#fe06823f slug:string = InputGroupCall; -groupCallParticipant#23860077 flags:# muted:flags.0?true left:flags.1?true can_self_unmute:flags.2?true just_joined:flags.4?true versioned:flags.5?true min:flags.8?true muted_by_you:flags.9?true volume_by_admin:flags.10?true self:flags.12?true video_joined:flags.15?true peer:Peer date:int active_date:flags.3?int source:int volume:flags.7?int about:flags.11?string raise_hand_rating:flags.13?long video:flags.6?GroupCallParticipantVideo presentation:flags.14?GroupCallParticipantVideo public_key:flags.16?int256 = GroupCallParticipant; +groupCallParticipant#eba636fe flags:# muted:flags.0?true left:flags.1?true can_self_unmute:flags.2?true just_joined:flags.4?true versioned:flags.5?true min:flags.8?true muted_by_you:flags.9?true volume_by_admin:flags.10?true self:flags.12?true video_joined:flags.15?true peer:Peer date:int active_date:flags.3?int source:int volume:flags.7?int about:flags.11?string raise_hand_rating:flags.13?long video:flags.6?GroupCallParticipantVideo presentation:flags.14?GroupCallParticipantVideo = GroupCallParticipant; phone.groupCall#9e727aad call:GroupCall participants:Vector participants_next_offset:string chats:Vector users:Vector = phone.GroupCall; diff --git a/Telegram/SourceFiles/tde2e/tde2e_api.cpp b/Telegram/SourceFiles/tde2e/tde2e_api.cpp index bd3940b39c..326cfa048a 100644 --- a/Telegram/SourceFiles/tde2e/tde2e_api.cpp +++ b/Telegram/SourceFiles/tde2e/tde2e_api.cpp @@ -34,6 +34,24 @@ constexpr auto kShortPollChainBlocksWaitFor = crl::time(1000); }; } +[[nodiscard]] tde2e_api::Slice Slice(const std::vector &data) { + return { + reinterpret_cast(data.data()), + std::string_view::size_type(data.size()), + }; +} + +[[nodiscard]] ParticipantsSet ParseParticipantsSet( + const tde2e_api::CallState &state) { + auto result = ParticipantsSet(); + const auto &list = state.participants; + result.list.reserve(list.size()); + for (const auto &entry : list) { + result.list.emplace(UserId{ uint64(entry.user_id) }); + } + return result; +} + } // namespace Call::Call(UserId myUserId) @@ -48,7 +66,14 @@ Call::Call(UserId myUserId) memcpy(&_myKey, key.value().data(), sizeof(_myKey)); } +Call::~Call() { + if (const auto id = libId()) { + tde2e_api::call_destroy(id); + } +} + void Call::fail(CallFailure reason) { + _emojiHash = QByteArray(); _failure = reason; _failures.fire_copy(reason); } @@ -99,20 +124,92 @@ Block Call::makeJoinBlock() { }; } +Block Call::makeRemoveBlock(UserId id) { + if (failed() || !_id || id == _myUserId) { + return {}; + } + + auto state = tde2e_api::call_get_state(libId()); + if (!state.is_ok()) { + LOG_AND_FAIL(state.error(), CallFailure::Unknown); + return {}; + } + auto found = false; + auto updated = state.value(); + auto &list = updated.participants; + for (auto i = begin(list); i != end(list); ++i) { + if (uint64(i->user_id) == id.v) { + list.erase(i); + found = true; + break; + } + } + if (!found) { + return {}; + } + const auto result = tde2e_api::call_create_change_state_block( + libId(), + updated); + if (!result.is_ok()) { + LOG_AND_FAIL(result.error(), CallFailure::Unknown); + return {}; + } + return { + .data = QByteArray::fromStdString(result.value()), + }; +} + +rpl::producer Call::participantsSetValue() const { + return _participantsSet.value(); +} + void Call::joined() { shortPoll(0); - if (_id.v) { + if (_id) { shortPoll(1); } } -void Call::apply(const Block &last) { - if (_id.v) { - const auto result = tde2e_api::call_apply_block( - std::int64_t(_id.v), +void Call::apply(int subchain, const Block &last) { + Expects(_id || !subchain); + + auto verification = std::optional(); + const auto guard = gsl::finally([&] { + if (failed() || !_id) { + return; + } else if (!verification) { + const auto id = libId(); + auto result = tde2e_api::call_get_verification_state(id); + if (!result.is_ok()) { + LOG_AND_FAIL(result.error(), CallFailure::Unknown); + return; + } + verification = std::move(result.value()); + } + _emojiHash = verification->emoji_hash.has_value() + ? QByteArray::fromStdString(*verification->emoji_hash) + : QByteArray(); + checkForOutboundMessages(); + }); + + if (subchain) { + auto result = tde2e_api::call_receive_inbound_message( + libId(), Slice(last.data)); if (!result.is_ok()) { LOG_AND_FAIL(result.error(), CallFailure::Unknown); + } else { + verification = std::move(result.value()); + } + return; + } else if (_id) { + const auto result = tde2e_api::call_apply_block( + libId(), + Slice(last.data)); + if (!result.is_ok()) { + LOG_AND_FAIL(result.error(), CallFailure::Unknown); + } else { + _participantsSet = ParseParticipantsSet(result.value()); } return; } @@ -137,6 +234,26 @@ void Call::apply(const Block &last) { entry.shortPollTimer.callOnce(kShortPollChainBlocksTimeout); } } + + const auto state = tde2e_api::call_get_state(libId()); + if (!state.is_ok()) { + LOG_AND_FAIL(state.error(), CallFailure::Unknown); + return; + } + _participantsSet = ParseParticipantsSet(state.value()); +} + +void Call::checkForOutboundMessages() { + Expects(_id); + + const auto result = tde2e_api::call_pull_outbound_messages(libId()); + if (!result.is_ok()) { + LOG_AND_FAIL(result.error(), CallFailure::Unknown); + return; + } else if (!result.value().empty()) { + _outboundBlocks.fire( + QByteArray::fromStdString(result.value().back())); + } } void Call::apply( @@ -145,7 +262,7 @@ void Call::apply( const Block &block, bool fromShortPoll) { Expects(subchain >= 0 && subchain < kSubChainsCount); - Expects(_id.v != 0 || !fromShortPoll || !subchain); + Expects(_id || !fromShortPoll || !subchain); if (!subchain && index >= _lastBlock0Height) { _lastBlock0 = block; @@ -158,7 +275,7 @@ void Call::apply( auto &entry = _subchains[subchain]; if (!fromShortPoll) { entry.lastUpdate = crl::now(); - if (index > entry.height || (!_id.v && subchain != 0)) { + if (index > entry.height || (!_id && subchain != 0)) { entry.waiting.emplace(index, block); checkWaitingBlocks(subchain); return; @@ -167,8 +284,8 @@ void Call::apply( if (failed()) { return; - } else if (!_id.v || entry.height == index) { - apply(block); + } else if (!_id || entry.height == index) { + apply(subchain, block); } entry.height = index + 1; checkWaitingBlocks(subchain); @@ -182,7 +299,7 @@ void Call::checkWaitingBlocks(int subchain, bool waited) { } auto &entry = _subchains[subchain]; - if (!_id.v) { + if (!_id) { entry.waitingTimer.callOnce(kShortPollChainBlocksWaitFor); return; } else if (entry.shortPolling) { @@ -200,12 +317,28 @@ void Call::checkWaitingBlocks(int subchain, bool waited) { } return; } else if (level == entry.height + 1) { - const auto result = tde2e_api::call_apply_block( - std::int64_t(_id.v), - Slice(waiting.begin()->second.data)); - if (!result.is_ok()) { - LOG_AND_FAIL(result.error(), CallFailure::Unknown); - return; + const auto slice = Slice(waiting.begin()->second.data); + if (subchain) { + auto result = tde2e_api::call_receive_inbound_message( + libId(), + slice); + if (!result.is_ok()) { + LOG_AND_FAIL(result.error(), CallFailure::Unknown); + return; + } + _emojiHash = result.value().emoji_hash.has_value() + ? QByteArray::fromStdString(*result.value().emoji_hash) + : QByteArray(); + checkForOutboundMessages(); + } else { + const auto result = tde2e_api::call_apply_block( + libId(), + slice); + if (!result.is_ok()) { + LOG_AND_FAIL(result.error(), CallFailure::Unknown); + return; + } + _participantsSet = ParseParticipantsSet(result.value()); } entry.height = level; } @@ -221,7 +354,7 @@ void Call::shortPoll(int subchain) { auto &entry = _subchains[subchain]; entry.waitingTimer.cancel(); entry.shortPollTimer.cancel(); - if (subchain && !_id.v) { + if (subchain && !_id) { // Not ready. entry.waitingTimer.callOnce(kShortPollChainBlocksWaitFor); return; @@ -230,6 +363,10 @@ void Call::shortPoll(int subchain) { _subchainRequests.fire({ subchain, entry.height }); } +std::int64_t Call::libId() const { + return std::int64_t(_id.v); +} + rpl::producer Call::subchainRequests() const { return _subchainRequests.events(); } @@ -246,6 +383,10 @@ void Call::subchainBlocksRequestFinished(int subchain) { checkWaitingBlocks(subchain); } +rpl::producer Call::sendOutboundBlock() const { + return _outboundBlocks.events(); +} + std::optional Call::failed() const { return _failure; } @@ -257,13 +398,16 @@ rpl::producer Call::failures() const { return _failures.events(); } +QByteArray Call::emojiHash() const { + return _emojiHash.current(); +} + +rpl::producer Call::emojiHashValue() const { + return _emojiHash.value(); +} + std::vector Call::encrypt(const std::vector &data) const { - const auto result = tde2e_api::call_encrypt( - std::int64_t(_id.v), - std::string_view{ - reinterpret_cast(data.data()), - data.size(), - }); + const auto result = tde2e_api::call_encrypt(libId(), Slice(data)); if (!result.is_ok()) { return {}; } @@ -274,12 +418,7 @@ std::vector Call::encrypt(const std::vector &data) const { } std::vector Call::decrypt(const std::vector &data) const { - const auto result = tde2e_api::call_decrypt( - std::int64_t(_id.v), - std::string_view{ - reinterpret_cast(data.data()), - data.size(), - }); + const auto result = tde2e_api::call_decrypt(libId(), Slice(data)); if (!result.is_ok()) { return {}; } diff --git a/Telegram/SourceFiles/tde2e/tde2e_api.h b/Telegram/SourceFiles/tde2e/tde2e_api.h index 931702b0fb..6252c8effa 100644 --- a/Telegram/SourceFiles/tde2e/tde2e_api.h +++ b/Telegram/SourceFiles/tde2e/tde2e_api.h @@ -8,10 +8,12 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #pragma once #include "base/basic_types.h" +#include "base/flat_set.h" #include "base/timer.h" -#include #include +#include +#include #include @@ -19,6 +21,9 @@ namespace TdE2E { struct UserId { uint64 v = 0; + + friend inline constexpr auto operator<=>(UserId, UserId) = default; + friend inline constexpr bool operator==(UserId, UserId) = default; }; struct PrivateKeyId { @@ -27,6 +32,10 @@ struct PrivateKeyId { struct CallId { uint64 v = 0; + + explicit operator bool() const { + return v != 0; + } }; struct PublicKey { @@ -41,6 +50,14 @@ struct ParticipantState { PublicKey key; }; +struct ParticipantsSet { + base::flat_set list; + + friend inline bool operator==( + const ParticipantsSet &, + const ParticipantsSet &) = default; +}; + struct Block { QByteArray data; }; @@ -52,6 +69,7 @@ enum class CallFailure { class Call final { public: explicit Call(UserId myUserId); + ~Call(); [[nodiscard]] PublicKey myKey() const; @@ -69,11 +87,19 @@ public: [[nodiscard]] rpl::producer subchainRequests() const; void subchainBlocksRequestFinished(int subchain); + [[nodiscard]] rpl::producer sendOutboundBlock() const; + [[nodiscard]] std::optional failed() const; [[nodiscard]] rpl::producer failures() const; + [[nodiscard]] QByteArray emojiHash() const; + [[nodiscard]] rpl::producer emojiHashValue() const; + void refreshLastBlock0(std::optional block); [[nodiscard]] Block makeJoinBlock(); + [[nodiscard]] Block makeRemoveBlock(UserId id); + + [[nodiscard]] rpl::producer participantsSetValue() const; [[nodiscard]] std::vector encrypt( const std::vector &data) const; @@ -92,12 +118,15 @@ private: int height = 0; }; - void apply(const Block &last); + void apply(int subchain, const Block &last); void fail(CallFailure reason); + void checkForOutboundMessages(); void checkWaitingBlocks(int subchain, bool waited = false); void shortPoll(int subchain); + [[nodiscard]] std::int64_t libId() const; + CallId _id; UserId _myUserId; PrivateKeyId _myKeyId; @@ -107,10 +136,14 @@ private: SubChainState _subchains[kSubChainsCount]; rpl::event_stream _subchainRequests; + rpl::event_stream _outboundBlocks; std::optional _lastBlock0; int _lastBlock0Height = 0; + rpl::variable _participantsSet; + rpl::variable _emojiHash; + }; } // namespace TdE2E