A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from https://github.com/cloudflare/gokeyless/commit/1c05176bff726e565e7c57748ed9966907ff606a below:

Revamp connections to avoid runtime calls (#282) · cloudflare/gokeyless@1c05176 · GitHub

@@ -11,7 +11,6 @@ import (

11 11

"io"

12 12

"net"

13 13

"net/rpc"

14 -

"runtime"

15 14

"sync"

16 15

"time"

17 16

@@ -105,13 +104,16 @@ const defaultOpTimeout = 10 * time.Second

105 104

// ErrClosed is the error returned when an already-closed connection is re-used.

106 105

var ErrClosed = fmt.Errorf("use of closed connection")

107 106 107 +

// ErrNotFound is not really an error, since timeouts race responses

108 +

var ErrNotFound = fmt.Errorf("connection removed")

109 + 108 110

// Conn represents an open keyless connection.

109 111

type Conn struct {

110 112

// In order to read, acquire readMtx; in order to write, acquire writeMtx

111 113

conn net.Conn

112 114

// In order to read, acquire mapMtx.RLock(); in order to write, acquire

113 115

// mapMtx.Lock().

114 -

listeners map[uint32]chan *protocol.Operation

116 +

listeners map[uint32]chan *result

115 117

// In order to modify, acquire mapMtx.Lock().

116 118

nextID uint32

117 119

@@ -120,18 +122,23 @@ type Conn struct {

120 122

// To lock up the connection, always acquire in the following order to avoid

121 123

// deadlock: writeMtx, mapMtx (don't acquire readMtx).

122 124

readMtx, writeMtx sync.Mutex

123 -

mapMtx sync.RWMutex

125 +

mapMtx sync.Mutex

124 126 125 127

// In order to read, acquire any mutex in any mode (read or write). In order

126 128

// to modify, acquire all three.

127 129

closed bool

128 130

}

129 131 132 +

type result struct {

133 +

err error

134 +

op *protocol.Operation

135 +

}

136 + 130 137

// NewConnTimeout constructs a new Conn with the given operation timeout.

131 138

func NewConnTimeout(inner net.Conn, opTimeout time.Duration) *Conn {

132 139

return &Conn{

133 140

conn: inner,

134 -

listeners: make(map[uint32]chan *protocol.Operation),

141 +

listeners: make(map[uint32]chan *result),

135 142

opTimeout: opTimeout,

136 143

}

137 144

}

@@ -177,41 +184,42 @@ func (c *Conn) DoRead() error {

177 184

if err != nil {

178 185

return err

179 186

}

187 +

l, err := c.extractChannel(pkt.ID)

188 +

if err != nil {

189 +

// The timeout fired, our connection was removed.

190 +

// Not a problem!

191 +

if err == ErrNotFound {

192 +

return nil

193 +

}

194 +

// Other errors close the whole connection

195 +

return err

196 +

}

180 197 181 -

// Acquire the map mutex until we're done with the map.

182 -

c.mapMtx.RLock()

198 +

l <- &result{op: &pkt.Operation}

199 +

return nil

200 +

}

201 + 202 +

func (c *Conn) extractChannel(id uint32) (chan *result, error) {

203 +

c.mapMtx.Lock()

204 +

defer c.mapMtx.Unlock()

183 205

if c.closed {

184 -

// it was closed in the time that we didn't have a lock held

185 -

c.mapMtx.RUnlock()

186 -

return ErrClosed

206 +

return nil, ErrClosed

187 207

}

188 - 189 -

// NOTE: Regardless of what happens, it's the writer's responsibility - not

190 -

// ours - to delete their channel after they receive their response. If we

191 -

// were to do that ourselves, we could introduce a race condition:

192 -

// - Writer times out, enters timeout case in select block

193 -

// - Reader locks the map mutex, finds their channel, sends them the response,

194 -

// and deletes the channel from the map.

195 -

// - Another writer comes, gets the same ID as the currently sleeping waiter,

196 -

// and writes their channel into the map.

197 -

// - The waiter that timed out wakes back up and deletes the wrong channel!

198 - 199 -

l, ok := c.listeners[pkt.ID]

200 -

c.mapMtx.RUnlock()

208 +

ret, ok := c.listeners[id]

209 +

delete(c.listeners, id)

201 210

if !ok {

202 -

// the call to DoOperation hit its timeout and cleared the map and returned;

203 -

// the error was delivered from that call, so no point in delivering it here

204 -

// too

205 -

return nil

211 +

return nil, ErrNotFound

206 212

}

213 +

return ret, nil

214 +

}

207 215 208 -

// In practice, the Go scheduler seems to have a tough time efficiently

209 -

// scheduling client goroutines. Empirically, adding this line speeds things

210 -

// up significantly.

211 -

runtime.Gosched()

212 - 213 -

l <- &pkt.Operation

214 -

return nil

216 +

func (c *Conn) timeoutRequest(id uint32, timeout time.Duration) {

217 +

<-time.After(timeout)

218 +

place, err := c.extractChannel(id)

219 +

if err != nil {

220 +

return // the process finished successfully first

221 +

}

222 +

place <- &result{err: fmt.Errorf("operation timed out")}

215 223

}

216 224 217 225

// DoOperation executes an entire keyless operation, returning its result.

@@ -224,7 +232,7 @@ func (c *Conn) DoOperation(ctx context.Context, op protocol.Operation) (*protoco

224 232

// time out, but a reader finds this channel before we have a chance to delete

225 233

// it from the map, the reader doesn't block forever sending us a value that

226 234

// we will never receive.

227 -

response := make(chan *protocol.Operation, 1)

235 +

response := make(chan *result, 1)

228 236 229 237

// Acquire the map mutex and only release it once we're done with the map.

230 238

c.mapMtx.Lock()

@@ -236,6 +244,7 @@ func (c *Conn) DoOperation(ctx context.Context, op protocol.Operation) (*protoco

236 244

c.nextID++

237 245

if _, ok := c.listeners[id]; ok {

238 246

c.mapMtx.Unlock()

247 +

c.Close()

239 248

// TODO: If this becomes an issue in practice, we could consider randomly

240 249

// generating IDs and spinning until we find an available one (the map

241 250

// acts as a record of all IDs currently in use).

@@ -270,23 +279,16 @@ func (c *Conn) DoOperation(ctx context.Context, op protocol.Operation) (*protoco

270 279

// of writing to the connection (which could have taken a while if the

271 280

// connection was backed up).

272 281

left := end.Sub(time.Now())

273 -

select {

274 -

case op := <-response:

275 -

waitingSpan.Finish()

276 -

c.mapMtx.Lock()

277 -

delete(c.listeners, id)

278 -

c.mapMtx.Unlock()

279 -

if op == nil {

280 -

return nil, ErrClosed

281 -

}

282 -

return op, nil

283 -

case <-time.After(left):

284 -

waitingSpan.Finish()

285 -

c.mapMtx.Lock()

286 -

delete(c.listeners, id)

287 -

c.mapMtx.Unlock()

288 -

return nil, fmt.Errorf("operation timed out")

282 +

go c.timeoutRequest(id, left)

283 +

res := <-response

284 +

waitingSpan.Finish()

285 +

if res == nil {

286 +

return nil, ErrClosed

287 +

}

288 +

if res.err != nil {

289 +

return nil, res.err

289 290

}

291 +

return res.op, nil

290 292

}

291 293 292 294

// Ping sends a ping message over the connection and waits for a corresponding


RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4