èªã¿åãå¯è½ãªãã¤ãã¹ããªã¼ã ã¨ã¯ãèªã¿åãå¯è½ãªã¹ããªã¼ã ã®ãã¡ type: "bytes"
ã®åºç¤ãã¤ãã½ã¼ã¹ããããï¼ã¹ããªã¼ã å
é¨ã®ãã¥ã¼ããã¤ãã¹ãã¦ï¼åºç¤ã½ã¼ã¹ããã³ã³ã·ã¥ã¼ãã¼ã¸ã®å¹ççãªã¼ãã³ãã¼ç§»è²ã«å¯¾å¿ãã¦ãã¾ãã ããã¯ããã¼ã¿ãä»»æã®å¤§ããã®ãæ½å¨çã«ã¨ã¦ã大ããªãã£ã³ã¯ã§ä¾çµ¦ãããããªã¯ã¨ã¹ãããããããå¯è½æ§ãããããããã£ã¦ã³ãã¼ãé¿ãããã¨ãå¹çãåä¸ãããå¯è½æ§ããã使ç¨ãããç¨éãæå³ãã¦ãã¾ãã
ãã®è¨äºã§ã¯ãèªã¿åãå¯è½ãªãã¤ãã¹ããªã¼ã ã¨é常ã®ãæ¢å®ãã¹ããªã¼ã ã¨ã®æ¯è¼ãããã³ããããã©ã®ããã«ä½æãã使ç¨ãããã«ã¤ãã¦èª¬æãã¾ãã
ã¡ã¢: èªã¿åãå¯è½ãªãã¤ãã¹ããªã¼ã ã¯ããé常ã®ãèªã¿åãå¯è½ãªã¹ããªã¼ã ã¨ã»ã¨ãã©åãã§ãããæ¦å¿µãã»ã¨ãã©ãã¹ã¦åãã§ãã ãã®è¨äºã¯ãããªãããã§ã«ãããã®æ¦å¿µãçè§£ãã¦ãããã¨ãæ³å®ãã¦ãããï¼ããããã§ããã°ï¼è¡¨é¢çã«ããåãä¸ãã¾ããã é¢é£ããæ¦å¿µã«æ £ãã¦ããªãæ¹ã¯ãå ã«èªã¿åãå¯è½ãªã¹ããªã¼ã ã®ä½¿ç¨ãã¹ããªã¼ã ã®æ¦å¿µã¨ä½¿ç¨æ³ã®æ¦è¦ãã¹ããªã¼ã API ã®æ¦è¦ ãèªãã§ãã ããã
æ¦è¦èªã¿åãå¯è½ãªã¹ããªã¼ã ã¯ããã¡ã¤ã«ãã½ã±ãããªã©ã®åºç¤ã¨ãªãã½ã¼ã¹ããããªã¼ãã¼ã夿ã¹ããªã¼ã ãæ¸ãè¾¼ã¿å¯è½ã¹ããªã¼ã ãªã©ã®ã³ã³ã·ã¥ã¼ãã¼ã«ãã¼ã¿ãã¹ããªã¼ãã³ã°ããããã®ä¸è²«ããã¤ã³ã¿ã¼ãã§ã¤ã¹ãæä¾ãã¾ãã é常ã®èªã¿åãå¯è½ãªã¹ããªã¼ã ã§ã¯ãåºç¤ã¨ãªãã½ã¼ã¹ããã®ãã¼ã¿ã¯å¸¸ã«å é¨ãã¥ã¼ãéã£ã¦ã³ã³ã·ã¥ã¼ãã¼ã«æ¸¡ããã¾ãã èªã¿åãå¯è½ãªãã¤ãã¹ããªã¼ã ã¯ãå é¨ãã¥ã¼ã空ã®å ´åãåºç¤ã¨ãªãã½ã¼ã¹ãã³ã³ã·ã¥ã¼ãã¼ã«ç´æ¥æ¸ãè¾¼ããï¼å¹ççãªã¼ãã³ãã¼ç§»è²ï¼ã¨ããç¹ã§ç°ãªãã¾ãã
èªã¿åãå¯è½ãªãã¤ãã¹ããªã¼ã 㯠type: "bytes"
ã underlyingSource
ãªãã¸ã§ã¯ãã«æå®ãããã®ãã ReadableStream()
ã³ã³ã¹ãã©ã¯ã¿ã¼ã«æ¸¡ããã¨ã§ä½æããã¾ãã ãã®å¤ãè¨å®ããã¨ãã¹ããªã¼ã 㯠ReadableByteStreamController
ã§ä½æãããstart(controller)
㨠pull(controller)
ã³ã¼ã«ããã¯é¢æ°ãå¼ã³åºãéã«ããã®ãªãã¸ã§ã¯ããåºç¤å
ã®ã½ã¼ã¹ã«æ¸¡ããã¾ãã
ReadableByteStreamController
ã¨æ¢å®ã®ã³ã³ããã¼ã©ã¼ (ReadableStreamDefaultController
) ã¨ã®ä¸»ãªéãã¯ã追å ã®ãããã㣠ReadableByteStreamController.byobRequest
ï¼ReadableStreamBYOBRequest
åï¼ãæã£ã¦ãããã¨ã§ãã ããã¯ãåºç¤ã½ã¼ã¹ããã®ã¼ãã³ãã¼ç§»è²ã¨ãã¦è¡ããããã³ã³ã·ã¥ã¼ãã¼ã«ããå¾
æ©ä¸ã®èªã¿åããªã¯ã¨ã¹ãã表ãã¾ãã ä¿çä¸ã®ãªã¯ã¨ã¹ãããªãå ´åããã®ããããã£ã¯ null
ã«ãªãã¾ãã
byobRequest
ã¯ãèªã¿åãå¯è½ãªãã¤ãã¹ããªã¼ã ã«å¯¾ãã¦èªã¿åããªã¯ã¨ã¹ããè¡ãããã¹ããªã¼ã ã®å
é¨ãã¥ã¼ã«ãã¼ã¿ããªãå ´åã«ã®ã¿å©ç¨ã§ãã¾ãï¼ãã¼ã¿ãããå ´åã¯ããããã®ãã¥ã¼ãããªã¯ã¨ã¹ããæºãããã¾ãï¼ã
ãã¼ã¿ã転éããå¿
è¦ããããã¤ãåºç¤ã¯ byobRequest
ããããã£ã調ã¹ãå©ç¨ã§ããå ´åã¯ããã使ç¨ãã¦ãã¼ã¿ã転éããå¿
è¦ãããã¾ãã ããããã£ã null
ã®å ´åãåä¿¡ãã¼ã¿ã¯ä»£ããã« ReadableByteStreamController.enqueue()
ã使ç¨ãã¦ã¹ããªã¼ã å
é¨ã®ãã¥ã¼ã«è¿½å ããå¿
è¦ãããã¾ãï¼ããã¯ããæ¢å®ã®ã ã¹ããªã¼ã ã使ç¨ãã¦ããå ´åã«ãã¼ã¿ã転éããå¯ä¸ã®æ¹æ³ã§ãï¼ã
ReadableStreamBYOBRequest
㯠view
ããããã£ãæã£ã¦ãã¾ããããã¯ç§»è²ã®ããã«å²ãå½ã¦ããããããã¡ã¼ã«é¢ãããã¥ã¼ã§ãã åºç¤ã¨ãªãã½ã¼ã¹ããã®ãã¼ã¿ã¯ãã®ããããã£ã«æ¸ãè¾¼ã¾ããæ¬¡ã«åºç¤ã¨ãªãã½ã¼ã¹ã¯ãæ¸ãè¾¼ã¾ãããã¤ãæ°ã示ã respond()
ãå¼ã³åºããªããã°ãªãã¾ããã ããã¯ãã¼ã¿ãç§»è²ãããã¹ããã¨ãæç¤ºããã³ã³ã·ã¥ã¼ãã¼ã«ããå¾
æ©ä¸ã®èªã¿åããªã¯ã¨ã¹ãã¯è§£æ±ºããã¾ãã respond()
ãå¼ã³åºããå¾ã view
ã«ã¯æ¸ãè¾¼ã¿ãã§ããªããªãã¾ãã
追å ã®ã¡ã½ãã ReadableStreamBYOBRequest.respondWithNewView()
ãããã¾ãããã®ã¡ã½ããã«ãåºç¤ã¨ãªãã½ã¼ã¹ã¯ãç§»è²ãããã¼ã¿ãæ ¼ç´ãããæ°ããããã¥ã¼ã渡ããã¨ãã§ãã¾ãã ãã®æ°ãããã¥ã¼ã¯ãå
ã¨åãã¡ã¢ãªã¼ãããã¡ã¼ä¸ã§ãåãéå§ãªãã»ããããã§ãªããã°ãªãã¾ããã ãã®ã¡ã½ããã¯ãä¾ãã°ããã¤ãåºç¤ãæåã«ã¯ã¼ã«ã¼ã¹ã¬ããã«ãã¥ã¼ãç§»è²ãããããåå¾ãã¦ãã byobRequest
ã«å¿çããå¿
è¦ãããå ´åã«ä½¿ç¨ãã¾ãã ã»ã¨ãã©ã®å ´åããã®ã¡ã½ããã¯å¿
è¦ããã¾ããã
èªã¿åãå¯è½ãªãã¤ãã¹ããªã¼ã ã¯é常ã ReadableStreamBYOBReader
ã使ç¨ãã¦èªã¿åããã¾ããããã¯ãã¹ããªã¼ã ä¸ã§ ReadableStream.getReader()
ãå¼ã³åºãã options 弿°ã« mode: "byob"
ãæå®ãã¾ãã
èªã¿åãå¯è½ãªãã¤ãã¹ããªã¼ã ã¯ãæ¢å®ã®ãªã¼ãã¼ (ReadableStreamDefaultReader
) ã使ç¨ãã¦èªã¿åããã¨ãã§ãã¾ããããã®å ´å byobRequest
ãªãã¸ã§ã¯ãã¯ãã¹ããªã¼ã ã«å¯¾ãã¦èªåãããã¡ã¼å²ãå½ã¦ãæå¹ã«ãªã£ã¦ããï¼autoAllocateChunkSize
ãã¹ããªã¼ã ã® underlyingSource
ã«è¨å®ããã¦ããï¼å ´åã«ã®ã¿ä½æããã¾ãã ãã®ç¨éã§ã¯ autoAllocateChunkSize
ã§ç¤ºããããµã¤ãºããããã¡ã¼ãµã¤ãºã¨ãã¦ä½¿ç¨ããããã¨ã«æ³¨æãã¦ãã ããã ãã®ããããã£ãæå®ããªãã£ãå ´åãæ¢å®å¤ã§ã¯ãªã¼ãã¼ã¯ãåä½ããã¾ãããåºç¤ã«ãªãã½ã¼ã¹ã« byobRequest
ãæä¾ããããã¨ã¯ãªãããã¹ã¦ã®ãã¼ã¿ã¯ã¹ããªã¼ã ã®å
é¨ãã¥ã¼ãéãã¦è»¢éããã¾ãã
ä¸è¨ã®ç°ãªãç¹ã®ä»ã«ãããã¤ãã¹ããªã¼ã ã®ã³ã³ããã¼ã©ã¼ã¨åºç¤ã¯ãæ¢å®ã®ã¹ããªã¼ã ã®ã³ã³ããã¼ã©ã¼ã¨ã¨ã¦ãããä¼¼ã¦ãããã»ã¼åãæ¹æ³ã§ä½¿ç¨ãã¾ãã
ä¾ ãã¤ããªã¼ãã¼ã®åºç¤ã®ããã·ã¥ã½ã¼ã¹ãã®ã©ã¤ãä¾ã§ã¯ãããã·ã¥åºç¤ãã¤ãã½ã¼ã¹ã§èªã¿åãå¯è½ãªãã¤ãã¹ããªã¼ã ã使ãããã¤ããªã¼ãã¼ã使ç¨ãã¦èªã¿åãæ¹æ³ã示ãã¾ãã
ãã«åãã¤ãã½ã¼ã¹ã®å ´åã¨ã¯ç°ãªãããã¼ã¿ã¯ä»»æã®æç¹ã§å°çãã¾ãã ãã®ãããåºç¤ã¨ãªãã½ã¼ã¹ã¯ controller.byobRequest
ã使ç¨ãã¦ãåä¿¡ãã¼ã¿ãåå¨ããå ´åã¯ãããç§»è²ããããã§ãªãå ´åã¯ã¹ããªã¼ã å
é¨ã®ãã¥ã¼ã«ãã¼ã¿ãå
¥ããå¿
è¦ãããã¾ãã ããã«ããã¼ã¿ã¯ãã¤ã§ãå°çããå¯è½æ§ããããããç£è¦åä½ã¯ underlyingSource.start()
ã³ã¼ã«ããã¯é¢æ°ã§è¨å®ãã¾ãã
ãã®ä¾ã¯ãã¹ããªã¼ã 仿§ã®ããã·ã¥ãã¤ãã½ã¼ã¹ã®ä¾ãã大ããªå½±é¿ãåãã¦ãã¾ãã ãã®ä¾ã§ã¯ãä»»æã®ãµã¤ãºã®ãã¼ã¿ãä¾çµ¦ãããä»®æ³ã½ã±ãããã½ã¼ã¹ã使ç¨ãã¦ãã¾ãã å ã®åºç¤ãã¹ããªã¼ã ã«ãã¼ã¿ãéä¿¡ããããã«ç§»è²ã¨å¾ ã¡è¡åã®ä¸¡æ¹ã使ç¨ãããã¨ãã§ããããã«ããªã¼ãã¼ã¯æ§ã ãªç¹ã§æå³çã«é å»¶ãã¦ãã¾ãã èå§å¯¾å¿ã¯å®æ¼ããã¦ãã¾ããã
ã¡ã¢: æ¢å®ã®ãªã¼ãã¼ã§ã¯ãåºç¤ãã¤ãã½ã¼ã¹ã使ç¨ãããã¨ãã§ãã¾ãã èªåãããã¡ã¼å²ãå½ã¦ãæå¹ã«ãªã£ã¦ããå ´åãã³ã³ããã¼ã©ã¼ã¯ããªã¼ãã¼ããã®æªå¦çã®ãªã¯ã¨ã¹ãããããã¹ããªã¼ã ã®å é¨ãã¥ã¼ã空ã§ããå ´åã«ãã¼ãã³ãã¼ç§»è²ç¨ã«ä¿®æ£ããããµã¤ãºã®ãããã¡ã¼ãä¾çµ¦ãã¾ãã èªåãããã¡ã¼å²ãå½ã¦ãæå¹ã«ãªã£ã¦ããªãå ´åããã¤ãã¹ããªã¼ã ã®ãã¼ã¿ã¯ãã¹ã¦å¸¸ã«å¾ ã¡è¡åã«å ¥ãããã¾ãã ããã¯ã"pull: underlying byte source "ã®ä¾ã§è¡¨ç¤ºãããåä½ã«ä¼¼ã¦ãã¾ãã
模æ¬åºç¤ã½ã±ããã½ã¼ã¹æ¨¡æ¬åºç¤ã«ã¯ 3 ã¤ã®éè¦ãªã¡ã½ãããããã¾ãã
select2()
ã¯æªå¦çã®ãªã¯ã¨ã¹ãã表ãã¾ãã ãã¼ã¿ãå©ç¨ã§ããã¨è§£æ±ºãããããã¹ãè¿ãã¾ããreadInto()
ã¯ã½ã±ããããä¸ãããããããã¡ã¼ã«ãã¼ã¿ãèªã¿è¾¼ã¿ããã¼ã¿ãã¯ãªã¢ãã¾ããclose()
ã¯ã½ã±ãããéãã¾ããå®è£
ããããã«ã¨ã¦ãåç´åãã¦ãã¾ãã ä¸è¨ã§ç¤ºãããã«ã select2()
ã¯ã¿ã¤ã ã¢ã¦ãæã«ã©ã³ãã ãªãµã¤ãºã®ã©ã³ãã ãªãã¼ã¿ã®ãããã¡ã¼ ã使ãã¾ãã 使ãããã¼ã¿ã¯ãããã¡ã¼ã«èªã¿è¾¼ã¾ããreadInto()
ã§ã¯ãªã¢ããã¾ãã
class MockHypotheticalSocket {
constructor() {
this.max_data = 800; // ã½ã±ããããã¹ããªã¼ãã³ã°ãããã¼ã¿ã®ç·é
this.max_per_read = 100; // ä¸åº¦ã«èªã¿åããã¼ã¿éã®æå¤§å¤
this.min_per_read = 40; // ä¸åº¦ã«èªã¿åããã¼ã¿éã®æå¤§å¤
this.data_read = 0; // total data read so far (capped is maxdata)
this.socketdata = null;
}
// ãã®ã½ã±ãããèªã¿åãå¯è½ãªå ´åã«ãããã¹ãè¿ãã¡ã½ããã
select2() {
// ãããã¹ã解決ããããã«ä½¿ç¨ãããªãã¸ã§ã¯ã
const resultobj = {};
resultobj["bytesRead"] = 0;
return new Promise((resolve /*, reject*/) => {
if (this.data_read >= this.max_data) {
//out of data
resolve(resultobj);
return;
}
// ãã¼ã¿ã®é
ãèªã¿è¾¼ã¿ãã¨ãã¥ã¬ã¼ã
setTimeout(() => {
const numberBytesReceived = this.getNumberRandomBytesSocket();
this.data_read += numberBytesReceived;
this.socketdata = this.randomByteArray(numberBytesReceived);
resultobj["bytesRead"] = numberBytesReceived;
resolve(resultobj);
}, 500);
});
}
/* æå®ãããããã¡ã¼ãªãã»ããã«ãã¼ã¿ãèªã¿è¾¼ã */
readInto(buffer, offset, length) {
let length_data = 0;
if (this.socketdata) {
length_data = this.socketdata.length;
const myview = new Uint8Array(buffer, offset, length);
// æå®ããé·ãã®ãã¼ã¿ããããã¡ã¼ã«æ¸ãè¾¼ã
// ã³ã¼ãã¯å¸¸ã«ãããã¡ã¼ãåä¿¡ãã¼ã¿ãã大ãããã¨ãæ³å®
for (let i = 0; i < length_data; i++) {
myview[i] = this.socketdata[i];
}
this.socketdata = null; // Clear "socket" data after reading
}
return length_data;
}
// ããã¼ã® close 颿°
close() {
return;
}
// Return random number bytes in this call of socket
getNumberRandomBytesSocket() {
// Capped to remaining data and the max min return-per-read range
const remaining_data = this.max_data - this.data_read;
const numberBytesReceived =
remaining_data < this.min_per_read
? remaining_data
: this.getRandomIntInclusive(
this.min_per_read,
Math.min(this.max_per_read, remaining_data),
);
return numberBytesReceived;
}
// Return random number between two values
getRandomIntInclusive(min, max) {
min = Math.ceil(min);
max = Math.floor(max);
return Math.floor(Math.random() * (max - min + 1) + min);
}
// ã©ã³ãã ãªæåã®æååãè¿ã
randomChars(length = 8) {
let string = "";
let choices =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";
for (let i = 0; i < length; i++) {
string += choices.charAt(Math.floor(Math.random() * choices.length));
}
return string;
}
/* ã©ã³ãã 㪠Uint8Array ã®ãã¤ãåãè¿ã */
randomByteArray(bytes = 8) {
const textEncoder = new TextEncoder();
return textEncoder.encode(this.randomChars(bytes));
}
}
.input {
float: left;
width: 50%;
}
.output {
float: right;
width: 50%;
overflow-wrap: break-word;
}
button {
display: block;
}
<button>Cancel stream</button>
<div class="input">
<h2>Underlying source</h2>
<ul></ul>
</div>
<div class="output">
<h2>Consumer</h2>
<ul></ul>
</div>
// ãªã¹ããæ®µè½ããã¿ã³ã¸ã®åç
§ãæ ¼ç´
const list1 = document.querySelector(".input ul");
const list2 = document.querySelector(".output ul");
const button = document.querySelector("button");
// æçµçµæãæ ¼ç´ããããã«ç©ºæååã使
let result = "";
// åºç¤ãããã¼ã¿ããã°åºåãã颿°
function logSource(result) {
const listItem = document.createElement("li");
listItem.textContent = result;
list1.appendChild(listItem);
}
// ã³ã³ã·ã¥ã¼ãã¼ã®ãã¼ã¿ããã°åºåãã颿°
function logConsumer(result) {
const listItem = document.createElement("li");
listItem.textContent = result;
list2.appendChild(listItem);
}
èªã¿åãå¯è½ãªã½ã±ããããã·ã¥ãã¤ãã¹ããªã¼ã ã®ä½æ
以ä¸ã®ã³ã¼ãã¯ãèªã¿åãå¯è½ãªã½ã±ãããããã·ã¥ããã¤ãã¹ããªã¼ã ãå®ç¾©ããæ¹æ³ã示ãã¦ãã¾ãã
underlyingSource
ãªãã¸ã§ã¯ãã®å®ç¾©ã¯ ReadableStream()
ã³ã³ã¹ãã©ã¯ã¿ã¼ã®æåã®å¼æ°ã¨ãã¦æ¸¡ããã¾ãã ãããèªã¿åãå¯è½ãªããã¤ããã¹ããªã¼ã ã«ããããã«ã type: "bytes"
ããªãã¸ã§ã¯ãã®ããããã£ã¨ãã¦æå®ãã¾ãã ããã«ãããã¹ããªã¼ã ã¯ç¢ºå®ã« ReadableByteStreamController
ã«ï¼æ¢å®ã®ã³ã³ããã¼ã©ã¼ (ReadableStreamDefaultController
ã®ä»£ããã«ï¼æ¸¡ãããããã«ãªãã¾ãã
ãã¼ã¿ã¯ã³ã³ã·ã¥ã¼ãã¼ãå¦çããæºåãã§ããåã«ã½ã±ããã«å°çããå¯è½æ§ããããããåºç¤ã®èªã¿åãã«é¢ãããã¹ã¦ã®è¨å®ã¯ start()
ã³ã¼ã«ããã¯ã¡ã½ããã§è¡ãã¾ãï¼ãã¼ã¿ã®å¦çãå§ããã«ã¯ãã«ãå¾
ã¡ã¾ããï¼ã å®è£
㯠"socket" ãéãããã¼ã¿ããªã¯ã¨ã¹ãããããã« select2()
ãå¼ã³åºãã¾ãã è¿ããããããã¹ã解決ãããã¨ãã³ã¼ã㯠controller.byobRequest
ãåå¨ããã (null
ã§ãªãã) ã調ã¹ãåå¨ããå ´å㯠socket.readInto()
ãå¼ã³åºãã¦ãã¼ã¿ããªã¯ã¨ã¹ãã«ã³ãã¼ãã¦ç§»è²ãã¾ãã ãã byobRequest
ãåå¨ããªããã°ãã¼ãã³ãã¼ã§è»¢éã§ããæ¶è²»ã¹ããªã¼ã ããã®æªå¦çã®ãªã¯ã¨ã¹ãã¯ããã¾ããã ãã®å ´åã controller.enqueue()
ã使ç¨ãã¦ã¹ããªã¼ã å
é¨ã®ãã¥ã¼ã«ãã¼ã¿ãã³ãã¼ãã¾ãã
ããã«ãã¼ã¿ãè¦æ±ãã select2()
ãªã¯ã¨ã¹ãã¯ããã¼ã¿ããªããªã¯ã¨ã¹ããè¿ãã¾ã§åæç¨¿ããã¾ãã ãã®ç¹ã§ãã³ã³ããã¼ã©ã¼ã¯ã¹ããªã¼ã ãéããããã«ä½¿ç¨ããã¾ãã
const stream = makeSocketStream("dummy host", "dummy port");
const DEFAULT_CHUNK_SIZE = 400;
function makeSocketStream(host, port) {
const socket = new MockHypotheticalSocket();
return new ReadableStream({
type: "bytes",
start(controller) {
readRepeatedly().catch((e) => controller.error(e));
function readRepeatedly() {
return socket.select2().then(() => {
// Since the socket can become readable even when there's
// no pending BYOB requests, we need to handle both cases.
let bytesRead;
if (controller.byobRequest) {
const v = controller.byobRequest.view;
bytesRead = socket.readInto(v.buffer, v.byteOffset, v.byteLength);
if (bytesRead === 0) {
controller.close();
}
controller.byobRequest.respond(bytesRead);
logSource(`byobRequest with ${bytesRead} bytes`);
} else {
const buffer = new ArrayBuffer(DEFAULT_CHUNK_SIZE);
bytesRead = socket.readInto(buffer, 0, DEFAULT_CHUNK_SIZE);
if (bytesRead === 0) {
controller.close();
} else {
controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
}
logSource(`enqueue() ${bytesRead} bytes (no byobRequest)`);
}
if (bytesRead === 0) {
return;
// no more bytes in source
}
return readRepeatedly();
});
}
},
cancel() {
socket.close();
logSource(`cancel(): socket closed`);
},
});
}
readRepeatedly()
ã¯ãããã¹ãè¿ããã¨ã«æ³¨æãã¦ãã ãããããã使ç¨ãã¦ãèªã¿è¾¼ã¿å¦çã®è¨å®ãå¦çã§çºçããã¨ã©ã¼ããã£ãããã¾ãã ã¨ã©ã¼ã¯ä¸ã§ç¤ºããããã«ã³ã³ããã¼ã©ã«æ¸¡ããã¾ãï¼readRepeatedly().catch((e) => controller.error(e));
ãåç
§ï¼ã
æå¾ã« cancel()
ã¡ã½ãããæå®ãããåºç¤ã¨ãªã£ã¦ããã½ã¼ã¹ãéãã¾ããpull()
ã³ã¼ã«ããã¯ã¯ä¸è¦ãªã®ã§å®è£
ãã¾ããã
以ä¸ã®ã³ã¼ãã¯ã½ã±ããã®ãã¤ãã¹ããªã¼ã ç¨ã« ReadableStreamBYOBReader
ã使ããããã使ç¨ãã¦ãã¼ã¿ããããã¡ã¼ã«èªã¿è¾¼ã¿ã¾ãã ãªãã processText()
ãå帰çã«å¼ã³åºããããããã¡ã¼ã䏿¯ã«ãªãã¾ã§ãã¼ã¿ãèªã¿è¾¼ã¾ãããã¨ã«æ³¨æãã¦ãã ããã åºç¤ããããã¼ã¿ããªãã¨æç¤ºããã¨ãreader.read()
ã«ã¯ done
ãè¨å®ãããèªã¿è¾¼ã¿å¦çãå®äºãã¾ãã
ãã®ã³ã¼ãã¯ãä¸è¨ã®ãã¤ããªã¼ãã¼ä»ãã®åºç¤ãã«ã½ã¼ã¹ã®ä¾ã¨ã»ã¨ãã©åãã§ãã å¯ä¸ã®éãã¯ããªã¼ãã¼ãèªã¿åããé ãããã³ã¼ããå«ãã§ãããã¨ã§ãã
const reader = stream.getReader({ mode: "byob" });
let buffer = new ArrayBuffer(4000);
readStream(reader);
function readStream(reader) {
let bytesReceived = 0;
let offset = 0;
while (offset < buffer.byteLength) {
// read() returns a promise that resolves when a value has been received
reader
.read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
.then(async function processText({ done, value }) {
// Result objects contain two properties:
// done - true if the stream has already given all its data.
// value - some data. Always undefined when done is true.
if (done) {
logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
return;
}
buffer = value.buffer;
offset += value.byteLength;
bytesReceived += value.byteLength;
//logConsumer(`Read ${bytesReceived} bytes: ${value}`);
logConsumer(`Read ${bytesReceived} bytes`);
result += value;
// Add delay to emulate when data can't be read and data is enqueued
if (bytesReceived > 300 && bytesReceived < 600) {
logConsumer(`Delaying read to emulate slow stream reading`);
const delay = (ms) =>
new Promise((resolve) => setTimeout(resolve, ms));
await delay(1000);
}
// Read some more, and call this function again
return reader
.read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
.then(processText);
});
}
}
ãªã¼ãã¼ã¨ä½¿ç¨ããã¹ããªã¼ã ã®åãæ¶ã
ReadableStreamBYOBReader.cancel()
ã使ç¨ãã¦ã¹ããªã¼ã ãåãæ¶ããã¨ãã§ãã¾ãã ãã®ä¾ã§ã¯ã"user choice" ã®çç±ã§ãã¿ã³ãã¯ãªãã¯ãããå ´åã«ã¡ã½ãããå¼ã³åºãã¦ãã¾ãï¼ãã¿ã³ã®ä»ã® HTML ãã³ã¼ãã¯è¡¨ç¤ºãã¦ãã¾ããï¼ã åãæ¶ãããå¯è½æ§ãããå¦çãå®äºããã¨ãããã°åºåãã¦ãã¾ãã
button.addEventListener("click", () => {
reader
.cancel("user choice")
.then(() => logConsumer("reader.cancel complete"));
});
ReadableStreamBYOBReader.releaseLock()
ã使ç¨ããã¨ãã¹ããªã¼ã ãåãæ¶ããã¨ãªããªã¼ãã¼ãè§£æ¾ãããã¨ãã§ãã¾ãã ãã ããæªå¦çã®èªã¿è¾¼ã¿ãªã¯ã¨ã¹ãã¯å³åº§ã«æå¦ããããã¨ã«æ³¨æãã¦ãã ããã æ®ãã®ãã£ã³ã¯ãèªãããã«ãå¾ã§æ°ãããªã¼ãã¼ãåå¾ãããã¨ãã§ãã¾ãã
ReadableStreamBYOBReader.closed
ããããã£ã¯ãã¹ããªã¼ã ãéããããã¨ãã«è§£æ±ºããã¨ã©ã¼ãããå ´åã¯æå¦ããããããã¹ãè¿ãã¾ãã ãã®ã±ã¼ã¹ã§ã¯ã¨ã©ã¼ã¯æå¾
ããã¾ããããç¶ãã³ã¼ãã§ã¯å®äºã±ã¼ã¹ããã°åºåããå¿
è¦ãããã¾ãã
reader.closed
.then(() => {
logConsumer("ReadableStreamBYOBReader.closed: resolved");
})
.catch(() => {
logConsumer("ReadableStreamBYOBReader.closed: rejected:");
});
çµæ
åºç¤ã¨ãªãããã·ã¥ã½ã¼ã¹ï¼å·¦ï¼ã¨ã³ã³ã·ã¥ã¼ãã¼ï¼å³ï¼ããã®ãã°åºåãä¸è¨ã«ç¤ºãã¾ãã ä¸å¤®ã®æéã¯ããã¼ã¿ãã¼ãã³ãã¼å¦çã¨ãã¦ç§»è²ãããã®ã§ã¯ãªããã¨ã³ãã¥ã¼ãããæéã§ãã
ãã¤ããªã¼ãã¼ä»ãã®åºç¤ãã«ã½ã¼ã¹ãã®ã©ã¤ã表示ä¾ã¯ããã¡ã¤ã«ãªã©ã®ããã«ããã¤ãåºç¤ãããã¼ã¿ãèªã¿åããã¹ããªã¼ã ã«ãã£ã¦ ReadableStreamBYOBReader
ã«ã¼ãã³ãã¼ç§»è²ããæ¹æ³ã示ãã¦ãã¾ãã
åºç¤ã¨ãªããã«ã½ã¼ã¹ã«ã¯ã以ä¸ã®ã¯ã©ã¹ã使ç¨ãã¦ãnodejs ã® FileHandle
ãç¹ã« read()
ã¡ã½ãããï¼ã¨ã¦ã表é¢çã«ï¼æ¨¡å£ãã¾ãã ãã®ã¯ã©ã¹ã¯ããã¡ã¤ã«ã表ãã©ã³ãã ãªãã¼ã¿ãçæãã¾ãã read()
ã¡ã½ããã¯ã©ã³ãã ãªãã¼ã¿ã®ãæ¬ä¼¼ä¹±æ°ãã®å¤§ããã®ãããã¯ããæå®ãããä½ç½®ããæä¾ããããããã¡ã¼ã«èªã¿è¾¼ã¿ã¾ãã close()
ã¡ã½ããã¯ä½ãããããã§ã¯ããã¾ãããã¹ããªã¼ã ã®ã³ã³ã¹ãã©ã¯ã¿ã¼ãå®ç¾©ããéã«ãã½ã¼ã¹ãéããå ´æã示ãããã«æå®ãããã ãã§ãã
ã¡ã¢: é¡ä¼¼ãã¦ããã¯ã©ã¹ã¯ããã¹ã¦ã®ããã«ã½ã¼ã¹ãã®ä¾ã«ä½¿ç¨ãã¦ãã¾ãã ããã§è¡¨ç¤ºããã¦ããã®ã¯ãããã¾ã§æ å ±ã§ãï¼æ¨¡æ¬ã§ãããã¨ããããããã«ï¼ã
class MockUnderlyingFileHandle {
constructor() {
this.maxdata = 100; // "file size"
this.maxReadChunk = 25; // "max read chunk size"
this.minReadChunk = 13; // "min read chunk size"
this.filedata = this.randomByteArray(this.maxdata);
this.position = 0;
}
// Read data from "file" at position/length into specified buffer offset
read(buffer, offset, length, position) {
// ãããã¹ã解決ããããã«ä½¿ç¨ãããªãã¸ã§ã¯ã
const resultobj = {};
resultobj["buffer"] = buffer;
resultobj["bytesRead"] = 0;
return new Promise((resolve /*, reject*/) => {
if (position >= this.maxdata) {
//out of data
resolve(resultobj);
return;
}
// ä¹±æ°ã®ãã¤ãåãè¿ããã¡ã¤ã«èªã¿è¾¼ã¿ã®ã·ãã¥ã¬ã¼ã·ã§ã³
// ãªã¯ã¨ã¹ããããæå°ãã¤ãæ°ã¨è¿ããã¨ãã§ããã©ã³ãã ãã¤ãæ°ãèªã¿åã
let readLength =
Math.floor(
Math.random() * (this.maxReadChunk - this.minReadChunk + 1),
) + this.minReadChunk;
readLength = length > readLength ? readLength : length;
// æä¾ããããããã¡ã¼ã«ã©ã³ãã ãã¼ã¿ãèªã¿è¾¼ã
const myview = new Uint8Array(buffer, offset, readLength);
// æå®ããé·ãã®ãã¼ã¿ãæ¸ã
for (let i = 0; i < readLength; i++) {
myview[i] = this.filedata[position + i];
resultobj["bytesRead"] = i + 1;
if (position + i + 1 >= this.maxdata) {
break;
}
}
// ãã¼ã¿ã®é
ãèªã¿è¾¼ã¿ãã¨ãã¥ã¬ã¼ã
setTimeout(() => {
resolve(resultobj);
}, 1000);
});
}
// ããã¼ã® close 颿°
close() {
return;
}
// ã©ã³ãã ãªæåã®æååãè¿ã
randomChars(length = 8) {
let string = "";
let choices =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";
for (let i = 0; i < length; i++) {
string += choices.charAt(Math.floor(Math.random() * choices.length));
}
return string;
}
// ã©ã³ãã 㪠Uint8Array ã®ãã¤ãåãè¿ã
randomByteArray(bytes = 8) {
const textEncoder = new TextEncoder();
return textEncoder.encode(this.randomChars(bytes));
}
}
.input {
float: left;
width: 50%;
}
.output {
float: right;
width: 50%;
overflow-wrap: break-word;
}
button {
display: block;
}
<button>Cancel stream</button>
<div class="input">
<h2>Underlying source</h2>
<ul></ul>
</div>
<div class="output">
<h2>Consumer</h2>
<ul></ul>
</div>
// ãªã¹ããæ®µè½ããã¿ã³ã¸ã®åç
§ãæ ¼ç´
const list1 = document.querySelector(".input ul");
const list2 = document.querySelector(".output ul");
const button = document.querySelector("button");
// æçµçµæãæ ¼ç´ããããã«ç©ºæååã使
let result = "";
// åºç¤ãããã¼ã¿ããã°åºåãã颿°
function logSource(result) {
const listItem = document.createElement("li");
listItem.textContent = result;
list1.appendChild(listItem);
}
// ã³ã³ã·ã¥ã¼ãã¼ã®ãã¼ã¿ããã°åºåãã颿°
function logConsumer(result) {
const listItem = document.createElement("li");
listItem.textContent = result;
list2.appendChild(listItem);
}
èªã¿åãå¯è½ãªãã¡ã¤ã«ãã¤ãã¹ããªã¼ã ã®ä½æ
以ä¸ã®ã³ã¼ãã¯ãèªã¿åãå¯è½ãªãã¡ã¤ã«ã®ãã¤ãã¹ããªã¼ã ãå®ç¾©ããæ¹æ³ã示ãã¦ãã¾ãã
åã®ä¾ã¨åæ§ã«ã underlyingSource
ãªãã¸ã§ã¯ãå®ç¾©ã¯ ReadableStream()
ã³ã³ã¹ãã©ã¯ã¿ã¼ã®æåã®å¼æ°ã¨ãã¦æ¸¡ããã¾ãã ãããèªã¿åãå¯è½ãªããã¤ããã¹ããªã¼ã ã«ããããã«ããªãã¸ã§ã¯ãã®ããããã£ã« type: "bytes"
ããªãã¸ã§ã¯ãã®ããããã£ã¨ãã¦æå®ãã¾ãã ããã«ãããã¹ããªã¼ã ã« ReadableByteStreamController
ãæ¸¡ããããã¨ã確å®ã«ãã¾ãã
start()
颿°ã¯åã«ãã¡ã¤ã«ãã³ãã«ãéãããã㯠cancel()
ã³ã¼ã«ããã¯ã§éãããã¾ãã cancel()
㯠ReadableStream.cancel()
ã¾ã㯠ReadableStreamDefaultController.close()
ãå¼ã³åºãããå ´åã«ãªã½ã¼ã¹ãã¯ãªã¼ã³ã¢ããããããã«æå®ããã¦ãã¾ãã
æãè峿·±ãã³ã¼ã㯠pull()
ã³ã¼ã«ããã¯ã§ãã ããã¯ãã¡ã¤ã«ããå¾
æ©ä¸ã®èªã¿è¾¼ã¿ãªã¯ã¨ã¹ãã«ãã¼ã¿ãã³ãã¼ãï¼ReadableByteStreamController.byobRequest
ï¼ã respond()
ãå¼ã³åºãã¦ãããã¡ã¼å
ã®ãã¼ã¿éã示ããããã転éãã¾ãã ãã¡ã¤ã«ãã 0 ãã¤ãã転éãããå ´åããã¹ã¦ã³ãã¼ããããã¨ãããããã³ã³ããã¼ã©ã¼ã® close()
ãå¼ã³åºãã¾ãã
const stream = makeReadableByteFileStream("dummy file.txt");
function makeReadableByteFileStream(filename) {
let fileHandle;
let position = 0;
return new ReadableStream({
type: "bytes", // An underlying byte stream!
start(controller) {
// Called to initialise the underlying source.
// For a file source open a file handle (here we just create the mocked object).
fileHandle = new MockUnderlyingFileHandle();
logSource(
`start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
},
async pull(controller) {
// Called when there is a pull request for data
const theView = controller.byobRequest.view;
const { bytesRead, buffer } = await fileHandle.read(
theView.buffer,
theView.byteOffset,
theView.byteLength,
position,
);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
controller.byobRequest.respond(0);
logSource(
`pull() with byobRequest. Close controller (read bytes: ${bytesRead})`,
);
} else {
position += bytesRead;
controller.byobRequest.respond(bytesRead);
logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
}
},
cancel(reason) {
// This is called if the stream is cancelled (via reader or controller).
// Clean up any resources
fileHandle.close();
logSource(`cancel() with reason: ${reason}`);
},
});
}
ãã¤ãã¹ããªã¼ã ã®ä½¿ç¨
以ä¸ã®ã³ã¼ãã¯ãã¡ã¤ã«ã®ãã¤ãã¹ããªã¼ã ç¨ã« ReadableStreamBYOBReader
ã使ããããã使ç¨ãã¦ãã¼ã¿ããããã¡ã¼ã«èªã¿è¾¼ãã§ãã¾ãã ãªãã processText()
ãå帰çã«å¼ã³åºããããããã¡ã¼ã䏿¯ã«ãªãã¾ã§ãã¼ã¿ãèªã¿è¾¼ã¿ã¾ãã åºç¤ã¨ãªãã½ã¼ã¹ããã以ä¸ãã¼ã¿ããªããã¨ãæç¤ºããã¨ã reader.read()
ã® done
ã true ã«è¨å®ãããèªã¿è¾¼ã¿å¦çãå®äºãã¾ãã
const reader = stream.getReader({ mode: "byob" });
let buffer = new ArrayBuffer(200);
readStream(reader);
function readStream(reader) {
let bytesReceived = 0;
let offset = 0;
// read() returns a promise that resolves when a value has been received
reader
.read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
.then(function processText({ done, value }) {
// Result objects contain two properties:
// done - true if the stream has already given all its data.
// value - some data. Always undefined when done is true.
if (done) {
logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
return;
}
buffer = value.buffer;
offset += value.byteLength;
bytesReceived += value.byteLength;
logConsumer(
`Read ${value.byteLength} (${bytesReceived}) bytes: ${value}`,
);
result += value;
// Read some more, and call this function again
return reader
.read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
.then(processText);
});
}
æå¾ã«ããã¿ã³ãã¯ãªãã¯ãããå ´åã«ã¹ããªã¼ã ãåãæ¶ãããå¯è½æ§ã®ãããã³ãã©ã¼ã追å ãã¾ãï¼HTML ã¨ãã¿ã³ã®ããã®ã³ã¼ãã¯ç¤ºãã¾ããï¼ã
button.addEventListener("click", () => {
reader.cancel("user choice").then(() => {
logConsumer(`reader.cancel complete`);
});
});
çµæ
åºç¤ã¨ãªããã«ã½ã¼ã¹ï¼å·¦ï¼ã¨ã³ã³ã·ã¥ã¼ãã¼ï¼å³ï¼ããã®ãã°åºåãä¸è¨ã«ç¤ºãã¾ãã ç¹çãã¹ããã¨ã¯æ¬¡ã®éãã§ãã
start()
颿°ã«ã¯ ReadableByteStreamController
ãæ¸¡ããã¾ãããã®ã©ã¤ãä¾ã§ã¯ãæ¢å®ã®ãªã¼ãã¼ (ReadableStreamDefaultReader
) ã使ç¨ãã¦ãåããã¼ã¿ãã¼ãã³ãã¼ç§»è²ã§èªã¿åãæ¹æ³ã示ãã¾ãã ãã®ä¾ã§ã¯ãå
ã®ä¾ã¨åãæ¨¡æ¬åºç¤ãã¡ã¤ã«ã½ã¼ã¹ã使ç¨ãã¦ãã¾ãã
class MockUnderlyingFileHandle {
constructor() {
this.maxdata = 100; // "file size"
this.maxReadChunk = 25; // "max read chunk size"
this.minReadChunk = 13; // "min read chunk size"
this.filedata = this.randomByteArray(this.maxdata);
this.position = 0;
}
// Read data from "file" at position/length into specified buffer offset
read(buffer, offset, length, position) {
// ãããã¹ã解決ããããã«ä½¿ç¨ãããªãã¸ã§ã¯ã
const resultobj = {};
resultobj["buffer"] = buffer;
resultobj["bytesRead"] = 0;
return new Promise((resolve /*, reject*/) => {
if (position >= this.maxdata) {
//out of data
resolve(resultobj);
return;
}
// ä¹±æ°ã®ãã¤ãåãè¿ããã¡ã¤ã«èªã¿è¾¼ã¿ã®ã·ãã¥ã¬ã¼ã·ã§ã³
// ãªã¯ã¨ã¹ããããæå°ãã¤ãæ°ã¨è¿ããã¨ãã§ããã©ã³ãã ãã¤ãæ°ãèªã¿åã
let readLength =
Math.floor(
Math.random() * (this.maxReadChunk - this.minReadChunk + 1),
) + this.minReadChunk;
readLength = length > readLength ? readLength : length;
// æä¾ããããããã¡ã¼ã«ã©ã³ãã ãã¼ã¿ãèªã¿è¾¼ã
const myview = new Uint8Array(buffer, offset, readLength);
// æå®ããé·ãã®ãã¼ã¿ãæ¸ã
for (let i = 0; i < readLength; i++) {
myview[i] = this.filedata[position + i];
resultobj["bytesRead"] = i + 1;
if (position + i + 1 >= this.maxdata) {
break;
}
}
// ãã¼ã¿ã®é
ãèªã¿è¾¼ã¿ãã¨ãã¥ã¬ã¼ã
setTimeout(() => {
resolve(resultobj);
}, 1000);
});
}
// ããã¼ã® close 颿°
close() {
return;
}
// ã©ã³ãã ãªæåã®æååãè¿ã
randomChars(length = 8) {
let string = "";
let choices =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";
for (let i = 0; i < length; i++) {
string += choices.charAt(Math.floor(Math.random() * choices.length));
}
return string;
}
// ã©ã³ãã 㪠Uint8Array ã®ãã¤ãåãè¿ã
randomByteArray(bytes = 8) {
const textEncoder = new TextEncoder();
return textEncoder.encode(this.randomChars(bytes));
}
}
.input {
float: left;
width: 50%;
}
.output {
float: right;
width: 50%;
overflow-wrap: break-word;
}
button {
display: block;
}
<button>Cancel stream</button>
<div class="input">
<h2>Underlying source</h2>
<ul></ul>
</div>
<div class="output">
<h2>Consumer</h2>
<ul></ul>
</div>
// ãªã¹ããæ®µè½ããã¿ã³ã¸ã®åç
§ãæ ¼ç´
const list1 = document.querySelector(".input ul");
const list2 = document.querySelector(".output ul");
const button = document.querySelector("button");
// æçµçµæãæ ¼ç´ããããã«ç©ºæååã使
let result = "";
// åºç¤ãããã¼ã¿ããã°åºåãã颿°
function logSource(result) {
const listItem = document.createElement("li");
listItem.textContent = result;
list1.appendChild(listItem);
}
// ã³ã³ã·ã¥ã¼ãã¼ã®ãã¼ã¿ããã°åºåãã颿°
function logConsumer(result) {
const listItem = document.createElement("li");
listItem.textContent = result;
list2.appendChild(listItem);
}
èªåãããã¡ã¼å²ãå½ã¦ã«ããèªã¿åãå¯è½ãªãã¡ã¤ã«ãã¤ãã¹ããªã¼ã ã®ä½æ
å¯ä¸ã®åºç¤ã®éãã¯ãautoAllocateChunkSize
ãæå®ããªããã°ãªããªããã¨ã¨ãã³ã³ã·ã¥ã¼ãã¼ããæä¾ããããµã¤ãºã§ã¯ãªããcontroller.byobRequest
ã®ãã¥ã¼ãããã¡ã¼ãµã¤ãºã¨ãã¦ä½¿ç¨ãããã¨ã§ãã
const DEFAULT_CHUNK_SIZE = 20;
const stream = makeReadableByteFileStream("dummy file.txt");
function makeReadableByteFileStream(filename) {
let fileHandle;
let position = 0;
return new ReadableStream({
type: "bytes", // An underlying byte stream!
start(controller) {
// Called to initialise the underlying source.
// For a file source open a file handle (here we just create the mocked object).
fileHandle = new MockUnderlyingFileHandle();
logSource(
`start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
},
async pull(controller) {
// Called when there is a pull request for data
const theView = controller.byobRequest.view;
const { bytesRead, buffer } = await fileHandle.read(
theView.buffer,
theView.byteOffset,
theView.byteLength,
position,
);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
controller.byobRequest.respond(0);
logSource(
`pull() with byobRequest. Close controller (read bytes: ${bytesRead})`,
);
} else {
position += bytesRead;
controller.byobRequest.respond(bytesRead);
logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
}
},
cancel(reason) {
// This is called if the stream is cancelled (via reader or controller).
// Clean up any resources
fileHandle.close();
logSource(`cancel() with reason: ${reason}`);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, // Only relevant if using a default reader
});
}
æ¢å®ã®ãªã¼ãã¼ã§ã®ãã¤ãã¹ããªã¼ã ã®å©ç¨
以ä¸ã®ã³ã¼ãã§ã¯ãã¢ã¼ããæå®ããã« stream.getReader();
ãå¼ã³åºãã¦ãã¡ã¤ã«ãã¤ãã¹ããªã¼ã ç¨ã® ReadableStreamDefaultReader
ã使ããããã使ç¨ãã¦ãã¼ã¿ããããã¡ã¼ã«èªã¿è¾¼ãã§ãã¾ãã ã³ã¼ãã®å¦çã¯ããããã¡ã¼ãã³ã³ã·ã¥ã¼ãã§ã¯ãªãã¹ããªã¼ã ããä¾çµ¦ããããã¨ãé¤ãã¦ãååã®ä¾ã¨åãã§ãã
const reader = stream.getReader();
readStream(reader);
function readStream(reader) {
let bytesReceived = 0;
let result = "";
// read() returns a promise that resolves
// when a value has been received
reader.read().then(function processText({ done, value }) {
// Result objects contain two properties:
// done - true if the stream has already given you all its data.
// value - some data. Always undefined when done is true.
if (done) {
logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
return;
}
bytesReceived += value.length;
logConsumer(
`Read ${value.length} (${bytesReceived}). Current bytes = ${value}`,
);
result += value;
// Read some more, and call this function again
return reader.read().then(processText);
});
}
æå¾ã«ããã¿ã³ãã¯ãªãã¯ãããå ´åã«ã¹ããªã¼ã ãåãæ¶ãããå¯è½æ§ã®ãããã³ãã©ã¼ã追å ãã¾ãï¼ãã¿ã³ã®ããã®ä»ã® HTML ã¨ã³ã¼ãã¯ç¤ºãã¾ããï¼ã
button.addEventListener("click", () => {
reader.cancel("user choice").then(() => {
logConsumer(`reader.cancel complete`);
});
});
çµæ
åºç¤ã®ãã¤ããã«ã½ã¼ã¹ï¼å·¦ï¼ã¨ã³ã³ã·ã¥ã¼ãï¼å³ï¼ããã®ãã°åºåãä¸è¨ã«ç¤ºãã¾ãã
ããã§ãã£ã³ã¯ã®å¹
ã¯æå¤§ã§ã 20 ãã¤ãã«ãªã£ããã¨ã«æ³¨æãã¦ãã ãããããã¯ãåºç¤ã®ãã¤ãã½ã¼ã¹ (autoAllocateChunkSize
) ã§æå®ããèªåå²ãå½ã¦ãããã¡ã®ãµã¤ãºã ããã§ãã ãããã¯ãã¼ãã³ãã¼ç§»è²ã¨ãã¦è¡ããã¾ãã
å®å ¨ãæãããã«ãèªåãããã¡ã¼å²ãå½ã¦ã«å¯¾å¿ãã¦ããªããã¤ãã½ã¼ã¹ã®æ¢å®ã®ãªã¼ãã¼ã使ç¨ãããã¨ãã§ãã¾ãã
class MockUnderlyingFileHandle {
constructor() {
this.maxdata = 100; // "file size"
this.maxReadChunk = 25; // "max read chunk size"
this.minReadChunk = 13; // "min read chunk size"
this.filedata = this.randomByteArray(this.maxdata);
this.position = 0;
}
// Read data from "file" at position/length into specified buffer offset
read(buffer, offset, length, position) {
// ãããã¹ã解決ããããã«ä½¿ç¨ãããªãã¸ã§ã¯ã
const resultobj = {};
resultobj["buffer"] = buffer;
resultobj["bytesRead"] = 0;
return new Promise((resolve /*, reject*/) => {
if (position >= this.maxdata) {
//out of data
resolve(resultobj);
return;
}
// ä¹±æ°ã®ãã¤ãåãè¿ããã¡ã¤ã«èªã¿è¾¼ã¿ã®ã·ãã¥ã¬ã¼ã·ã§ã³
// ãªã¯ã¨ã¹ããããæå°ãã¤ãæ°ã¨è¿ããã¨ãã§ããã©ã³ãã ãã¤ãæ°ãèªã¿åã
let readLength =
Math.floor(
Math.random() * (this.maxReadChunk - this.minReadChunk + 1),
) + this.minReadChunk;
readLength = length > readLength ? readLength : length;
// æä¾ããããããã¡ã¼ã«ã©ã³ãã ãã¼ã¿ãèªã¿è¾¼ã
const myview = new Uint8Array(buffer, offset, readLength);
// æå®ããé·ãã®ãã¼ã¿ãæ¸ã
for (let i = 0; i < readLength; i++) {
myview[i] = this.filedata[position + i];
resultobj["bytesRead"] = i + 1;
if (position + i + 1 >= this.maxdata) {
break;
}
}
// ãã¼ã¿ã®é
ãèªã¿è¾¼ã¿ãã¨ãã¥ã¬ã¼ã
setTimeout(() => {
resolve(resultobj);
}, 1000);
});
}
// ããã¼ã® close 颿°
close() {
return;
}
// ã©ã³ãã ãªæåã®æååãè¿ã
randomChars(length = 8) {
let string = "";
let choices =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";
for (let i = 0; i < length; i++) {
string += choices.charAt(Math.floor(Math.random() * choices.length));
}
return string;
}
// ã©ã³ãã 㪠Uint8Array ã®ãã¤ãåãè¿ã
randomByteArray(bytes = 8) {
const textEncoder = new TextEncoder();
return textEncoder.encode(this.randomChars(bytes));
}
}
.input {
float: left;
width: 50%;
}
.output {
float: right;
width: 50%;
overflow-wrap: break-word;
}
button {
display: block;
}
<button>Cancel stream</button>
<div class="input">
<h2>Underlying source</h2>
<ul></ul>
</div>
<div class="output">
<h2>Consumer</h2>
<ul></ul>
</div>
// ãªã¹ããæ®µè½ããã¿ã³ã¸ã®åç
§ãæ ¼ç´
const list1 = document.querySelector(".input ul");
const list2 = document.querySelector(".output ul");
const button = document.querySelector("button");
// æçµçµæãæ ¼ç´ããããã«ç©ºæååã使
let result = "";
// åºç¤ãããã¼ã¿ããã°åºåãã颿°
function logSource(result) {
const listItem = document.createElement("li");
listItem.textContent = result;
list1.appendChild(listItem);
}
// ã³ã³ã·ã¥ã¼ãã¼ã®ãã¼ã¿ããã°åºåãã颿°
function logConsumer(result) {
const listItem = document.createElement("li");
listItem.textContent = result;
list2.appendChild(listItem);
}
ãããããã®å ´åãã³ã³ããã¼ã©ã¼ã¯ byobRequest
ãåºç¤ã«æ¸ãè¾¼ããã¨ã¯ã§ãã¾ããã ãã®ä»£ããã«ãåºç¤ã½ã¼ã¹ã¯ãã¼ã¿ãã¨ã³ãã¥ã¼ããªããã°ãªãã¾ããã ãªãããã®ã±ã¼ã¹ã«å¯¾å¿ããããã«ã pull()
ã§ byobRequest
ãåå¨ãããã©ããã調ã¹ãå¿
è¦ãããã¾ãã
const stream = makeReadableByteFileStream("dummy file.txt");
const DEFAULT_CHUNK_SIZE = 40;
function makeReadableByteFileStream(filename) {
let fileHandle;
let position = 0;
return new ReadableStream({
type: "bytes", // An underlying byte stream!
start(controller) {
// Called to initialise the underlying source.
// For a file source open a file handle (here we just create the mocked object).
fileHandle = new MockUnderlyingFileHandle();
logSource(
`start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
},
async pull(controller) {
// Called when there is a pull request for data
if (controller.byobRequest) {
const theView = controller.byobRequest.view;
const { bytesRead, buffer } = await fileHandle.read(
theView.buffer,
theView.byteOffset,
theView.byteLength,
position,
);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
controller.byobRequest.respond(0);
logSource(
`pull() with byobRequest. Close controller (read bytes: ${bytesRead})`,
);
} else {
position += bytesRead;
controller.byobRequest.respond(bytesRead);
logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
}
} else {
// No BYOBRequest so enqueue data to stream
// NOTE, this branch would only execute for a default reader if autoAllocateChunkSize is not defined.
const mynewBuffer = new Uint8Array(DEFAULT_CHUNK_SIZE);
const { bytesRead, buffer } = await fileHandle.read(
mynewBuffer.buffer,
mynewBuffer.byteOffset,
mynewBuffer.byteLength,
position,
);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
controller.enqueue(mynewBuffer);
logSource(
`pull() with no byobRequest. Close controller (read bytes: ${bytesRead})`,
);
} else {
position += bytesRead;
controller.enqueue(mynewBuffer);
logSource(`pull() with no byobRequest. enqueue() ${bytesRead} bytes`);
}
}
},
cancel(reason) {
// This is called if the stream is cancelled (via reader or controller).
// Clean up any resources
fileHandle.close();
logSource(`cancel() with reason: ${reason}`);
},
});
}
const reader = stream.getReader();
readStream(reader);
function readStream(reader) {
let bytesReceived = 0;
let result = "";
// read() returns a promise that resolves
// when a value has been received
reader.read().then(function processText({ done, value }) {
// Result objects contain two properties:
// done - true if the stream has already given you all its data.
// value - some data. Always undefined when done is true.
if (done) {
logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
return;
}
bytesReceived += value.length;
logConsumer(`Read ${bytesReceived} bytes so far. Current bytes = ${value}`);
result += value;
// Read some more, and call this function again
return reader.read().then(processText);
});
}
button.addEventListener("click", () => {
reader.cancel("user choice").then(() => {
logConsumer(`reader.cancel complete`);
});
});
çµæ
åºç¤ã¨ãªããã«ã½ã¼ã¹ï¼å·¦ï¼ã¨ã³ã³ã·ã¥ã¼ãï¼å³ï¼ããã®ãã°åºåãä¸è¨ã«ç¤ºãã¾ãã åºç¤ã¨ãªãã½ã¼ã¹ã®å´ã¯ããã¼ã¿ãã¼ããã¤ãç§»è²ã§ã¯ãªããã¨ã³ãã¥ã¼ããããã¨ã示ãã¦ãããã¨ã«æ³¨æãã¦ãã ããã
é¢é£æ å ±RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4