5
5
from subprocess import check_output
6
6
import os.path
7
7
import uuid
8
+
import random
9
+
import string
8
10
9
11
import re
10
12
import signal
@@ -25,23 +27,39 @@ class IREPLWrapper(replwrap.REPLWrapper):
25
27
:param line_output_callback: a callback method to receive each batch
26
28
of incremental output. It takes one string parameter.
27
29
"""
28
-
def __init__(self, cmd_or_spawn, orig_prompt, prompt_change,
30
+
def __init__(self, cmd_or_spawn, orig_prompt, prompt_change, unique_prompt,
29
31
extra_init_cmd=None, line_output_callback=None):
32
+
self.unique_prompt = unique_prompt
30
33
self.line_output_callback = line_output_callback
34
+
# The extra regex at the start of PS1 below is designed to catch the
35
+
# `(envname) ` which conda/mamba add to the start of PS1 by default.
36
+
# Obviously anything else that looks like this, including user output,
37
+
# will be eaten.
38
+
# FIXME: work out if there is a way to update these by reading PS1
39
+
# after each command and checking that it has changed. The answer is
40
+
# probably no, as we never see individual commands but rather cells
41
+
# with possibly many commands, and would need to update this half-way
42
+
# through a cell.
43
+
self.ps1_re = r"(\(\w+\) )?" + re.escape(self.unique_prompt + ">")
44
+
self.ps2_re = re.escape(self.unique_prompt + "+")
31
45
replwrap.REPLWrapper.__init__(self, cmd_or_spawn, orig_prompt,
32
-
prompt_change, extra_init_cmd=extra_init_cmd)
46
+
prompt_change, new_prompt=self.ps1_re,
47
+
continuation_prompt=self.ps2_re, extra_init_cmd=extra_init_cmd)
33
48
34
49
def _expect_prompt(self, timeout=-1):
50
+
prompts = [self.ps1_re, self.ps2_re]
51
+
35
52
if timeout == None:
36
53
# "None" means we are executing code from a Jupyter cell by way of the run_command
37
-
# in the do_execute() code below, so do incremental output.
54
+
# in the do_execute() code below, so do incremental output, i.e.
55
+
# also look for end of line or carridge return
56
+
prompts.extend(['\r?\n', '\r'])
38
57
while True:
39
-
pos = self.child.expect_exact([self.prompt, self.continuation_prompt, u'\r\n', u'\n', u'\r'],
40
-
timeout=None)
41
-
if pos == 2 or pos == 3:
58
+
pos = self.child.expect_list([re.compile(x) for x in prompts], timeout=None)
59
+
if pos == 2:
42
60
# End of line received.
43
61
self.line_output_callback(self.child.before + '\n')
44
-
elif pos == 4:
62
+
elif pos == 3:
45
63
# Carriage return ('\r') received.
46
64
self.line_output_callback(self.child.before + '\r')
47
65
else:
@@ -50,8 +68,8 @@ def _expect_prompt(self, timeout=-1):
50
68
self.line_output_callback(self.child.before)
51
69
break
52
70
else:
53
-
# Otherwise, use existing non-incremental code
54
-
pos = replwrap.REPLWrapper._expect_prompt(self, timeout=timeout)
71
+
# Otherwise, wait (with timeout) until the next prompt
72
+
pos = self.child.expect_list([re.compile(x) for x in prompts], timeout=timeout)
55
73
56
74
# Prompt received, so return normally
57
75
return pos
@@ -79,6 +97,9 @@ def banner(self):
79
97
'file_extension': '.sh'}
80
98
81
99
def __init__(self, **kwargs):
100
+
# Make a random prompt, further reducing chances of accidental matches.
101
+
rand = ''.join(random.choices(string.ascii_uppercase, k=12))
102
+
self.unique_prompt = "PROMPT_" + rand
82
103
Kernel.__init__(self, **kwargs)
83
104
self._start_bash()
84
105
self._known_display_ids = set()
@@ -97,12 +118,16 @@ def _start_bash(self):
97
118
bashrc = os.path.join(os.path.dirname(pexpect.__file__), 'bashrc.sh')
98
119
child = pexpect.spawn("bash", ['--rcfile', bashrc], echo=False,
99
120
encoding='utf-8', codec_errors='replace')
100
-
ps1 = replwrap.PEXPECT_PROMPT[:5] + u'\[\]' + replwrap.PEXPECT_PROMPT[5:]
101
-
ps2 = replwrap.PEXPECT_CONTINUATION_PROMPT[:5] + u'\[\]' + replwrap.PEXPECT_CONTINUATION_PROMPT[5:]
121
+
# Following comment stolen from upstream's REPLWrap:
122
+
# If the user runs 'env', the value of PS1 will be in the output. To avoid
123
+
# replwrap seeing that as the next prompt, we'll embed the marker characters
124
+
# for invisible characters in the prompt; these show up when inspecting the
125
+
# environment variable, but not when bash displays the prompt.
126
+
ps1 = self.unique_prompt + u'\[\]' + ">"
127
+
ps2 = self.unique_prompt + u'\[\]' + "+"
102
128
prompt_change = u"PS1='{0}' PS2='{1}' PROMPT_COMMAND=''".format(ps1, ps2)
103
-
104
129
# Using IREPLWrapper to get incremental output
105
-
self.bashwrapper = IREPLWrapper(child, u'\$', prompt_change,
130
+
self.bashwrapper = IREPLWrapper(child, u'\$', prompt_change, self.unique_prompt,
106
131
extra_init_cmd="export PAGER=cat",
107
132
line_output_callback=self.process_output)
108
133
finally:
@@ -182,8 +207,8 @@ def do_execute(self, code, silent, store_history=True,
182
207
return {'status': 'abort', 'execution_count': self.execution_count}
183
208
184
209
try:
185
-
exitcode = int(self.bashwrapper.run_command('echo $?').rstrip())
186
-
except Exception:
210
+
exitcode = int(self.bashwrapper.run_command('echo $?').rstrip().split("\r\n")[0])
211
+
except Exception as exc:
187
212
exitcode = 1
188
213
189
214
if exitcode:
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4