From 8e2688b3dbdbcd0089102c794f6ebaeae573413c Mon Sep 17 00:00:00 2001 From: Robert Long Date: Fri, 6 Aug 2021 14:56:14 -0700 Subject: [PATCH] Refactor the ConferenceCallManager class --- src/ConferenceCallDebugger.js | 39 +- src/ConferenceCallManager.js | 609 +++++++++++++----------------- src/ConferenceCallManagerHooks.js | 4 +- src/DevTools.jsx | 18 +- src/DevTools.module.css | 16 + src/Room.jsx | 4 +- src/Room.module.css | 1 - 7 files changed, 325 insertions(+), 366 deletions(-) diff --git a/src/ConferenceCallDebugger.js b/src/ConferenceCallDebugger.js index c95511d5..3ef9f4de 100644 --- a/src/ConferenceCallDebugger.js +++ b/src/ConferenceCallDebugger.js @@ -1,3 +1,19 @@ +/* +Copyright 2021 New Vector Ltd + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + import EventEmitter from "events"; export class ConferenceCallDebugger extends EventEmitter { @@ -11,17 +27,34 @@ export class ConferenceCallDebugger extends EventEmitter { calls: new Map(), }; + this.bufferedEvents = []; + this.manager.on("call", this._onCall); this.manager.on("debugstate", this._onDebugStateChanged); this.manager.client.on("event", this._onEvent); + this.manager.on("entered", this._onEntered); } + _onEntered = () => { + const eventCount = this.bufferedEvents.length; + + for (let i = 0; i < eventCount; i++) { + const event = this.bufferedEvents.pop(); + this._onEvent(event); + } + }; + _onEvent = (event) => { + if (!this.manager.entered) { + this.bufferedEvents.push(event); + return; + } + const roomId = event.getRoomId(); const type = event.getType(); if ( - roomId === this.manager.roomId && + roomId === this.manager.room.roomId && (type.startsWith("m.call.") || type === "me.robertlong.call.info") ) { const sender = event.getSender(); @@ -332,7 +365,7 @@ export class ConferenceCallDebugger extends EventEmitter { .map(processRemoteOutboundRTPStats); this.manager.client.sendEvent( - this.manager.roomId, + this.manager.room.roomId, "me.robertlong.call.info", event ); @@ -373,7 +406,7 @@ export class ConferenceCallDebugger extends EventEmitter { "icecandidateerror", ({ errorCode, url, errorText }) => { this.manager.client.sendEvent( - this.manager.roomId, + this.manager.room.roomId, "me.robertlong.call.ice_error", { call_id: call.callId, diff --git a/src/ConferenceCallManager.js b/src/ConferenceCallManager.js index 70ff4d35..05e3ff52 100644 --- a/src/ConferenceCallManager.js +++ b/src/ConferenceCallManager.js @@ -131,343 +131,6 @@ export class ConferenceCallManager extends EventEmitter { } } - constructor(client) { - super(); - this.client = client; - this.joined = false; - this.room = null; - const localUserId = client.getUserId(); - this.localParticipant = { - local: true, - userId: localUserId, - stream: null, - call: null, - muted: true, - }; - this.participants = [this.localParticipant]; - this.pendingCalls = []; - - this.client.on("RoomState.members", this._onMemberChanged); - this.client.on("Call.incoming", this._onIncomingCall); - this.callDebugger = new ConferenceCallDebugger(this); - } - - setRoom(roomId) { - this.roomId = roomId; - this.room = this.client.getRoom(this.roomId); - } - - async join() { - const mediaStream = await this.client.getLocalVideoStream(); - - this.localParticipant.stream = mediaStream; - - this.joined = true; - - this.emit("debugstate", this.client.getUserId(), null, "you"); - - const activeConf = this.room.currentState - .getStateEvents(CONF_ROOM, "") - ?.getContent()?.active; - - if (!activeConf) { - this.client.sendStateEvent(this.roomId, CONF_ROOM, { active: true }, ""); - } - - const roomMemberIds = this.room.getMembers().map(({ userId }) => userId); - - roomMemberIds.forEach((userId) => { - this._processMember(userId); - }); - - for (const { call, onHangup, onReplaced } of this.pendingCalls) { - if (call.roomId !== this.roomId) { - continue; - } - - call.removeListener("hangup", onHangup); - call.removeListener("replaced", onReplaced); - - const userId = call.opponentMember.userId; - const existingParticipant = this.participants.find( - (p) => p.userId === userId - ); - - if (existingParticipant) { - existingParticipant.call = call; - } - - this._addCall(call); - call.answer(); - this.emit("call", call); - } - - this.pendingCalls = []; - - this._updateParticipantState(); - } - - _updateParticipantState = () => { - const userId = this.client.getUserId(); - const currentMemberState = this.room.currentState.getStateEvents( - "m.room.member", - userId - ); - - this.client.sendStateEvent( - this.roomId, - "m.room.member", - { - ...currentMemberState.getContent(), - [CONF_PARTICIPANT]: new Date().getTime(), - }, - userId - ); - - this._participantStateTimeout = setTimeout( - this._updateParticipantState, - PARTICIPANT_TIMEOUT - ); - }; - - _onMemberChanged = (_event, _state, member) => { - if (member.roomId !== this.roomId) { - return; - } - - this._processMember(member.userId); - }; - - _processMember(userId) { - const localUserId = this.client.getUserId(); - - if (userId === localUserId) { - return; - } - - // Don't process members until we've joined - if (!this.joined) { - return; - } - - const participant = this.participants.find((p) => p.userId === userId); - - if (participant) { - // Member has already been processed - return; - } - - const memberStateEvent = this.room.currentState.getStateEvents( - "m.room.member", - userId - ); - const participantTimeout = memberStateEvent.getContent()[CONF_PARTICIPANT]; - - if ( - typeof participantTimeout !== "number" || - new Date().getTime() - participantTimeout > PARTICIPANT_TIMEOUT - ) { - // Member is inactive so don't call them. - this.emit("debugstate", userId, null, "inactive"); - return; - } - - // Only initiate a call with a user who has a userId that is lexicographically - // less than your own. Otherwise, that user will call you. - if (userId < localUserId) { - this.emit("debugstate", userId, null, "waiting for invite"); - return; - } - - const call = this.client.createCall(this.roomId, userId); - this._addCall(call); - call.placeVideoCall().then(() => { - this.emit("call", call); - }); - } - - _onIncomingCall = (call) => { - if (!this.joined) { - const onHangup = (call) => { - const index = this.pendingCalls.findIndex((p) => p.call === call); - - if (index !== -1) { - this.pendingCalls.splice(index, 1); - } - }; - - const onReplaced = (call, newCall) => { - const index = this.pendingCalls.findIndex((p) => p.call === call); - - if (index !== -1) { - this.pendingCalls.splice(index, 1, { - call: newCall, - onHangup: () => onHangup(newCall), - onReplaced: (nextCall) => onReplaced(newCall, nextCall), - }); - } - }; - - this.pendingCalls.push({ - call, - onHangup: () => onHangup(call), - onReplaced: (newCall) => onReplaced(call, newCall), - }); - call.on("hangup", onHangup); - call.on("replaced", onReplaced); - - return; - } - - if (call.roomId !== this.roomId) { - return; - } - - const userId = call.opponentMember.userId; - const existingParticipant = this.participants.find( - (p) => p.userId === userId - ); - - if (existingParticipant) { - existingParticipant.call = call; - } - - this._addCall(call); - call.answer(); - this.emit("call", call); - }; - - _addCall(call) { - const userId = call.opponentMember.userId; - - this.participants.push({ - userId, - stream: null, - call, - }); - - call.on("state", (state) => - this.emit("debugstate", userId, call.callId, state) - ); - call.on("feeds_changed", () => this._onCallFeedsChanged(call)); - call.on("hangup", () => this._onCallHangup(call)); - - const onReplaced = (newCall) => { - this._onCallReplaced(call, newCall); - call.removeListener("replaced", onReplaced); - }; - - call.on("replaced", onReplaced); - this._onCallFeedsChanged(call); - - this.emit("participants_changed"); - } - - _onCallFeedsChanged = (call) => { - for (const participant of this.participants) { - if (participant.local || participant.call !== call) { - continue; - } - - const remoteFeeds = call.getRemoteFeeds(); - - if ( - remoteFeeds.length > 0 && - participant.stream !== remoteFeeds[0].stream - ) { - participant.stream = remoteFeeds[0].stream; - this.emit("participants_changed"); - } - } - }; - - _onCallHangup = (call) => { - const participantIndex = this.participants.findIndex( - (p) => !p.local && p.call === call - ); - - if (call.hangupReason === "replaced") { - return; - } - - if (participantIndex === -1) { - return; - } - - this.participants.splice(participantIndex, 1); - - this.emit("participants_changed"); - }; - - _onCallReplaced = (call, newCall) => { - const remoteParticipant = this.participants.find( - (p) => !p.local && p.call === call - ); - - remoteParticipant.call = newCall; - this.emit("call", newCall); - - newCall.on("feeds_changed", () => this._onCallFeedsChanged(newCall)); - newCall.on("hangup", () => this._onCallHangup(newCall)); - newCall.on("replaced", (nextCall) => - this._onCallReplaced(newCall, nextCall) - ); - this._onCallFeedsChanged(newCall); - - this.emit("participants_changed"); - }; - - leaveCall() { - if (!this.joined) { - return; - } - - const userId = this.client.getUserId(); - const currentMemberState = this.room.currentState.getStateEvents( - "m.room.member", - userId - ); - - this.client.sendStateEvent( - this.roomId, - "m.room.member", - { - ...currentMemberState.getContent(), - [CONF_PARTICIPANT]: null, - }, - userId - ); - - for (const participant of this.participants) { - if (!participant.local && participant.call) { - participant.call.hangup("user_hangup", false); - } - } - - this.client.stopLocalMediaStream(); - - this.joined = false; - this.participants = [this.localParticipant]; - this.localParticipant.stream = null; - this.localParticipant.call = null; - - this.emit("participants_changed"); - } - - logout() { - localStorage.removeItem("matrix-auth-store"); - } -} - -/** - * - incoming - * - you have not joined - * - you have joined - * - initial room members - * - new room members - */ - -class ConferenceCallManager2 extends EventEmitter { constructor(client) { super(); @@ -496,6 +159,7 @@ class ConferenceCallManager2 extends EventEmitter { this.client.on("RoomState.members", this._onRoomStateMembers); this.client.on("Call.incoming", this._onIncomingCall); + this.callDebugger = new ConferenceCallDebugger(this); } async enter(roomId, timeout = 30000) { @@ -505,7 +169,7 @@ class ConferenceCallManager2 extends EventEmitter { // Get the room info, wait if it hasn't been fetched yet. // Timeout after 30 seconds or the provided duration. const room = await new Promise((resolve, reject) => { - const initialRoom = manager.client.getRoom(roomId); + const initialRoom = this.client.getRoom(roomId); if (initialRoom) { resolve(initialRoom); @@ -543,7 +207,10 @@ class ConferenceCallManager2 extends EventEmitter { const stream = await this.client.getLocalVideoStream(); this.localParticipant = { + local: true, userId, + sessionId: this.sessionId, + call: null, stream, }; @@ -554,6 +221,16 @@ class ConferenceCallManager2 extends EventEmitter { // Continue doing so every PARTICIPANT_TIMEOUT ms this._updateMemberParticipantState(); + this.entered = true; + + // Answer any pending incoming calls. + const incomingCallCount = this._incomingCallQueue.length; + + for (let i = 0; i < incomingCallCount; i++) { + const call = this._incomingCallQueue.pop(); + this._onIncomingCall(call); + } + // Set up participants for the members currently in the room. // Other members will be picked up by the RoomState.members event. const initialMembers = room.getMembers(); @@ -562,9 +239,55 @@ class ConferenceCallManager2 extends EventEmitter { this._onMemberChanged(member); } - this.entered = true; + this.emit("entered"); + this.emit("participants_changed"); } + leaveCall() { + if (!this.entered) { + return; + } + + const userId = this.client.getUserId(); + const currentMemberState = this.room.currentState.getStateEvents( + "m.room.member", + userId + ); + + this.client.sendStateEvent( + this.room.roomId, + "m.room.member", + { + ...currentMemberState.getContent(), + [CONF_PARTICIPANT]: null, + }, + userId + ); + + for (const participant of this.participants) { + if (!participant.local && participant.call) { + participant.call.hangup("user_hangup", false); + } + } + + this.client.stopLocalMediaStream(); + + this.entered = false; + this.participants = [this.localParticipant]; + this.localParticipant.stream = null; + this.localParticipant.call = null; + + this.emit("participants_changed"); + } + + logout() { + localStorage.removeItem("matrix-auth-store"); + } + + /** + * Call presence + */ + _updateMemberParticipantState = () => { const userId = this.client.getUserId(); const currentMemberState = this.room.currentState.getStateEvents( @@ -585,6 +308,35 @@ class ConferenceCallManager2 extends EventEmitter { userId ); + const now = new Date().getTime(); + + for (const participant of this.participants) { + if (participant.local) { + continue; + } + + const memberStateEvent = this.room.currentState.getStateEvents( + "m.room.member", + participant.userId + ); + const participantInfo = memberStateEvent.getContent()[CONF_PARTICIPANT]; + + if ( + !participantInfo || + (participantInfo.expiresAt && participantInfo.expiresAt < now) + ) { + this.emit("debugstate", participant.userId, null, "inactive"); + + if (participant.call) { + // NOTE: This should remove the participant on the next tick + // since matrix-js-sdk awaits a promise before firing user_hangup + participant.call.hangup("user_hangup", false); + } + + return; + } + } + this._memberParticipantStateTimeout = setTimeout( this._updateMemberParticipantState, PARTICIPANT_TIMEOUT @@ -601,32 +353,70 @@ class ConferenceCallManager2 extends EventEmitter { */ _onIncomingCall = (call) => { - // The incoming calls may be for another room, which we will ignore. - if (call.roomId !== this.room.roomId) { - return; - } - // If we haven't entered yet, add the call to a queue which we'll use later. if (!this.entered) { this._incomingCallQueue.push(call); return; } - // Check if the user calling has an existing participant and use this call instead. + // The incoming calls may be for another room, which we will ignore. + if (call.roomId !== this.room.roomId) { + return; + } + + if (call.state !== "ringing") { + console.warn("Incoming call no longer in ringing state. Ignoring."); + return; + } + + // Get the remote video stream if it exists. + const stream = call.getRemoteFeeds()[0]?.stream; + const userId = call.opponentMember.userId; - const existingParticipant = manager.participants.find( + + const memberStateEvent = this.room.currentState.getStateEvents( + "m.room.member", + userId + ); + const { sessionId } = memberStateEvent.getContent()[CONF_PARTICIPANT]; + + // Check if the user calling has an existing participant and use this call instead. + const existingParticipant = this.participants.find( (p) => p.userId === userId ); + let participant; + if (existingParticipant) { + participant = existingParticipant; // This also fires the hangup event and triggers those side-effects - existingParticipant.call.hangup("user_hangup", false); + existingParticipant.call.hangup("replaced", false); existingParticipant.call = call; + existingParticipant.stream = stream; + existingParticipant.sessionId = sessionId; + } else { + participant = { + local: false, + userId, + sessionId, + call, + stream, + }; + this.participants.push(participant); } + call.on("state", (state) => + this._onCallStateChanged(participant, call, state) + ); + call.on("feeds_changed", () => this._onCallFeedsChanged(participant, call)); + call.on("replaced", (newCall) => + this._onCallReplaced(participant, call, newCall) + ); + call.on("hangup", () => this._onCallHangup(participant, call)); call.answer(); this.emit("call", call); + this.emit("participants_changed"); }; _onRoomStateMembers = (_event, _state, member) => { @@ -644,19 +434,27 @@ class ConferenceCallManager2 extends EventEmitter { return; } + // Don't process your own member. const localUserId = this.client.getUserId(); if (member.userId === localUserId) { return; } + // Get the latest member participant state event. const memberStateEvent = this.room.currentState.getStateEvents( "m.room.member", member.userId ); - const { expiresAt, sessionId } = - memberStateEvent.getContent()[CONF_PARTICIPANT]; + const participantInfo = memberStateEvent.getContent()[CONF_PARTICIPANT]; + if (!participantInfo) { + return; + } + + const { expiresAt, sessionId } = participantInfo; + + // If the participant state has expired, ignore this user. const now = new Date().getTime(); if (expiresAt < now) { @@ -664,15 +462,114 @@ class ConferenceCallManager2 extends EventEmitter { return; } - // Check the session id and expiration time of the existing participant to see if we should - // hang up the existing call and create a new one or ignore the changed member. - const participant = this.participants.find((p) => p.userId === userId); + // If there is an existing participant for this member check the session id. + // If the session id changed then we can hang up the old call and start a new one. + // Otherwise, ignore the member change event because we already have an active participant. + let participant = this.participants.find((p) => p.userId === member.userId); - if (participant && participant.sessionId !== sessionId) { - this.emit("debugstate", member.userId, null, "inactive"); - participant.call.hangup("user_hangup", false); + if (participant) { + if (participant.sessionId !== sessionId) { + this.emit("debugstate", member.userId, null, "inactive"); + participant.call.hangup("replaced", false); + } else { + return; + } } - this.emit("call", call); + // Only initiate a call with a user who has a userId that is lexicographically + // less than your own. Otherwise, that user will call you. + if (member.userId < localUserId) { + this.emit("debugstate", member.userId, null, "waiting for invite"); + return; + } + + const call = this.client.createCall(this.room.roomId, member.userId); + + if (participant) { + participant.sessionId = sessionId; + participant.call = call; + participant.stream = null; + } else { + participant = { + local: false, + userId: member.userId, + sessionId, + call, + stream: null, + }; + this.participants.push(participant); + } + + call.on("state", (state) => + this._onCallStateChanged(participant, call, state) + ); + call.on("feeds_changed", () => this._onCallFeedsChanged(participant, call)); + call.on("replaced", (newCall) => + this._onCallReplaced(participant, call, newCall) + ); + call.on("hangup", () => this._onCallHangup(participant, call)); + + call.placeVideoCall().then(() => { + this.emit("call", call); + }); + + this.emit("participants_changed"); + }; + + /** + * Call Event Handlers + */ + + _onCallStateChanged = (participant, call, state) => { + this.emit("debugstate", participant.userId, call.callId, state); + }; + + _onCallFeedsChanged = (participant, call) => { + const feeds = call.getRemoteFeeds(); + + if (feeds.length > 0 && participant.stream !== feeds[0].stream) { + participant.stream = feeds[0].stream; + this.emit("participants_changed"); + } + }; + + _onCallReplaced = (participant, call, newCall) => { + participant.call = newCall; + + newCall.on("state", (state) => + this._onCallStateChanged(participant, newCall, state) + ); + newCall.on("feeds_changed", () => + this._onCallFeedsChanged(participant, newCall) + ); + newCall.on("replaced", (nextCall) => + this._onCallReplaced(participant, newCall, nextCall) + ); + newCall.on("hangup", () => this._onCallHangup(participant, newCall)); + + const feeds = newCall.getRemoteFeeds(); + + if (feeds.length > 0) { + participant.stream = feeds[0].stream; + } + + this.emit("call", newCall); + this.emit("participants_changed"); + }; + + _onCallHangup = (participant, call) => { + if (call.hangupReason === "replaced") { + return; + } + + const participantIndex = this.participants.indexOf(participant); + + if (participantIndex === -1) { + return; + } + + this.participants.splice(participantIndex, 1); + + this.emit("participants_changed"); }; } diff --git a/src/ConferenceCallManagerHooks.js b/src/ConferenceCallManagerHooks.js index df92e550..d123652d 100644 --- a/src/ConferenceCallManagerHooks.js +++ b/src/ConferenceCallManagerHooks.js @@ -170,7 +170,6 @@ export function useVideoRoom(manager, roomId, timeout = 5000) { let initialRoom = manager.client.getRoom(roomId); if (initialRoom) { - manager.setRoom(roomId); setState((prevState) => ({ ...prevState, loading: false, @@ -186,7 +185,6 @@ export function useVideoRoom(manager, roomId, timeout = 5000) { if (room && room.roomId === roomId) { clearTimeout(timeoutId); manager.client.removeListener("Room", roomCallback); - manager.setRoom(roomId); setState((prevState) => ({ ...prevState, loading: false, @@ -226,7 +224,7 @@ export function useVideoRoom(manager, roomId, timeout = 5000) { manager.on("participants_changed", onParticipantsChanged); manager - .join() + .enter(roomId) .then(() => { setState((prevState) => ({ ...prevState, diff --git a/src/DevTools.jsx b/src/DevTools.jsx index ff975dc1..633201c6 100644 --- a/src/DevTools.jsx +++ b/src/DevTools.jsx @@ -1,3 +1,19 @@ +/* +Copyright 2021 New Vector Ltd + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + import React, { useCallback, useEffect, useRef, useState } from "react"; import ColorHash from "color-hash"; import classNames from "classnames"; @@ -56,7 +72,7 @@ export function DevTools({ manager }) { }; }, [manager]); - if (!manager.joined) { + if (!manager.entered) { return
; } diff --git a/src/DevTools.module.css b/src/DevTools.module.css index 4f6e0c1a..02dd4f47 100644 --- a/src/DevTools.module.css +++ b/src/DevTools.module.css @@ -1,3 +1,19 @@ +/* +Copyright 2021 New Vector Ltd + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + .devTools { display: flex; flex-direction: column; diff --git a/src/Room.jsx b/src/Room.jsx index 0f873315..47204ddc 100644 --- a/src/Room.jsx +++ b/src/Room.jsx @@ -104,12 +104,12 @@ export function Room({ manager }) { ); } -function Participant({ userId, stream, muted, local }) { +function Participant({ userId, stream, local }) { const videoRef = useRef(); useEffect(() => { if (stream) { - if (muted) { + if (local) { videoRef.current.muted = true; } diff --git a/src/Room.module.css b/src/Room.module.css index d54739e8..f77f7e84 100644 --- a/src/Room.module.css +++ b/src/Room.module.css @@ -14,7 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ - .room { position: relative; display: flex;