refactored UDP reassembly

This commit is contained in:
matt 2022-02-16 19:22:29 +00:00
parent 9f90c18261
commit ac8dff1c0d
1 changed files with 48 additions and 58 deletions

View File

@ -922,11 +922,12 @@ export default class RFB extends EventTargetMixin {
// WebRTC UDP datachannel
if (this._useUdp) {
this._udpArray = new Array();
this._udpBuffer= new Map();
let udpurl = this._url.split("/")[2];
udpurl = udpurl.split(":")[0];
udpurl = window.location.protocol + "//" + udpurl + ":" + 9555;
udpurl = "http://" + udpurl + ":" + 9555;
//udpurl = window.location.protocol + "//" + udpurl + ":" + 9555;
this._udpPeer = new RTCPeerConnection({
iceServers: [{
@ -957,7 +958,7 @@ export default class RFB extends EventTargetMixin {
}
let sock = this._sock;
let udpArray = this._udpArray;
let udpBuffer = this._udpBuffer;
let me = this;
this._udpChannel.onmessage = function(e) {
//Log.Info("got udp msg", e.data);
@ -977,67 +978,56 @@ export default class RFB extends EventTargetMixin {
(u8[11] << 24), 10);
if (pieces == 1) { // Handle it immediately
sock._insertIntoMiddle(u8.slice(12));
Log.Info("Single Piece recieved");
sock._insertIntoMiddle(u8.slice(12));
me._handleUdpRect();
} else { // Insert into wait array
const now = Date.now();
const item = { id: id,
i: i,
pieces: pieces,
arrival: now,
data: u8.slice(12)
};
udpArray.push(item);
if (udpBuffer.has(id)) {
let item = udpBuffer.get(id);
if (!item) {
Log.Info("Item Missing id: " + id);
return;
}
item.recieved_pieces += 1;
item.data[i] = u8.slice(12);
item.total_bytes += item.data[i].length;
} else {
let item = {
total_pieces: pieces, // number of pieces expected
arrival: now, //time first piece was recieved
recieved_pieces: 1, // current number of pieces in data
total_bytes: 0, // total size of all data pieces combined
data: new Array(pieces)
}
item.data[i] = u8.slice(12);
item.total_bytes = item.data[i].length;
udpBuffer.set(id, item);
}
}
// Process wait array
if (!udpArray.length)
return;
// Sort by id and i for easier assembly
udpArray.sort(function(a, b) {
if (a.id < b.id) return -1;
if (b.id < a.id) return 1;
if (a.i < b.id) return -1;
if (b.i < a.i) return 1;
return 0;
});
//Log.Info("waitarr len " + udpArray.length);
const now = Date.now();
for (let i = 0; i < udpArray.length; i++) {
// Drop any packets older than 100ms
if (now - udpArray[i].arrival > 100) {
udpArray.splice(i, 1);
i--;
}
}
for (const [key, value] of udpBuffer.entries()) {
// Drop any messages older than 100ms
if (now - value.arrival > 100) {
Log.Info('Removed id: ' + key);
udpBuffer.delete(key);
} else if (value.total_pieces == value.recieved_pieces) {
// Message is complete, combile data into a single array
var finaldata = new Uint8Array(value.total_bytes);
let z = 0;
for (let x = 0; x < value.data.length; x++) {
finaldata.set(value.data[x], z);
z += value.data[x].length;
}
Log.Info('Completed message applied: ' + finaldata.length + ' ' + value.total_bytes + ' ' + value.total_pieces);
sock._insertIntoMiddle(finaldata);
udpBuffer.delete(key);
me._handleUdpRect();
}
}
let curid = 0;
let cursum = 0;
for (let i = 0; i < udpArray.length; i++) {
if (curid != udpArray[i].id) {
curid = udpArray[i].id;
cursum = 0;
var curdata = new Uint8Array(udpArray[i].pieces * 1400);
}
curdata.set(udpArray[i].data, udpArray[i].i * 1400);
cursum++;
// Did we get all?
if (cursum == udpArray[i].pieces) {
sock._insertIntoMiddle(curdata.slice(0, udpArray[i].i * 1400 + udpArray[i].data.length));
me._handleUdpRect();
// Remove them
let pieces = udpArray[i].pieces;
udpArray.splice(i - udpArray[i].pieces, udpArray[i].pieces);
i -= pieces;
}
}
}
peer.createOffer().then(function(offer) {
@ -3251,7 +3241,7 @@ export default class RFB extends EventTargetMixin {
this._sock, this._display,
this._fbDepth);
} catch (err) {
this._fail("Error decoding rect: " + err);
this._fail("Error decoding rect: " + err);
return false;
}
}