This repository has been archived by the owner on Aug 20, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathkurento-pipeline.js
147 lines (120 loc) · 4.03 KB
/
kurento-pipeline.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
'use strict';
module.exports = (kurentoClient, IceCandidate) => client => {
let pipeline;
let webRtcEndpoint;
// Used to buffer ice candidates until webRtcEndpoint is ready to process them
let iceCandidatesQueue = [];
// Start an RTSP stream using client's offer
function start(rtspUri, sdpOffer) {
return startStream(rtspUri, sdpOffer).catch(pipelineError);
}
// Start the RTSP stream
async function startStream(rtspUri, sdpOffer) {
// Create the media pipeline
const { playerEndpoint } = await createPipeline(rtspUri, sdpOffer);
// Start the pipeline
await playerEndpoint.play();
}
// Create the kurento pipeline composed of a WebRTCEndpoint and a PlayerEndpoint
// The PlayerEndpoint sends the stream into the pipeline
// The WebRtcEndpoint forwards it to the browser
async function createPipeline(rtspUri, sdpOffer) {
console.log(`Creating KMS pipeline with RTSP stream: ${rtspUri}`);
pipeline = await kurentoClient.create('MediaPipeline');
pipeline.on('Error', pipelineError);
// Create the 2 endpoints in parallel
const [playerEndpoint, webRtcEndpoint] = await Promise.all([
createPlayerEndpoint(rtspUri),
createWebRtcEndpoint(sdpOffer),
]);
// Connect the playerEndpoint to the webRtcEndpoint
await playerEndpoint.connect(webRtcEndpoint, 'VIDEO');
return {
playerEndpoint,
webRtcEndpoint,
pipeline,
};
}
// Create and start the player endpoint
async function createPlayerEndpoint(rtspUri) {
const playerOptions = {
uri: rtspUri,
useEncodedMedia: false,
// Reduce the buffering in order to decrease latency to the minimum
// Using 0 as the networkCache value could cause stability problems
networkCache: 100,
};
const playerEndpoint = await pipeline.create(
'PlayerEndpoint',
playerOptions
);
playerEndpoint.on('Error', pipelineError);
return playerEndpoint;
}
// Create and setup the WebRTC endpoint
async function createWebRtcEndpoint(sdpOffer) {
webRtcEndpoint = await pipeline.create('WebRtcEndpoint');
webRtcEndpoint.on('Error', pipelineError);
// If we already had ICE candidates queued, we add them to the WebRTC endpoint
// We can safely assume there won't be candidates added to the queue while we empty it
// since `webRtcEndpoint` has been set, so handleIceCandidate will directly send them to it
await Promise.all(
iceCandidatesQueue.map(candidate =>
webRtcEndpoint.addIceCandidate(candidate)
)
);
// Ask Kurento to process the SDP offer in order to get an SDP answer
const sdpAnswer = await webRtcEndpoint.processOffer(sdpOffer);
// Send sdp answer to client
client.send({
id: 'startResponse',
sdpAnswer,
});
// Start gathering local ICE candidates and send them to the client
webRtcEndpoint.on('OnIceCandidate', event => {
const candidate = IceCandidate(event.candidate);
client.send({
id: 'iceCandidate',
candidate,
});
});
await webRtcEndpoint.gatherCandidates();
return webRtcEndpoint;
}
function handleIceCandidate(candidate) {
const kurentoCandidate = IceCandidate(candidate);
if (webRtcEndpoint) {
console.info('Candidate received, forwarding to WebRTC endpoint');
webRtcEndpoint.addIceCandidate(kurentoCandidate);
} else {
console.info('Candidate received, queuing...');
// Push this IceCandidate into the queue
iceCandidatesQueue.push(kurentoCandidate);
}
}
// Release pipeline for this camera after a stream could not start
function pipelineError(error) {
console.error('Pipeline error:', error);
client.send({
id: 'error',
error: 'Pipeline error',
});
stop();
}
// Release pipeline
function stop() {
if (!pipeline) {
return;
}
console.info('Releasing pipeline');
pipeline.release();
pipeline = null;
webRtcEndpoint = null;
iceCandidatesQueue = [];
}
return {
start,
stop,
handleIceCandidate,
};
};