diff --git a/Telegram/SourceFiles/calls/calls_choose_join_as.cpp b/Telegram/SourceFiles/calls/calls_choose_join_as.cpp index 39e4fa8f1..ba8de48ea 100644 --- a/Telegram/SourceFiles/calls/calls_choose_join_as.cpp +++ b/Telegram/SourceFiles/calls/calls_choose_join_as.cpp @@ -226,9 +226,14 @@ void ChooseJoinAsProcess::start( return list; }); if (list.empty()) { + // #TODO calls in case of anonymous group admin show lng_group_call_no_anonymous _request->showToast("No way to join this voice chat :("); return; - } else if (list.size() == 1 && list.front() == self) { + } else if (list.size() == 1 + && list.front() == self + && (!peer->isChannel() + || !peer->asChannel()->amAnonymous() + || (peer->isBroadcast() && !peer->canWrite()))) { info.possibleJoinAs = std::move(list); finish(info); return; diff --git a/Telegram/SourceFiles/calls/calls_group_call.cpp b/Telegram/SourceFiles/calls/calls_group_call.cpp index aa6d15059..c542fa9fb 100644 --- a/Telegram/SourceFiles/calls/calls_group_call.cpp +++ b/Telegram/SourceFiles/calls/calls_group_call.cpp @@ -70,6 +70,28 @@ constexpr auto kPlayConnectingEach = crl::time(1056) + 2 * crl::time(1000); } // namespace +class GroupCall::LoadPartTask final : public tgcalls::BroadcastPartTask { +public: + LoadPartTask( + base::weak_ptr call, + TimeId date, + Fn done); + + [[nodiscard]] TimeId date() const { + return _date; + } + + void done(tgcalls::BroadcastPart &&part); + void cancel() override; + +private: + const base::weak_ptr _call; + const TimeId _date = 0; + Fn _done; + QMutex _mutex; + +}; + [[nodiscard]] bool IsGroupCallAdmin( not_null peer, not_null participantPeer) { @@ -80,7 +102,7 @@ constexpr auto kPlayConnectingEach = crl::time(1056) + 2 * crl::time(1000); if (const auto chat = peer->asChat()) { return chat->admins.contains(user) || (chat->creator == user->bareId()); - } else if (const auto group = peer->asMegagroup()) { + } else if (const auto group = peer->asChannel()) { if (const auto mgInfo = group->mgInfo.get()) { if (mgInfo->creator == user) { return true; @@ -96,6 +118,40 @@ constexpr auto kPlayConnectingEach = crl::time(1056) + 2 * crl::time(1000); return false; } +GroupCall::LoadPartTask::LoadPartTask( + base::weak_ptr call, + TimeId date, + Fn done) +: _call(std::move(call)) +, _date(date ? date : base::unixtime::now()) +, _done(std::move(done)) { +} + +void GroupCall::LoadPartTask::done(tgcalls::BroadcastPart &&part) { + QMutexLocker lock(&_mutex); + if (_done) { + base::take(_done)(std::move(part)); + } +} + +void GroupCall::LoadPartTask::cancel() { + QMutexLocker lock(&_mutex); + if (!_done) { + return; + } + _done = nullptr; + lock.unlock(); + + if (_call) { + const auto that = this; + crl::on_main(_call, [weak = _call, that] { + if (const auto strong = weak.get()) { + strong->broadcastPartCancel(that); + } + }); + } +} + GroupCall::GroupCall( not_null delegate, Group::JoinInfo info, @@ -264,7 +320,7 @@ void GroupCall::join(const MTPInputGroupCall &inputCall) { setState(State::Joining); if (const auto chat = _peer->asChat()) { chat->setGroupCall(inputCall); - } else if (const auto group = _peer->asMegagroup()) { + } else if (const auto group = _peer->asChannel()) { group->setGroupCall(inputCall); } else { Unexpected("Peer type in GroupCall::join."); @@ -316,7 +372,8 @@ void GroupCall::rejoin() { _mySsrc = 0; setState(State::Joining); - createAndStartController(); + ensureControllerCreated(); + setInstanceMode(InstanceMode::None); applyMeInCallLocally(); LOG(("Call Info: Requesting join payload.")); @@ -581,8 +638,11 @@ void GroupCall::handleUpdate(const MTPGroupCall &call) { || !_instance) { return; } + const auto streamDcId = MTP::BareDcId( + data.vstream_dc_id().value_or_empty()); if (const auto params = data.vparams()) { params->match([&](const MTPDdataJSON &data) { + using ConnectionMode = tgcalls::GroupConnectionMode; auto error = QJsonParseError{ 0, QJsonParseError::NoError }; const auto document = QJsonDocument::fromJson( data.vdata().v, @@ -597,6 +657,22 @@ void GroupCall::handleUpdate(const MTPGroupCall &call) { "Not an object received in group call params.")); return; } + + const auto guard = gsl::finally([&] { + addParticipantsToInstance(); + }); + + if (document.object().value("stream").toBool()) { + if (!streamDcId) { + LOG(("Api Error: Empty stream_dc_id in groupCall.")); + } + _broadcastDcId = streamDcId + ? streamDcId + : _peer->session().mtp().mainDcId(); + setInstanceMode(InstanceMode::Stream); + return; + } + const auto readString = []( const QJsonObject &object, const char *key) { @@ -634,12 +710,8 @@ void GroupCall::handleUpdate(const MTPGroupCall &call) { .relPort = readString(object, "relPort"), }); } - _instance->setConnectionMode( - tgcalls::GroupConnectionMode::GroupConnectionModeRtc); + setInstanceMode(InstanceMode::Rtc); _instance->setJoinResponsePayload(payload, {}); - _instancePayloadsDone = true; - - addParticipantsToInstance(); }); } }, [&](const MTPDgroupCallDiscarded &data) { @@ -652,7 +724,9 @@ void GroupCall::handleUpdate(const MTPGroupCall &call) { void GroupCall::addParticipantsToInstance() { const auto real = _peer->groupCall(); - if (!real || (real->id() != _id) || !_instancePayloadsDone) { + if (!real + || (real->id() != _id) + || (_instanceMode == InstanceMode::None)) { return; } for (const auto &participant : real->participants()) { @@ -719,7 +793,10 @@ void GroupCall::handleUpdate(const MTPDupdateGroupCallParticipants &data) { for (const auto &participant : data.vparticipants().v) { participant.match([&](const MTPDgroupCallParticipant &data) { - if (!data.is_self()) { + const auto isSelf = data.is_self() + || (data.is_min() + && peerFromMTP(data.vpeer()) == _joinAs->id); + if (!isSelf) { handleOtherParticipants(data); return; } @@ -738,6 +815,10 @@ void GroupCall::handleUpdate(const MTPDupdateGroupCallParticipants &data) { } if (data.is_muted() && !data.is_can_self_unmute()) { setMuted(MuteState::ForceMuted); + } else if (_instanceMode == InstanceMode::Stream) { + LOG(("Call Info: Rejoin after unforcemute in stream mode.")); + setState(State::Joining); + rejoin(); } else if (muted() == MuteState::ForceMuted) { setMuted(MuteState::Muted); } else if (data.is_muted() && muted() != MuteState::Muted) { @@ -763,7 +844,10 @@ void GroupCall::changeTitle(const QString &title) { }).send(); } -void GroupCall::createAndStartController() { +void GroupCall::ensureControllerCreated() { + if (_instance) { + return; + } const auto &settings = Core::App().settings(); const auto weak = base::make_weak(this); @@ -799,6 +883,18 @@ void GroupCall::createAndStartController() { requestParticipantsInformation(ssrcs); }); }, + .requestBroadcastPart = [=]( + int32_t date, + std::function done) { + auto result = std::make_shared( + weak, + date, + std::move(done)); + crl::on_main(weak, [=]() mutable { + broadcastPartStart(std::move(result)); + }); + return result; + } }; if (Logs::DebugEnabled()) { auto callLogFolder = cWorkingDir() + qsl("DebugLogs"); @@ -816,7 +912,6 @@ void GroupCall::createAndStartController() { } LOG(("Call Info: Creating group instance")); - _instancePayloadsDone = false; _instance = std::make_unique( std::move(descriptor)); @@ -826,10 +921,70 @@ void GroupCall::createAndStartController() { //raw->setAudioOutputDuckingEnabled(settings.callAudioDuckingEnabled()); } +void GroupCall::broadcastPartStart(std::shared_ptr task) { + const auto raw = task.get(); + const auto date = raw->date(); + const auto finish = [=](tgcalls::BroadcastPart &&part) { + raw->done(std::move(part)); + _broadcastParts.erase(raw); + }; + using Status = tgcalls::BroadcastPart::Status; + const auto requestId = _api.request(MTPupload_GetFile( + MTP_flags(0), + MTP_inputGroupCallStream( + inputCall(), + MTP_long(uint64(date) * 1000), + MTP_int(0)), + MTP_int(0), + MTP_int(128 * 1024) + )).done([=](const MTPupload_File &result) { + result.match([&](const MTPDupload_file &data) { + const auto size = data.vbytes().v.size(); + auto bytes = std::vector(size); + memcpy(bytes.data(), data.vbytes().v.constData(), size); + finish({ + .timestamp = date, + .responseTimestamp = date + 1., // #TODO calls extract from mtproto + .status = Status::Success, + .oggData = std::move(bytes), + }); + }, [&](const MTPDupload_fileCdnRedirect &data) { + LOG(("Voice Chat Stream Error: fileCdnRedirect received.")); + finish({ + .timestamp = date, + .responseTimestamp = date + 1., // #TODO calls extract from mtproto + .status = Status::TooOld, + }); + }); + }).fail([=](const RPCError &error) { + const auto status = MTP::isTemporaryError(error) + ? Status::NotReady + : Status::TooOld; + finish({ + .timestamp = date, + .responseTimestamp = date + 1., // #TODO calls extract from mtproto + .status = status, + }); + }).handleAllErrors().toDC( + MTP::groupCallStreamDcId(_broadcastDcId) + ).send(); + _broadcastParts.emplace(raw, LoadingPart{ std::move(task), requestId }); +} + +void GroupCall::broadcastPartCancel(not_null task) { + const auto i = _broadcastParts.find(task); + if (i != _broadcastParts.end()) { + _api.request(i->second.requestId).cancel(); + _broadcastParts.erase(i); + } +} + void GroupCall::requestParticipantsInformation( const std::vector &ssrcs) { const auto real = _peer->groupCall(); - if (!real || real->id() != _id || !_instancePayloadsDone) { + if (!real + || (real->id() != _id) + || (_instanceMode == InstanceMode::None)) { for (const auto ssrc : ssrcs) { _unresolvedSsrcs.emplace(ssrc); } @@ -1000,6 +1155,22 @@ void GroupCall::setInstanceConnected(bool connected) { } } +void GroupCall::setInstanceMode(InstanceMode mode) { + Expects(_instance != nullptr); + + _instanceMode = mode; + + using Mode = tgcalls::GroupConnectionMode; + _instance->setConnectionMode([&] { + switch (_instanceMode) { + case InstanceMode::None: return Mode::GroupConnectionModeNone; + case InstanceMode::Rtc: return Mode::GroupConnectionModeRtc; + case InstanceMode::Stream: return Mode::GroupConnectionModeBroadcast; + } + Unexpected("Mode in GroupCall::setInstanceMode."); + }()); +} + void GroupCall::maybeSendMutedUpdate(MuteState previous) { // Send only Active <-> !Active changes. const auto now = muted(); diff --git a/Telegram/SourceFiles/calls/calls_group_call.h b/Telegram/SourceFiles/calls/calls_group_call.h index 68a6d6a07..d6f91aa81 100644 --- a/Telegram/SourceFiles/calls/calls_group_call.h +++ b/Telegram/SourceFiles/calls/calls_group_call.h @@ -174,18 +174,35 @@ public: return _lifetime; } +private: + class LoadPartTask; + +public: + void broadcastPartStart(std::shared_ptr task); + void broadcastPartCancel(not_null task); + private: using GlobalShortcutValue = base::GlobalShortcutValue; + struct LoadingPart { + std::shared_ptr task; + mtpRequestId requestId = 0; + }; + enum class FinishType { None, Ended, Failed, }; + enum class InstanceMode { + None, + Rtc, + Stream, + }; void handleRequestError(const RPCError &error); void handleControllerError(const QString &error); - void createAndStartController(); + void ensureControllerCreated(); void destroyController(); void setState(State state); @@ -199,6 +216,7 @@ private: void audioLevelsUpdated(const tgcalls::GroupLevelsUpdate &data); void setInstanceConnected(bool connected); + void setInstanceMode(InstanceMode mode); void checkLastSpoke(); void pushToTalkCancel(); @@ -233,11 +251,14 @@ private: MTP::Sender _api; rpl::variable _state = State::Creating; bool _instanceConnected = false; - bool _instancePayloadsDone = false; + InstanceMode _instanceMode = InstanceMode::None; base::flat_set _unresolvedSsrcs; std::vector _preparedParticipants; bool _addPreparedParticipantsScheduled = false; + MTP::DcId _broadcastDcId = 0; + base::flat_map, LoadingPart> _broadcastParts; + not_null _joinAs; std::vector> _possibleJoinAs; diff --git a/Telegram/SourceFiles/data/data_peer.cpp b/Telegram/SourceFiles/data/data_peer.cpp index ec9ccb379..511ccde6f 100644 --- a/Telegram/SourceFiles/data/data_peer.cpp +++ b/Telegram/SourceFiles/data/data_peer.cpp @@ -904,7 +904,7 @@ bool PeerData::canManageGroupCall() const { if (const auto chat = asChat()) { return chat->amCreator() || (chat->adminRights() & ChatAdminRight::f_manage_call); - } else if (const auto group = asMegagroup()) { + } else if (const auto group = asChannel()) { return group->amCreator() || (group->adminRights() & ChatAdminRight::f_manage_call); } @@ -914,7 +914,7 @@ bool PeerData::canManageGroupCall() const { Data::GroupCall *PeerData::groupCall() const { if (const auto chat = asChat()) { return chat->groupCall(); - } else if (const auto group = asMegagroup()) { + } else if (const auto group = asChannel()) { return group->groupCall(); } return nullptr; @@ -923,7 +923,7 @@ Data::GroupCall *PeerData::groupCall() const { PeerId PeerData::groupCallDefaultJoinAs() const { if (const auto chat = asChat()) { return chat->groupCallDefaultJoinAs(); - } else if (const auto group = asMegagroup()) { + } else if (const auto group = asChannel()) { return group->groupCallDefaultJoinAs(); } return 0; diff --git a/Telegram/SourceFiles/history/history_widget.cpp b/Telegram/SourceFiles/history/history_widget.cpp index 1e5c5e43d..2d0d6b8a5 100644 --- a/Telegram/SourceFiles/history/history_widget.cpp +++ b/Telegram/SourceFiles/history/history_widget.cpp @@ -5589,7 +5589,7 @@ void HistoryWidget::setupGroupCallTracker() { Expects(_history != nullptr); const auto peer = _history->peer; - if (!peer->asMegagroup() && !peer->asChat()) { + if (!peer->isChannel() && !peer->isChat()) { _groupCallTracker = nullptr; _groupCallBar = nullptr; return; @@ -5622,12 +5622,12 @@ void HistoryWidget::setupGroupCallTracker() { ) | rpl::start_with_next([=] { const auto peer = _history->peer; const auto channel = peer->asChannel(); - if (channel && channel->amAnonymous()) { + /*if (channel && channel->amAnonymous()) { // #TODO calls Ui::ShowMultilineToast({ .text = { tr::lng_group_call_no_anonymous(tr::now) }, }); return; - } else if (peer->groupCall()) { + } else */if (peer->groupCall()) { controller()->startOrJoinGroupCall(peer); } }, _groupCallBar->lifetime()); diff --git a/Telegram/SourceFiles/mtproto/core_types.h b/Telegram/SourceFiles/mtproto/core_types.h index 93724f611..18b54ffa0 100644 --- a/Telegram/SourceFiles/mtproto/core_types.h +++ b/Telegram/SourceFiles/mtproto/core_types.h @@ -42,6 +42,7 @@ constexpr auto kLogoutDcShift = 0x02; constexpr auto kUpdaterDcShift = 0x03; constexpr auto kExportDcShift = 0x04; constexpr auto kExportMediaDcShift = 0x05; +constexpr auto kGroupCallStreamDcShift = 0x06; constexpr auto kMaxMediaDcCount = 0x10; constexpr auto kBaseDownloadDcShift = 0x10; constexpr auto kBaseUploadDcShift = 0x20; diff --git a/Telegram/SourceFiles/mtproto/facade.h b/Telegram/SourceFiles/mtproto/facade.h index c0892b3cb..ca6e245c9 100644 --- a/Telegram/SourceFiles/mtproto/facade.h +++ b/Telegram/SourceFiles/mtproto/facade.h @@ -35,6 +35,11 @@ constexpr ShiftedDcId updaterDcId(DcId dcId) { return ShiftDcId(dcId, kUpdaterDcShift); } +// send(MTPupload_GetFile(), MTP::groupCallStreamDcId(dc)) - for gorup call stream +constexpr ShiftedDcId groupCallStreamDcId(DcId dcId) { + return ShiftDcId(dcId, kGroupCallStreamDcShift); +} + constexpr auto kUploadSessionsCount = 2; namespace details { diff --git a/Telegram/SourceFiles/window/window_session_controller.cpp b/Telegram/SourceFiles/window/window_session_controller.cpp index 3d642a79e..c048c659a 100644 --- a/Telegram/SourceFiles/window/window_session_controller.cpp +++ b/Telegram/SourceFiles/window/window_session_controller.cpp @@ -934,13 +934,13 @@ void SessionController::closeThirdSection() { void SessionController::startOrJoinGroupCall( not_null peer, bool confirmedLeaveOther) { - const auto channel = peer->asChannel(); - if (channel && channel->amAnonymous()) { - Ui::ShowMultilineToast({ - .text = { tr::lng_group_call_no_anonymous(tr::now) }, - }); - return; - } + //const auto channel = peer->asChannel(); // #TODO calls + //if (channel && channel->amAnonymous()) { + // Ui::ShowMultilineToast({ + // .text = { tr::lng_group_call_no_anonymous(tr::now) }, + // }); + // return; + //} auto &calls = Core::App().calls(); const auto confirm = [&](QString text, QString button) { Ui::show(Box(text, button, crl::guard(this, [=] {