@@ -11,7 +11,6 @@ import (
11
11
"io"
12
12
"net"
13
13
"net/rpc"
14
-
"runtime"
15
14
"sync"
16
15
"time"
17
16
@@ -105,13 +104,16 @@ const defaultOpTimeout = 10 * time.Second
105
104
// ErrClosed is the error returned when an already-closed connection is re-used.
106
105
var ErrClosed = fmt.Errorf("use of closed connection")
107
106
107
+
// ErrNotFound is returned when no listener is registered for an ID.
// It is not really an error, since timeouts race responses.
108
+
var ErrNotFound = fmt.Errorf("connection removed")
109
+
108
110
// Conn represents an open keyless connection.
109
111
type Conn struct {
110
112
// In order to read, acquire readMtx; in order to write, acquire writeMtx
111
113
conn net.Conn
112
114
// In order to read or write, acquire
113
115
// mapMtx.Lock().
114
-
listeners map[uint32]chan *protocol.Operation
116
+
listeners map[uint32]chan *result
115
117
// In order to modify, acquire mapMtx.Lock().
116
118
nextID uint32
117
119
@@ -120,18 +122,23 @@ type Conn struct {
120
122
// To lock up the connection, always acquire in the following order to avoid
121
123
// deadlock: writeMtx, mapMtx (don't acquire readMtx).
122
124
readMtx, writeMtx sync.Mutex
123
-
mapMtx sync.RWMutex
125
+
mapMtx sync.Mutex
124
126
125
127
// In order to read, acquire any one of the three mutexes. In order
126
128
// to modify, acquire all three.
127
129
closed bool
128
130
}
129
131
132
+
type result struct {
133
+
err error
134
+
op *protocol.Operation
135
+
}
136
+
130
137
// NewConnTimeout constructs a new Conn with the given operation timeout.
131
138
func NewConnTimeout(inner net.Conn, opTimeout time.Duration) *Conn {
132
139
return &Conn{
133
140
conn: inner,
134
-
listeners: make(map[uint32]chan *protocol.Operation),
141
+
listeners: make(map[uint32]chan *result),
135
142
opTimeout: opTimeout,
136
143
}
137
144
}
@@ -177,41 +184,42 @@ func (c *Conn) DoRead() error {
177
184
if err != nil {
178
185
return err
179
186
}
187
+
l, err := c.extractChannel(pkt.ID)
188
+
if err != nil {
189
+
// The timeout fired, our connection was removed.
190
+
// Not a problem!
191
+
if err == ErrNotFound {
192
+
return nil
193
+
}
194
+
// Other errors close the whole connection
195
+
return err
196
+
}
180
197
181
-
// Acquire the map mutex until we're done with the map.
182
-
c.mapMtx.RLock()
198
+
l <- &result{op: &pkt.Operation}
199
+
return nil
200
+
}
201
+
202
+
func (c *Conn) extractChannel(id uint32) (chan *result, error) {
203
+
c.mapMtx.Lock()
204
+
defer c.mapMtx.Unlock()
183
205
if c.closed {
184
-
// it was closed in the time that we didn't have a lock held
185
-
c.mapMtx.RUnlock()
186
-
return ErrClosed
206
+
return nil, ErrClosed
187
207
}
188
-
189
-
// NOTE: Regardless of what happens, it's the writer's responsibility - not
190
-
// ours - to delete their channel after they receive their response. If we
191
-
// were to do that ourselves, we could introduce a race condition:
192
-
// - Writer times out, enters timeout case in select block
193
-
// - Reader locks the map mutex, finds their channel, sends them the response,
194
-
// and deletes the channel from the map.
195
-
// - Another writer comes, gets the same ID as the currently sleeping waiter,
196
-
// and writes their channel into the map.
197
-
// - The waiter that timed out wakes back up and deletes the wrong channel!
198
-
199
-
l, ok := c.listeners[pkt.ID]
200
-
c.mapMtx.RUnlock()
208
+
ret, ok := c.listeners[id]
209
+
delete(c.listeners, id)
201
210
if !ok {
202
-
// the call to DoOperation hit its timeout and cleared the map and returned;
203
-
// the error was delivered from that call, so no point in delivering it here
204
-
// too
205
-
return nil
211
+
return nil, ErrNotFound
206
212
}
213
+
return ret, nil
214
+
}
207
215
208
-
// In practice, the Go scheduler seems to have a tough time efficiently
209
-
// scheduling client goroutines. Empirically, adding this line speeds things
210
-
// up significantly.
211
-
runtime.Gosched()
212
-
213
-
l <- &pkt.Operation
214
-
return nil
216
+
func (c *Conn) timeoutRequest(id uint32, timeout time.Duration) {
217
+
<-time.After(timeout)
218
+
place, err := c.extractChannel(id)
219
+
if err != nil {
220
+
return // the process finished successfully first
221
+
}
222
+
place <- &result{err: fmt.Errorf("operation timed out")}
215
223
}
216
224
217
225
// DoOperation executes an entire keyless operation, returning its result.
@@ -224,7 +232,7 @@ func (c *Conn) DoOperation(ctx context.Context, op protocol.Operation) (*protoco
224
232
// time out, but a reader finds this channel before we have a chance to delete
225
233
// it from the map, the reader doesn't block forever sending us a value that
226
234
// we will never receive.
227
-
response := make(chan *protocol.Operation, 1)
235
+
response := make(chan *result, 1)
228
236
229
237
// Acquire the map mutex and only release it once we're done with the map.
230
238
c.mapMtx.Lock()
@@ -236,6 +244,7 @@ func (c *Conn) DoOperation(ctx context.Context, op protocol.Operation) (*protoco
236
244
c.nextID++
237
245
if _, ok := c.listeners[id]; ok {
238
246
c.mapMtx.Unlock()
247
+
c.Close()
239
248
// TODO: If this becomes an issue in practice, we could consider randomly
240
249
// generating IDs and spinning until we find an available one (the map
241
250
// acts as a record of all IDs currently in use).
@@ -270,23 +279,16 @@ func (c *Conn) DoOperation(ctx context.Context, op protocol.Operation) (*protoco
270
279
// of writing to the connection (which could have taken a while if the
271
280
// connection was backed up).
272
281
left := end.Sub(time.Now())
273
-
select {
274
-
case op := <-response:
275
-
waitingSpan.Finish()
276
-
c.mapMtx.Lock()
277
-
delete(c.listeners, id)
278
-
c.mapMtx.Unlock()
279
-
if op == nil {
280
-
return nil, ErrClosed
281
-
}
282
-
return op, nil
283
-
case <-time.After(left):
284
-
waitingSpan.Finish()
285
-
c.mapMtx.Lock()
286
-
delete(c.listeners, id)
287
-
c.mapMtx.Unlock()
288
-
return nil, fmt.Errorf("operation timed out")
282
+
go c.timeoutRequest(id, left)
283
+
res := <-response
284
+
waitingSpan.Finish()
285
+
if res == nil {
286
+
return nil, ErrClosed
287
+
}
288
+
if res.err != nil {
289
+
return nil, res.err
289
290
}
291
+
return res.op, nil
290
292
}
291
293
292
294
// Ping sends a ping message over the connection and waits for a corresponding
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4