Support broadcast mode in voice chats.

This commit is contained in:
John Preston 2021-03-08 18:39:36 +04:00
parent ccd440ea0b
commit f66d7088ef
8 changed files with 232 additions and 29 deletions

View file

@ -226,9 +226,14 @@ void ChooseJoinAsProcess::start(
return list;
});
if (list.empty()) {
// #TODO calls in case of anonymous group admin show lng_group_call_no_anonymous
_request->showToast("No way to join this voice chat :(");
return;
} else if (list.size() == 1 && list.front() == self) {
} else if (list.size() == 1
&& list.front() == self
&& (!peer->isChannel()
|| !peer->asChannel()->amAnonymous()
|| (peer->isBroadcast() && !peer->canWrite()))) {
info.possibleJoinAs = std::move(list);
finish(info);
return;

View file

@ -70,6 +70,28 @@ constexpr auto kPlayConnectingEach = crl::time(1056) + 2 * crl::time(1000);
} // namespace
class GroupCall::LoadPartTask final : public tgcalls::BroadcastPartTask {
public:
LoadPartTask(
base::weak_ptr<GroupCall> call,
TimeId date,
Fn<void(tgcalls::BroadcastPart&&)> done);
[[nodiscard]] TimeId date() const {
return _date;
}
void done(tgcalls::BroadcastPart &&part);
void cancel() override;
private:
const base::weak_ptr<GroupCall> _call;
const TimeId _date = 0;
Fn<void(tgcalls::BroadcastPart &&)> _done;
QMutex _mutex;
};
[[nodiscard]] bool IsGroupCallAdmin(
not_null<PeerData*> peer,
not_null<PeerData*> participantPeer) {
@ -80,7 +102,7 @@ constexpr auto kPlayConnectingEach = crl::time(1056) + 2 * crl::time(1000);
if (const auto chat = peer->asChat()) {
return chat->admins.contains(user)
|| (chat->creator == user->bareId());
} else if (const auto group = peer->asMegagroup()) {
} else if (const auto group = peer->asChannel()) {
if (const auto mgInfo = group->mgInfo.get()) {
if (mgInfo->creator == user) {
return true;
@ -96,6 +118,40 @@ constexpr auto kPlayConnectingEach = crl::time(1056) + 2 * crl::time(1000);
return false;
}
GroupCall::LoadPartTask::LoadPartTask(
base::weak_ptr<GroupCall> call,
TimeId date,
Fn<void(tgcalls::BroadcastPart &&)> done)
: _call(std::move(call))
, _date(date ? date : base::unixtime::now())
, _done(std::move(done)) {
}
void GroupCall::LoadPartTask::done(tgcalls::BroadcastPart &&part) {
QMutexLocker lock(&_mutex);
if (_done) {
base::take(_done)(std::move(part));
}
}
void GroupCall::LoadPartTask::cancel() {
QMutexLocker lock(&_mutex);
if (!_done) {
return;
}
_done = nullptr;
lock.unlock();
if (_call) {
const auto that = this;
crl::on_main(_call, [weak = _call, that] {
if (const auto strong = weak.get()) {
strong->broadcastPartCancel(that);
}
});
}
}
GroupCall::GroupCall(
not_null<Delegate*> delegate,
Group::JoinInfo info,
@ -264,7 +320,7 @@ void GroupCall::join(const MTPInputGroupCall &inputCall) {
setState(State::Joining);
if (const auto chat = _peer->asChat()) {
chat->setGroupCall(inputCall);
} else if (const auto group = _peer->asMegagroup()) {
} else if (const auto group = _peer->asChannel()) {
group->setGroupCall(inputCall);
} else {
Unexpected("Peer type in GroupCall::join.");
@ -316,7 +372,8 @@ void GroupCall::rejoin() {
_mySsrc = 0;
setState(State::Joining);
createAndStartController();
ensureControllerCreated();
setInstanceMode(InstanceMode::None);
applyMeInCallLocally();
LOG(("Call Info: Requesting join payload."));
@ -581,8 +638,11 @@ void GroupCall::handleUpdate(const MTPGroupCall &call) {
|| !_instance) {
return;
}
const auto streamDcId = MTP::BareDcId(
data.vstream_dc_id().value_or_empty());
if (const auto params = data.vparams()) {
params->match([&](const MTPDdataJSON &data) {
using ConnectionMode = tgcalls::GroupConnectionMode;
auto error = QJsonParseError{ 0, QJsonParseError::NoError };
const auto document = QJsonDocument::fromJson(
data.vdata().v,
@ -597,6 +657,22 @@ void GroupCall::handleUpdate(const MTPGroupCall &call) {
"Not an object received in group call params."));
return;
}
const auto guard = gsl::finally([&] {
addParticipantsToInstance();
});
if (document.object().value("stream").toBool()) {
if (!streamDcId) {
LOG(("Api Error: Empty stream_dc_id in groupCall."));
}
_broadcastDcId = streamDcId
? streamDcId
: _peer->session().mtp().mainDcId();
setInstanceMode(InstanceMode::Stream);
return;
}
const auto readString = [](
const QJsonObject &object,
const char *key) {
@ -634,12 +710,8 @@ void GroupCall::handleUpdate(const MTPGroupCall &call) {
.relPort = readString(object, "relPort"),
});
}
_instance->setConnectionMode(
tgcalls::GroupConnectionMode::GroupConnectionModeRtc);
setInstanceMode(InstanceMode::Rtc);
_instance->setJoinResponsePayload(payload, {});
_instancePayloadsDone = true;
addParticipantsToInstance();
});
}
}, [&](const MTPDgroupCallDiscarded &data) {
@ -652,7 +724,9 @@ void GroupCall::handleUpdate(const MTPGroupCall &call) {
void GroupCall::addParticipantsToInstance() {
const auto real = _peer->groupCall();
if (!real || (real->id() != _id) || !_instancePayloadsDone) {
if (!real
|| (real->id() != _id)
|| (_instanceMode == InstanceMode::None)) {
return;
}
for (const auto &participant : real->participants()) {
@ -719,7 +793,10 @@ void GroupCall::handleUpdate(const MTPDupdateGroupCallParticipants &data) {
for (const auto &participant : data.vparticipants().v) {
participant.match([&](const MTPDgroupCallParticipant &data) {
if (!data.is_self()) {
const auto isSelf = data.is_self()
|| (data.is_min()
&& peerFromMTP(data.vpeer()) == _joinAs->id);
if (!isSelf) {
handleOtherParticipants(data);
return;
}
@ -738,6 +815,10 @@ void GroupCall::handleUpdate(const MTPDupdateGroupCallParticipants &data) {
}
if (data.is_muted() && !data.is_can_self_unmute()) {
setMuted(MuteState::ForceMuted);
} else if (_instanceMode == InstanceMode::Stream) {
LOG(("Call Info: Rejoin after unforcemute in stream mode."));
setState(State::Joining);
rejoin();
} else if (muted() == MuteState::ForceMuted) {
setMuted(MuteState::Muted);
} else if (data.is_muted() && muted() != MuteState::Muted) {
@ -763,7 +844,10 @@ void GroupCall::changeTitle(const QString &title) {
}).send();
}
void GroupCall::createAndStartController() {
void GroupCall::ensureControllerCreated() {
if (_instance) {
return;
}
const auto &settings = Core::App().settings();
const auto weak = base::make_weak(this);
@ -799,6 +883,18 @@ void GroupCall::createAndStartController() {
requestParticipantsInformation(ssrcs);
});
},
.requestBroadcastPart = [=](
int32_t date,
std::function<void(tgcalls::BroadcastPart &&)> done) {
auto result = std::make_shared<LoadPartTask>(
weak,
date,
std::move(done));
crl::on_main(weak, [=]() mutable {
broadcastPartStart(std::move(result));
});
return result;
}
};
if (Logs::DebugEnabled()) {
auto callLogFolder = cWorkingDir() + qsl("DebugLogs");
@ -816,7 +912,6 @@ void GroupCall::createAndStartController() {
}
LOG(("Call Info: Creating group instance"));
_instancePayloadsDone = false;
_instance = std::make_unique<tgcalls::GroupInstanceCustomImpl>(
std::move(descriptor));
@ -826,10 +921,70 @@ void GroupCall::createAndStartController() {
//raw->setAudioOutputDuckingEnabled(settings.callAudioDuckingEnabled());
}
void GroupCall::broadcastPartStart(std::shared_ptr<LoadPartTask> task) {
const auto raw = task.get();
const auto date = raw->date();
const auto finish = [=](tgcalls::BroadcastPart &&part) {
raw->done(std::move(part));
_broadcastParts.erase(raw);
};
using Status = tgcalls::BroadcastPart::Status;
const auto requestId = _api.request(MTPupload_GetFile(
MTP_flags(0),
MTP_inputGroupCallStream(
inputCall(),
MTP_long(uint64(date) * 1000),
MTP_int(0)),
MTP_int(0),
MTP_int(128 * 1024)
)).done([=](const MTPupload_File &result) {
result.match([&](const MTPDupload_file &data) {
const auto size = data.vbytes().v.size();
auto bytes = std::vector<uint8_t>(size);
memcpy(bytes.data(), data.vbytes().v.constData(), size);
finish({
.timestamp = date,
.responseTimestamp = date + 1., // #TODO calls extract from mtproto
.status = Status::Success,
.oggData = std::move(bytes),
});
}, [&](const MTPDupload_fileCdnRedirect &data) {
LOG(("Voice Chat Stream Error: fileCdnRedirect received."));
finish({
.timestamp = date,
.responseTimestamp = date + 1., // #TODO calls extract from mtproto
.status = Status::TooOld,
});
});
}).fail([=](const RPCError &error) {
const auto status = MTP::isTemporaryError(error)
? Status::NotReady
: Status::TooOld;
finish({
.timestamp = date,
.responseTimestamp = date + 1., // #TODO calls extract from mtproto
.status = status,
});
}).handleAllErrors().toDC(
MTP::groupCallStreamDcId(_broadcastDcId)
).send();
_broadcastParts.emplace(raw, LoadingPart{ std::move(task), requestId });
}
void GroupCall::broadcastPartCancel(not_null<LoadPartTask*> task) {
const auto i = _broadcastParts.find(task);
if (i != _broadcastParts.end()) {
_api.request(i->second.requestId).cancel();
_broadcastParts.erase(i);
}
}
void GroupCall::requestParticipantsInformation(
const std::vector<uint32_t> &ssrcs) {
const auto real = _peer->groupCall();
if (!real || real->id() != _id || !_instancePayloadsDone) {
if (!real
|| (real->id() != _id)
|| (_instanceMode == InstanceMode::None)) {
for (const auto ssrc : ssrcs) {
_unresolvedSsrcs.emplace(ssrc);
}
@ -1000,6 +1155,22 @@ void GroupCall::setInstanceConnected(bool connected) {
}
}
void GroupCall::setInstanceMode(InstanceMode mode) {
Expects(_instance != nullptr);
_instanceMode = mode;
using Mode = tgcalls::GroupConnectionMode;
_instance->setConnectionMode([&] {
switch (_instanceMode) {
case InstanceMode::None: return Mode::GroupConnectionModeNone;
case InstanceMode::Rtc: return Mode::GroupConnectionModeRtc;
case InstanceMode::Stream: return Mode::GroupConnectionModeBroadcast;
}
Unexpected("Mode in GroupCall::setInstanceMode.");
}());
}
void GroupCall::maybeSendMutedUpdate(MuteState previous) {
// Send only Active <-> !Active changes.
const auto now = muted();

View file

@ -174,18 +174,35 @@ public:
return _lifetime;
}
private:
class LoadPartTask;
public:
void broadcastPartStart(std::shared_ptr<LoadPartTask> task);
void broadcastPartCancel(not_null<LoadPartTask*> task);
private:
using GlobalShortcutValue = base::GlobalShortcutValue;
struct LoadingPart {
std::shared_ptr<LoadPartTask> task;
mtpRequestId requestId = 0;
};
enum class FinishType {
None,
Ended,
Failed,
};
enum class InstanceMode {
None,
Rtc,
Stream,
};
void handleRequestError(const RPCError &error);
void handleControllerError(const QString &error);
void createAndStartController();
void ensureControllerCreated();
void destroyController();
void setState(State state);
@ -199,6 +216,7 @@ private:
void audioLevelsUpdated(const tgcalls::GroupLevelsUpdate &data);
void setInstanceConnected(bool connected);
void setInstanceMode(InstanceMode mode);
void checkLastSpoke();
void pushToTalkCancel();
@ -233,11 +251,14 @@ private:
MTP::Sender _api;
rpl::variable<State> _state = State::Creating;
bool _instanceConnected = false;
bool _instancePayloadsDone = false;
InstanceMode _instanceMode = InstanceMode::None;
base::flat_set<uint32> _unresolvedSsrcs;
std::vector<tgcalls::GroupParticipantDescription> _preparedParticipants;
bool _addPreparedParticipantsScheduled = false;
MTP::DcId _broadcastDcId = 0;
base::flat_map<not_null<LoadPartTask*>, LoadingPart> _broadcastParts;
not_null<PeerData*> _joinAs;
std::vector<not_null<PeerData*>> _possibleJoinAs;

View file

@ -904,7 +904,7 @@ bool PeerData::canManageGroupCall() const {
if (const auto chat = asChat()) {
return chat->amCreator()
|| (chat->adminRights() & ChatAdminRight::f_manage_call);
} else if (const auto group = asMegagroup()) {
} else if (const auto group = asChannel()) {
return group->amCreator()
|| (group->adminRights() & ChatAdminRight::f_manage_call);
}
@ -914,7 +914,7 @@ bool PeerData::canManageGroupCall() const {
Data::GroupCall *PeerData::groupCall() const {
if (const auto chat = asChat()) {
return chat->groupCall();
} else if (const auto group = asMegagroup()) {
} else if (const auto group = asChannel()) {
return group->groupCall();
}
return nullptr;
@ -923,7 +923,7 @@ Data::GroupCall *PeerData::groupCall() const {
PeerId PeerData::groupCallDefaultJoinAs() const {
if (const auto chat = asChat()) {
return chat->groupCallDefaultJoinAs();
} else if (const auto group = asMegagroup()) {
} else if (const auto group = asChannel()) {
return group->groupCallDefaultJoinAs();
}
return 0;

View file

@ -5589,7 +5589,7 @@ void HistoryWidget::setupGroupCallTracker() {
Expects(_history != nullptr);
const auto peer = _history->peer;
if (!peer->asMegagroup() && !peer->asChat()) {
if (!peer->isChannel() && !peer->isChat()) {
_groupCallTracker = nullptr;
_groupCallBar = nullptr;
return;
@ -5622,12 +5622,12 @@ void HistoryWidget::setupGroupCallTracker() {
) | rpl::start_with_next([=] {
const auto peer = _history->peer;
const auto channel = peer->asChannel();
if (channel && channel->amAnonymous()) {
/*if (channel && channel->amAnonymous()) { // #TODO calls
Ui::ShowMultilineToast({
.text = { tr::lng_group_call_no_anonymous(tr::now) },
});
return;
} else if (peer->groupCall()) {
} else */if (peer->groupCall()) {
controller()->startOrJoinGroupCall(peer);
}
}, _groupCallBar->lifetime());

View file

@ -42,6 +42,7 @@ constexpr auto kLogoutDcShift = 0x02;
constexpr auto kUpdaterDcShift = 0x03;
constexpr auto kExportDcShift = 0x04;
constexpr auto kExportMediaDcShift = 0x05;
constexpr auto kGroupCallStreamDcShift = 0x06;
constexpr auto kMaxMediaDcCount = 0x10;
constexpr auto kBaseDownloadDcShift = 0x10;
constexpr auto kBaseUploadDcShift = 0x20;

View file

@ -35,6 +35,11 @@ constexpr ShiftedDcId updaterDcId(DcId dcId) {
return ShiftDcId(dcId, kUpdaterDcShift);
}
// send(MTPupload_GetFile(), MTP::groupCallStreamDcId(dc)) - for gorup call stream
constexpr ShiftedDcId groupCallStreamDcId(DcId dcId) {
return ShiftDcId(dcId, kGroupCallStreamDcShift);
}
constexpr auto kUploadSessionsCount = 2;
namespace details {

View file

@ -934,13 +934,13 @@ void SessionController::closeThirdSection() {
void SessionController::startOrJoinGroupCall(
not_null<PeerData*> peer,
bool confirmedLeaveOther) {
const auto channel = peer->asChannel();
if (channel && channel->amAnonymous()) {
Ui::ShowMultilineToast({
.text = { tr::lng_group_call_no_anonymous(tr::now) },
});
return;
}
//const auto channel = peer->asChannel(); // #TODO calls
//if (channel && channel->amAnonymous()) {
// Ui::ShowMultilineToast({
// .text = { tr::lng_group_call_no_anonymous(tr::now) },
// });
// return;
//}
auto &calls = Core::App().calls();
const auto confirm = [&](QString text, QString button) {
Ui::show(Box<ConfirmBox>(text, button, crl::guard(this, [=] {