A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from http://mail.python.org/pipermail/python-checkins/2015-January/134233.html below:

[Python-checkins] cpython (merge 3.4 -> default): Merge 3.4 (asyncio)

[Python-checkins] cpython (merge 3.4 -> default): Merge 3.4 (asyncio)
victor.stinner python-checkins at python.org
Fri Jan 30 00:09:05 CET 2015
https://hg.python.org/cpython/rev/0701ffaa89f9
changeset:   94380:0701ffaa89f9
parent:      94378:7c9a42cbfff0
parent:      94379:a5efd5021ca1
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Fri Jan 30 00:05:36 2015 +0100
summary:
  Merge 3.4 (asyncio)

files:
  Lib/asyncio/base_subprocess.py           |  108 ++++++----
  Lib/asyncio/subprocess.py                |   40 +---
  Lib/asyncio/unix_events.py               |   15 +-
  Lib/asyncio/windows_events.py            |    7 +-
  Lib/test/test_asyncio/test_events.py     |   35 +-
  Lib/test/test_asyncio/test_subprocess.py |   65 ++++++
  6 files changed, 166 insertions(+), 104 deletions(-)


diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -3,6 +3,7 @@
 import sys
 import warnings
 
+from . import futures
 from . import protocols
 from . import transports
 from .coroutines import coroutine
@@ -13,27 +14,32 @@
 
     def __init__(self, loop, protocol, args, shell,
                  stdin, stdout, stderr, bufsize,
-                 extra=None, **kwargs):
+                 waiter=None, extra=None, **kwargs):
         super().__init__(extra)
         self._closed = False
         self._protocol = protocol
         self._loop = loop
+        self._proc = None
         self._pid = None
+        self._returncode = None
+        self._exit_waiters = []
+        self._pending_calls = collections.deque()
+        self._pipes = {}
+        self._finished = False
 
-        self._pipes = {}
         if stdin == subprocess.PIPE:
             self._pipes[0] = None
         if stdout == subprocess.PIPE:
             self._pipes[1] = None
         if stderr == subprocess.PIPE:
             self._pipes[2] = None
-        self._pending_calls = collections.deque()
-        self._finished = False
-        self._returncode = None
+
+        # Create the child process: set the _proc attribute
         self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
                     stderr=stderr, bufsize=bufsize, **kwargs)
         self._pid = self._proc.pid
         self._extra['subprocess'] = self._proc
+
         if self._loop.get_debug():
             if isinstance(args, (bytes, str)):
                 program = args
@@ -42,6 +48,8 @@
             logger.debug('process %r created: pid %s',
                          program, self._pid)
 
+        self._loop.create_task(self._connect_pipes(waiter))
+
     def __repr__(self):
         info = [self.__class__.__name__]
         if self._closed:
@@ -77,12 +85,23 @@
 
     def close(self):
         self._closed = True
+
         for proto in self._pipes.values():
             if proto is None:
                 continue
             proto.pipe.close()
-        if self._returncode is None:
-            self.terminate()
+
+        if self._proc is not None and self._returncode is None:
+            if self._loop.get_debug():
+                logger.warning('Close running child process: kill %r', self)
+
+            try:
+                self._proc.kill()
+            except ProcessLookupError:
+                pass
+
+            # Don't clear the _proc reference yet because _post_init() may
+            # still run
 
     # On Python 3.3 and older, objects with a destructor part of a reference
     # cycle are never destroyed. It's not more the case on Python 3.4 thanks
@@ -105,59 +124,42 @@
         else:
             return None
 
+    def _check_proc(self):
+        if self._closed:
+            raise ValueError("operation on closed transport")
+        if self._proc is None:
+            raise ProcessLookupError()
+
     def send_signal(self, signal):
+        self._check_proc()
         self._proc.send_signal(signal)
 
     def terminate(self):
+        self._check_proc()
         self._proc.terminate()
 
     def kill(self):
+        self._check_proc()
         self._proc.kill()
 
-    def _kill_wait(self):
-        """Close pipes, kill the subprocess and read its return status.
-
-        Function called when an exception is raised during the creation
-        of a subprocess.
-        """
-        self._closed = True
-        if self._loop.get_debug():
-            logger.warning('Exception during subprocess creation, '
-                           'kill the subprocess %r',
-                           self,
-                           exc_info=True)
-
-        proc = self._proc
-        if proc.stdout:
-            proc.stdout.close()
-        if proc.stderr:
-            proc.stderr.close()
-        if proc.stdin:
-            proc.stdin.close()
-
-        try:
-            proc.kill()
-        except ProcessLookupError:
-            pass
-        self._returncode = proc.wait()
-
-        self.close()
-
     @coroutine
-    def _post_init(self):
+    def _connect_pipes(self, waiter):
         try:
             proc = self._proc
             loop = self._loop
+
             if proc.stdin is not None:
                 _, pipe = yield from loop.connect_write_pipe(
                     lambda: WriteSubprocessPipeProto(self, 0),
                     proc.stdin)
                 self._pipes[0] = pipe
+
             if proc.stdout is not None:
                 _, pipe = yield from loop.connect_read_pipe(
                     lambda: ReadSubprocessPipeProto(self, 1),
                     proc.stdout)
                 self._pipes[1] = pipe
+
             if proc.stderr is not None:
                 _, pipe = yield from loop.connect_read_pipe(
                     lambda: ReadSubprocessPipeProto(self, 2),
@@ -166,13 +168,16 @@
 
             assert self._pending_calls is not None
 
-            self._loop.call_soon(self._protocol.connection_made, self)
+            loop.call_soon(self._protocol.connection_made, self)
             for callback, data in self._pending_calls:
-                self._loop.call_soon(callback, *data)
+                loop.call_soon(callback, *data)
             self._pending_calls = None
-        except:
-            self._kill_wait()
-            raise
+        except Exception as exc:
+            if waiter is not None and not waiter.cancelled():
+                waiter.set_exception(exc)
+        else:
+            if waiter is not None and not waiter.cancelled():
+                waiter.set_result(None)
 
     def _call(self, cb, *data):
         if self._pending_calls is not None:
@@ -197,6 +202,23 @@
         self._call(self._protocol.process_exited)
         self._try_finish()
 
+        # wake up futures waiting for wait()
+        for waiter in self._exit_waiters:
+            if not waiter.cancelled():
+                waiter.set_result(returncode)
+        self._exit_waiters = None
+
+    def wait(self):
+        """Wait until the process exit and return the process return code.
+
+        This method is a coroutine."""
+        if self._returncode is not None:
+            return self._returncode
+
+        waiter = futures.Future(loop=self._loop)
+        self._exit_waiters.append(waiter)
+        return (yield from waiter)
+
     def _try_finish(self):
         assert not self._finished
         if self._returncode is None:
@@ -210,9 +232,9 @@
         try:
             self._protocol.connection_lost(exc)
         finally:
+            self._loop = None
             self._proc = None
             self._protocol = None
-            self._loop = None
 
 
 class WriteSubprocessPipeProto(protocols.BaseProtocol):
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -25,8 +25,6 @@
         super().__init__(loop=loop)
         self._limit = limit
         self.stdin = self.stdout = self.stderr = None
-        self.waiter = futures.Future(loop=loop)
-        self._waiters = collections.deque()
         self._transport = None
 
     def __repr__(self):
@@ -61,9 +59,6 @@
                                               reader=None,
                                               loop=self._loop)
 
-        if not self.waiter.cancelled():
-            self.waiter.set_result(None)
-
     def pipe_data_received(self, fd, data):
         if fd == 1:
             reader = self.stdout
@@ -94,16 +89,9 @@
                 reader.set_exception(exc)
 
     def process_exited(self):
-        returncode = self._transport.get_returncode()
         self._transport.close()
         self._transport = None
 
-        # wake up futures waiting for wait()
-        while self._waiters:
-            waiter = self._waiters.popleft()
-            if not waiter.cancelled():
-                waiter.set_result(returncode)
-
 
 class Process:
     def __init__(self, transport, protocol, loop):
@@ -124,30 +112,18 @@
 
     @coroutine
     def wait(self):
-        """Wait until the process exit and return the process return code."""
-        returncode = self._transport.get_returncode()
-        if returncode is not None:
-            return returncode
+        """Wait until the process exit and return the process return code.
 
-        waiter = futures.Future(loop=self._loop)
-        self._protocol._waiters.append(waiter)
-        yield from waiter
-        return waiter.result()
-
-    def _check_alive(self):
-        if self._transport.get_returncode() is not None:
-            raise ProcessLookupError()
+        This method is a coroutine."""
+        return (yield from self._transport.wait())
 
     def send_signal(self, signal):
-        self._check_alive()
         self._transport.send_signal(signal)
 
     def terminate(self):
-        self._check_alive()
         self._transport.terminate()
 
     def kill(self):
-        self._check_alive()
         self._transport.kill()
 
     @coroutine
@@ -221,11 +197,6 @@
                                             protocol_factory,
                                             cmd, stdin=stdin, stdout=stdout,
                                             stderr=stderr, **kwds)
-    try:
-        yield from protocol.waiter
-    except:
-        transport._kill_wait()
-        raise
     return Process(transport, protocol, loop)
 
 @coroutine
@@ -241,9 +212,4 @@
                                             program, *args,
                                             stdin=stdin, stdout=stdout,
                                             stderr=stderr, **kwds)
-    try:
-        yield from protocol.waiter
-    except:
-        transport._kill_wait()
-        raise
     return Process(transport, protocol, loop)
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -16,6 +16,7 @@
 from . import constants
 from . import coroutines
 from . import events
+from . import futures
 from . import selector_events
 from . import selectors
 from . import transports
@@ -175,16 +176,20 @@
                                    stdin, stdout, stderr, bufsize,
                                    extra=None, **kwargs):
         with events.get_child_watcher() as watcher:
+            waiter = futures.Future(loop=self)
             transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                               stdin, stdout, stderr, bufsize,
-                                              extra=extra, **kwargs)
+                                              waiter=waiter, extra=extra,
+                                              **kwargs)
+
+            watcher.add_child_handler(transp.get_pid(),
+                                      self._child_watcher_callback, transp)
             try:
-                yield from transp._post_init()
+                yield from waiter
             except:
                 transp.close()
+                yield from transp.wait()
                 raise
-            watcher.add_child_handler(transp.get_pid(),
-                                      self._child_watcher_callback, transp)
 
         return transp
 
@@ -774,7 +779,7 @@
         pass
 
     def add_child_handler(self, pid, callback, *args):
-        self._callbacks[pid] = callback, args
+        self._callbacks[pid] = (callback, args)
 
         # Prevent a race condition in case the child is already terminated.
         self._do_waitpid(pid)
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -366,13 +366,16 @@
     def _make_subprocess_transport(self, protocol, args, shell,
                                    stdin, stdout, stderr, bufsize,
                                    extra=None, **kwargs):
+        waiter = futures.Future(loop=self)
         transp = _WindowsSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
-                                             extra=extra, **kwargs)
+                                             waiter=waiter, extra=extra,
+                                             **kwargs)
         try:
-            yield from transp._post_init()
+            yield from waiter
         except:
             transp.close()
+            yield from transp.wait()
             raise
 
         return transp
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -1551,9 +1551,10 @@
         stdin = transp.get_pipe_transport(0)
         stdin.write(b'Python The Winner')
         self.loop.run_until_complete(proto.got_data[1].wait())
-        transp.close()
+        with test_utils.disable_logger():
+            transp.close()
         self.loop.run_until_complete(proto.completed)
-        self.check_terminated(proto.returncode)
+        self.check_killed(proto.returncode)
         self.assertEqual(b'Python The Winner', proto.data[1])
 
     def test_subprocess_interactive(self):
@@ -1567,21 +1568,20 @@
         self.loop.run_until_complete(proto.connected)
         self.assertEqual('CONNECTED', proto.state)
 
-        try:
-            stdin = transp.get_pipe_transport(0)
-            stdin.write(b'Python ')
-            self.loop.run_until_complete(proto.got_data[1].wait())
-            proto.got_data[1].clear()
-            self.assertEqual(b'Python ', proto.data[1])
-
-            stdin.write(b'The Winner')
-            self.loop.run_until_complete(proto.got_data[1].wait())
-            self.assertEqual(b'Python The Winner', proto.data[1])
-        finally:
+        stdin = transp.get_pipe_transport(0)
+        stdin.write(b'Python ')
+        self.loop.run_until_complete(proto.got_data[1].wait())
+        proto.got_data[1].clear()
+        self.assertEqual(b'Python ', proto.data[1])
+
+        stdin.write(b'The Winner')
+        self.loop.run_until_complete(proto.got_data[1].wait())
+        self.assertEqual(b'Python The Winner', proto.data[1])
+
+        with test_utils.disable_logger():
             transp.close()
-
         self.loop.run_until_complete(proto.completed)
-        self.check_terminated(proto.returncode)
+        self.check_killed(proto.returncode)
 
     def test_subprocess_shell(self):
         connect = self.loop.subprocess_shell(
@@ -1739,9 +1739,10 @@
             # GetLastError()==ERROR_INVALID_NAME on Windows!?!  (Using
             # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
             self.assertEqual(b'ERR:OSError', proto.data[2])
-        transp.close()
+        with test_utils.disable_logger():
+            transp.close()
         self.loop.run_until_complete(proto.completed)
-        self.check_terminated(proto.returncode)
+        self.check_killed(proto.returncode)
 
     def test_subprocess_wait_no_same_group(self):
         # start the new process in a new session
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -4,6 +4,7 @@
 from unittest import mock
 
 import asyncio
+from asyncio import base_subprocess
 from asyncio import subprocess
 from asyncio import test_utils
 try:
@@ -23,6 +24,70 @@
               'data = sys.stdin.buffer.read()',
               'sys.stdout.buffer.write(data)'))]
 
+class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
+    def _start(self, *args, **kwargs):
+        self._proc = mock.Mock()
+        self._proc.stdin = None
+        self._proc.stdout = None
+        self._proc.stderr = None
+
+
+class SubprocessTransportTests(test_utils.TestCase):
+    def setUp(self):
+        self.loop = self.new_test_loop()
+        self.set_event_loop(self.loop)
+
+
+    def create_transport(self, waiter=None):
+        protocol = mock.Mock()
+        protocol.connection_made._is_coroutine = False
+        protocol.process_exited._is_coroutine = False
+        transport = TestSubprocessTransport(
+                        self.loop, protocol, ['test'], False,
+                        None, None, None, 0, waiter=waiter)
+        return (transport, protocol)
+
+    def test_close(self):
+        waiter = asyncio.Future(loop=self.loop)
+        transport, protocol = self.create_transport(waiter)
+        transport._process_exited(0)
+        transport.close()
+
+        # The loop didn't run yet
+        self.assertFalse(protocol.connection_made.called)
+
+        # methods must raise ProcessLookupError if the transport was closed
+        self.assertRaises(ValueError, transport.send_signal, signal.SIGTERM)
+        self.assertRaises(ValueError, transport.terminate)
+        self.assertRaises(ValueError, transport.kill)
+
+        self.loop.run_until_complete(waiter)
+
+    def test_proc_exited(self):
+        waiter = asyncio.Future(loop=self.loop)
+        transport, protocol = self.create_transport(waiter)
+        transport._process_exited(6)
+        self.loop.run_until_complete(waiter)
+
+        self.assertEqual(transport.get_returncode(), 6)
+
+        self.assertTrue(protocol.connection_made.called)
+        self.assertTrue(protocol.process_exited.called)
+        self.assertTrue(protocol.connection_lost.called)
+        self.assertEqual(protocol.connection_lost.call_args[0], (None,))
+
+        self.assertFalse(transport._closed)
+        self.assertIsNone(transport._loop)
+        self.assertIsNone(transport._proc)
+        self.assertIsNone(transport._protocol)
+
+        # methods must raise ProcessLookupError if the process exited
+        self.assertRaises(ProcessLookupError,
+                          transport.send_signal, signal.SIGTERM)
+        self.assertRaises(ProcessLookupError, transport.terminate)
+        self.assertRaises(ProcessLookupError, transport.kill)
+
+
 class SubprocessMixin:
 
     def test_stdin_stdout(self):

-- 
Repository URL: https://hg.python.org/cpython
More information about the Python-checkins mailing list

RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4