@@ -581,9 +581,11 @@ type serverConn struct {
581
581
advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
582
582
curClientStreams uint32 // number of open streams initiated by the client
583
583
curPushedStreams uint32 // number of open streams initiated by server push
584
+
curHandlers uint32 // number of running handler goroutines
584
585
maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
585
586
maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
586
587
streams map[uint32]*stream
588
+
unstartedHandlers []unstartedHandler
587
589
initialStreamSendWindowSize int32
588
590
maxFrameSize int32
589
591
peerMaxHeaderListSize uint32 // zero means unknown (default)
@@ -981,6 +983,8 @@ func (sc *serverConn) serve() {
981
983
return
982
984
case gracefulShutdownMsg:
983
985
sc.startGracefulShutdownInternal()
986
+
case handlerDoneMsg:
987
+
sc.handlerDone()
984
988
default:
985
989
panic("unknown timer")
986
990
}
@@ -1020,6 +1024,7 @@ var (
1020
1024
idleTimerMsg = new(serverMessage)
1021
1025
shutdownTimerMsg = new(serverMessage)
1022
1026
gracefulShutdownMsg = new(serverMessage)
1027
+
handlerDoneMsg = new(serverMessage)
1023
1028
)
1024
1029
1025
1030
func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) }
@@ -2017,8 +2022,7 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
2017
2022
st.readDeadline = time.AfterFunc(sc.hs.ReadTimeout, st.onReadTimeout)
2018
2023
}
2019
2024
2020
-
go sc.runHandler(rw, req, handler)
2021
-
return nil
2025
+
return sc.scheduleHandler(id, rw, req, handler)
2022
2026
}
2023
2027
2024
2028
func (sc *serverConn) upgradeRequest(req *http.Request) {
@@ -2038,6 +2042,10 @@ func (sc *serverConn) upgradeRequest(req *http.Request) {
2038
2042
sc.conn.SetReadDeadline(time.Time{})
2039
2043
}
2040
2044
2045
+
// This is the first request on the connection,
2046
+
// so start the handler directly rather than going
2047
+
// through scheduleHandler.
2048
+
sc.curHandlers++
2041
2049
go sc.runHandler(rw, req, sc.handler.ServeHTTP)
2042
2050
}
2043
2051
@@ -2278,8 +2286,62 @@ func (sc *serverConn) newResponseWriter(st *stream, req *http.Request) *response
2278
2286
return &responseWriter{rws: rws}
2279
2287
}
2280
2288
2289
+
type unstartedHandler struct {
2290
+
streamID uint32
2291
+
rw *responseWriter
2292
+
req *http.Request
2293
+
handler func(http.ResponseWriter, *http.Request)
2294
+
}
2295
+
2296
+
// scheduleHandler starts a handler goroutine,
2297
+
// or schedules one to start as soon as an existing handler finishes.
2298
+
func (sc *serverConn) scheduleHandler(streamID uint32, rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) error {
2299
+
sc.serveG.check()
2300
+
maxHandlers := sc.advMaxStreams
2301
+
if sc.curHandlers < maxHandlers {
2302
+
sc.curHandlers++
2303
+
go sc.runHandler(rw, req, handler)
2304
+
return nil
2305
+
}
2306
+
if len(sc.unstartedHandlers) > int(4*sc.advMaxStreams) {
2307
+
return sc.countError("too_many_early_resets", ConnectionError(ErrCodeEnhanceYourCalm))
2308
+
}
2309
+
sc.unstartedHandlers = append(sc.unstartedHandlers, unstartedHandler{
2310
+
streamID: streamID,
2311
+
rw: rw,
2312
+
req: req,
2313
+
handler: handler,
2314
+
})
2315
+
return nil
2316
+
}
2317
+
2318
+
func (sc *serverConn) handlerDone() {
2319
+
sc.serveG.check()
2320
+
sc.curHandlers--
2321
+
i := 0
2322
+
maxHandlers := sc.advMaxStreams
2323
+
for ; i < len(sc.unstartedHandlers); i++ {
2324
+
u := sc.unstartedHandlers[i]
2325
+
if sc.streams[u.streamID] == nil {
2326
+
// This stream was reset before its goroutine had a chance to start.
2327
+
continue
2328
+
}
2329
+
if sc.curHandlers >= maxHandlers {
2330
+
break
2331
+
}
2332
+
sc.curHandlers++
2333
+
go sc.runHandler(u.rw, u.req, u.handler)
2334
+
sc.unstartedHandlers[i] = unstartedHandler{} // don't retain references
2335
+
}
2336
+
sc.unstartedHandlers = sc.unstartedHandlers[i:]
2337
+
if len(sc.unstartedHandlers) == 0 {
2338
+
sc.unstartedHandlers = nil
2339
+
}
2340
+
}
2341
+
2281
2342
// Run on its own goroutine.
2282
2343
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) {
2344
+
defer sc.sendServeMsg(handlerDoneMsg)
2283
2345
didPanic := true
2284
2346
defer func() {
2285
2347
rw.rws.stream.cancelCtx()
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4