1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Text UI, write output to the console."""
25
from ..lazy_import import lazy_import
26
lazy_import(globals(), """
47
class _ChooseUI(object):
49
""" Helper class for choose implementation.
52
def __init__(self, ui, msg, choices, default):
55
self._build_alternatives(msg, choices, default)
57
def _setup_mode(self):
58
"""Setup input mode (line-based, char-based) and echo-back.
60
Line-based input is used if the BRZ_TEXTUI_INPUT environment
61
variable is set to 'line-based', or if there is no controlling
64
is_tty = self.ui.raw_stdin.isatty()
65
if (os.environ.get('BRZ_TEXTUI_INPUT') != 'line-based' and
66
self.ui.raw_stdin == _unwrap_stream(sys.stdin) and is_tty):
67
self.line_based = False
70
self.line_based = True
71
self.echo_back = not is_tty
73
def _build_alternatives(self, msg, choices, default):
74
"""Parse choices string.
76
Setup final prompt and the lists of choices and associated
81
self.alternatives = {}
82
choices = choices.split('\n')
83
if default is not None and default not in range(0, len(choices)):
84
raise ValueError("invalid default index")
86
name = c.replace('&', '').lower()
87
choice = (name, index)
88
if name in self.alternatives:
89
raise ValueError("duplicated choice: %s" % name)
90
self.alternatives[name] = choice
91
shortcut = c.find('&')
92
if -1 != shortcut and (shortcut + 1) < len(c):
94
help += '[' + c[shortcut + 1] + ']'
95
help += c[(shortcut + 2):]
96
shortcut = c[shortcut + 1]
98
c = c.replace('&', '')
100
help = '[%s]%s' % (shortcut, c[1:])
101
shortcut = shortcut.lower()
102
if shortcut in self.alternatives:
103
raise ValueError("duplicated shortcut: %s" % shortcut)
104
self.alternatives[shortcut] = choice
105
# Add redirections for default.
107
self.alternatives[''] = choice
108
self.alternatives['\r'] = choice
109
help_list.append(help)
112
self.prompt = u'%s (%s): ' % (msg, ', '.join(help_list))
115
line = self.ui.stdin.readline()
121
char = osutils.getchar()
122
if char == chr(3): # INTR
123
raise KeyboardInterrupt
124
if char == chr(4): # EOF (^d, C-d)
126
if isinstance(char, bytes):
127
return char.decode('ascii', 'replace')
131
"""Keep asking the user until a valid choice is made.
134
getchoice = self._getline
136
getchoice = self._getchar
140
if 1 == iter or self.line_based:
141
self.ui.prompt(self.prompt)
145
self.ui.stderr.write(u'\n')
147
except KeyboardInterrupt:
148
self.ui.stderr.write(u'\n')
150
choice = choice.lower()
151
if choice not in self.alternatives:
152
# Not a valid choice, keep on asking.
154
name, index = self.alternatives[choice]
156
self.ui.stderr.write(name + u'\n')
160
opt_progress_bar = config.Option(
161
'progress_bar', help='Progress bar type.',
162
default_from_env=['BRZ_PROGRESS_BAR'], default=None,
166
class TextUIFactory(UIFactory):
167
"""A UI factory for Text user interfaces."""
169
def __init__(self, stdin, stdout, stderr):
170
"""Create a TextUIFactory."""
171
super(TextUIFactory, self).__init__()
175
self._progress_view = NullProgressView()
178
# Choose default encoding and handle py2/3 differences
179
self._setup_streams()
180
# paints progress, network activity, etc
181
self._progress_view = self.make_progress_view()
184
def _setup_streams(self):
    """Swap the three standard streams for wrapped versions.

    For each of stdin, stdout and stderr the unwrapped stream is kept as
    ``raw_<name>`` and the attribute itself is rebound to a wrapper built
    around that raw stream: an input wrapper for stdin, an output wrapper
    for stdout and stderr.
    """
    raw_in = _unwrap_stream(self.stdin)
    self.raw_stdin = raw_in
    self.stdin = _wrap_in_stream(raw_in)
    # The two output streams are handled identically, in stdout, stderr
    # order.
    for attr in ('stdout', 'stderr'):
        raw_out = _unwrap_stream(getattr(self, attr))
        setattr(self, 'raw_' + attr, raw_out)
        setattr(self, attr, _wrap_out_stream(raw_out))
192
def choose(self, msg, choices, default=None):
    """Ask the user to pick one of several alternatives.

    The prompting itself is delegated to a ``_ChooseUI`` helper built from
    this factory and the arguments given here; whatever its ``interact()``
    call produces is returned unchanged.

    Both line-based and char-based editing are supported.

    In line-based mode the shortcut and the full choice name are both
    accepted, e.g. for choose('prompt', '&yes\\n&no') the input lines
    'y', ' Y ', ' yes' and 'YES ' all select 'yes'.

    An empty line (line-based mode) or pressing enter (char-based mode)
    selects the default choice, if there is one.

    The choice is echoed back when:
    - input is char-based, which means a controlling terminal is
      available and osutils.getchar is used
    - input is line-based and no controlling terminal is available

    :param msg: Message shown in front of the list of choices.
    :param choices: The alternatives, one per line; an ``&`` marks the
        shortcut character of an alternative.
    :param default: Index of the default alternative, or None.
    """
    return _ChooseUI(self, msg, choices, default).interact()
213
def be_quiet(self, state):
214
if state and not self._quiet:
216
UIFactory.be_quiet(self, state)
217
self._progress_view = self.make_progress_view()
219
def clear_term(self):
    """Prepare the terminal for output.

    Asks the current progress view to clear itself, so that any progress
    bar is removed and the cursor is left at the leftmost position.
    """
    # XXX: If this is preparing to write to stdout, but that is for example
    # directed into a file rather than to the terminal, while the progress
    # bar _is_ going to the terminal, we shouldn't need to clear it.
    # (The original note trails off after "We might need to separately
    # check for the case of".)
    view = self._progress_view
    view.clear()
230
def get_integer(self, prompt):
233
line = self.stdin.readline()
239
def get_non_echoed_password(self):
240
isatty = getattr(self.stdin, 'isatty', None)
241
if isatty is not None and isatty():
242
# getpass() ensure the password is not echoed and other
243
# cross-platform niceties
244
password = getpass.getpass('')
246
# echo doesn't make sense without a terminal
247
password = self.stdin.readline()
251
if password[-1] == '\n':
252
password = password[:-1]
255
def get_password(self, prompt=u'', **kwargs):
256
"""Prompt the user for a password.
258
:param prompt: The prompt to present the user
259
:param kwargs: Arguments which will be expanded into the prompt.
260
This lets front ends display different things if
262
:return: The password string, return None if the user
263
canceled the request.
266
self.prompt(prompt, **kwargs)
267
# There's currently no way to say 'i decline to enter a password'
268
# as opposed to 'my password is empty' -- does it matter?
269
return self.get_non_echoed_password()
271
def get_username(self, prompt, **kwargs):
272
"""Prompt the user for a username.
274
:param prompt: The prompt to present the user
275
:param kwargs: Arguments which will be expanded into the prompt.
276
This lets front ends display different things if
278
:return: The username string, return None if the user
279
canceled the request.
282
self.prompt(prompt, **kwargs)
283
username = self.stdin.readline()
287
if username[-1] == '\n':
288
username = username[:-1]
291
def make_progress_view(self):
292
"""Construct and return a new ProgressView subclass for this UI.
294
# with --quiet, never any progress view
295
# <https://bugs.launchpad.net/bzr/+bug/320035>. Otherwise if the
296
# user specifically requests either text or no progress bars, always
297
# do that. otherwise, guess based on $TERM and tty presence.
299
return NullProgressView()
300
pb_type = config.GlobalStack().get('progress_bar')
301
if pb_type == 'none': # Explicit requirement
302
return NullProgressView()
303
if (pb_type == 'text' or # Explicit requirement
304
progress._supports_progress(self.stderr)): # Guess
305
return TextProgressView(self.stderr)
306
# No explicit requirement and no successful guess
307
return NullProgressView()
309
def _make_output_stream_explicit(self, encoding, encoding_type):
    """Return a TextUIOutputStream writing to this UI's stdout.

    :param encoding: Encoding handed to the output stream.
    :param encoding_type: Passed as the stream's fourth positional
        argument, i.e. its ``errors`` setting.
    """
    out_stream = TextUIOutputStream(
        self, self.stdout, encoding, encoding_type)
    return out_stream
313
"""Write an already-formatted message, clearing the progress bar if necessary."""
315
self.stdout.write(msg + '\n')
317
def prompt(self, prompt, **kwargs):
318
"""Emit prompt on the CLI.
320
:param kwargs: Dictionary of arguments to insert into the prompt,
321
to allow UIs to reformat the prompt.
323
if not isinstance(prompt, str):
324
raise ValueError("prompt %r not a unicode string" % prompt)
326
# See <https://launchpad.net/bugs/365891>
327
prompt = prompt % kwargs
330
self.stderr.write(prompt)
333
def report_transport_activity(self, transport, byte_count, direction):
    """Called by transports as they do IO.

    The report is handed straight to the current progress view, which may
    update a progress bar, spinner, or similar display.

    :param transport: The transport performing the IO.
    :param byte_count: Number of bytes involved.
    :param direction: Direction of the transfer.
    """
    view = self._progress_view
    # The view takes direction before byte_count, the reverse of this
    # method's own parameter order.
    view.show_transport_activity(transport, direction, byte_count)
342
def log_transport_activity(self, display=False):
343
"""See UIFactory.log_transport_activity()"""
344
log = getattr(self._progress_view, 'log_transport_activity', None)
348
def show_error(self, msg):
350
self.stderr.write("bzr: error: %s\n" % msg)
352
def show_message(self, msg):
355
def show_warning(self, msg):
357
self.stderr.write("bzr: warning: %s\n" % msg)
359
def _progress_updated(self, task):
360
"""A task has been updated and wants to be displayed.
362
if not self._task_stack:
363
warnings.warn("%r updated but no tasks are active" %
365
elif task != self._task_stack[-1]:
366
# We used to check it was the top task, but it's hard to always
367
# get this right and it's not necessarily useful: any actual
368
# problems will be evident in use
369
# warnings.warn("%r is not the top progress task %r" %
370
# (task, self._task_stack[-1]))
372
self._progress_view.show_progress(task)
374
def _progress_all_finished(self):
375
self._progress_view.clear()
377
def show_user_warning(self, warning_id, **message_args):
    """Show a text message to the user.

    Explicitly not for warnings about bzr apis, deprecations or internals.

    Nothing is written when ``warning_id`` is listed in
    ``self.suppressed_warnings``; otherwise the formatted warning followed
    by a newline goes to stderr.

    :param warning_id: Identifier of the warning to show.
    :param message_args: Values handed to ``format_user_warning`` as one
        dictionary.
    """
    # Eventually trace.warning should migrate here, to avoid logging and
    # be easier to test; that has a lot of test fallout, so for now only
    # new code can call this.
    if warning_id in self.suppressed_warnings:
        return
    text = self.format_user_warning(warning_id, message_args)
    self.stderr.write(text + '\n')
390
def pad_to_width(line, width, encoding_hint='ascii'):
    """Truncate or pad unicode line to width.

    The line is encoded with ``encoding_hint`` (characters the codec cannot
    represent are replaced), cut or space-padded to exactly ``width``
    encoded bytes, and decoded again.

    This is best-effort for now, and strings containing control codes or
    non-ascii text may be cut and padded incorrectly.

    :param line: The unicode text to fit.
    :param width: Target width, measured in encoded bytes.
    :param encoding_hint: Codec used for the encode/decode round trip.
    :return: The fitted text as a unicode string.
    """
    s = line.encode(encoding_hint, 'replace')
    fitted = b'%-*.*s' % (width, width, s)
    # The cut is made on bytes, so with a multi-byte codec it can land in
    # the middle of a character. Decode leniently instead of letting a
    # display helper raise UnicodeDecodeError.
    return fitted.decode(encoding_hint, 'replace')
400
class TextProgressView(object):
401
"""Display of progress bar and other information on a tty.
403
This shows one line of text, including possibly a network indicator,
404
spinner, progress bar, message, etc.
406
One instance of this is created and held by the UI, and fed updates when a
407
task wants to be painted.
409
Transports feed data to this through the ui_factory object.
411
The Progress views can comprise a tree with _parent_task pointers, but
412
this only prints the stack from the nominated current task up to the root.
415
def __init__(self, term_file, encoding=None, errors=None):
416
self._term_file = term_file
418
self._encoding = getattr(term_file, "encoding", None) or "ascii"
420
self._encoding = encoding
421
# true when there's output on the screen we may need to clear
422
self._have_output = False
423
self._last_transport_msg = ''
425
# time we last repainted the screen
426
self._last_repaint = 0
427
# time we last got information about transport activity
428
self._transport_update_time = 0
429
self._last_task = None
430
self._total_byte_count = 0
431
self._bytes_since_update = 0
432
self._bytes_by_direction = {'unknown': 0, 'read': 0, 'write': 0}
433
self._first_byte_time = None
435
# force the progress bar to be off, as at the moment it doesn't
436
# correspond reliably to overall command progress
437
self.enable_bar = False
439
def _avail_width(self):
440
# we need one extra space for terminals that wrap on last char
441
w = osutils.terminal_width()
447
def _show_line(self, u):
448
width = self._avail_width()
449
if width is not None:
450
u = pad_to_width(u, width, encoding_hint=self._encoding)
451
self._term_file.write('\r' + u + '\r')
454
if self._have_output:
456
self._have_output = False
458
def _render_bar(self):
459
# return a string for the progress bar itself
460
if self.enable_bar and (
461
(self._last_task is None) or self._last_task.show_bar):
462
# If there's no task object, we show space for the bar anyhow.
463
# That's because most invocations of bzr will end showing progress
464
# at some point, though perhaps only after doing some initial IO.
465
# It looks better to draw the progress bar initially rather than
466
# to have what looks like an incomplete progress bar.
467
spin_str = r'/-\|'[self._spin_pos % 4]
470
if self._last_task is None:
471
completion_fraction = 0
474
completion_fraction = \
475
self._last_task._overall_completion_fraction() or 0
476
if (completion_fraction < self._fraction and 'progress' in
479
self._fraction = completion_fraction
480
markers = int(round(float(cols) * completion_fraction)) - 1
481
bar_str = '[' + ('#' * markers + spin_str).ljust(cols) + '] '
483
elif (self._last_task is None) or self._last_task.show_spinner:
484
# The last task wanted just a spinner, no bar
485
spin_str = r'/-\|'[self._spin_pos % 4]
487
return spin_str + ' '
491
def _format_task(self, task):
492
"""Format task-specific parts of progress bar.
494
:returns: (text_part, counter_part) both unicode strings.
496
if not task.show_count:
498
elif task.current_cnt is not None and task.total_cnt is not None:
499
s = ' %d/%d' % (task.current_cnt, task.total_cnt)
500
elif task.current_cnt is not None:
501
s = ' %d' % (task.current_cnt)
504
# compose all the parent messages
507
while t._parent_task:
513
def _render_line(self):
514
bar_string = self._render_bar()
516
task_part, counter_part = self._format_task(self._last_task)
518
task_part = counter_part = ''
519
if self._last_task and not self._last_task.show_transport_activity:
522
trans = self._last_transport_msg
523
# the bar separates the transport activity from the message, so even
524
# if there's no bar or spinner, we must show something if both those
526
if (task_part or trans) and not bar_string:
528
# preferentially truncate the task message if we don't have enough
530
avail_width = self._avail_width()
531
if avail_width is not None:
532
# if terminal avail_width is unknown, don't truncate
533
current_len = len(bar_string) + len(trans) + \
534
len(task_part) + len(counter_part)
535
# GZ 2017-04-22: Should measure and truncate task_part properly
536
gap = current_len - avail_width
538
task_part = task_part[:-gap - 2] + '..'
539
s = trans + bar_string + task_part + counter_part
540
if avail_width is not None:
541
if len(s) < avail_width:
542
s = s.ljust(avail_width)
543
elif len(s) > avail_width:
548
s = self._render_line()
550
self._have_output = True
552
def show_progress(self, task):
553
"""Called by the task object when it has changed.
555
:param task: The top task object; its parents are also included
558
must_update = task is not self._last_task
559
self._last_task = task
561
if (not must_update) and (now < self._last_repaint + task.update_latency):
563
if now > self._transport_update_time + 10:
564
# no recent activity; expire it
565
self._last_transport_msg = ''
566
self._last_repaint = now
569
def show_transport_activity(self, transport, direction, byte_count):
570
"""Called by transports via the ui_factory, as they do IO.
572
This may update a progress bar, spinner, or similar display.
573
By default it does nothing.
575
# XXX: there should be a transport activity model, and that too should
576
# be seen by the progress view, rather than being poked in here.
577
self._total_byte_count += byte_count
578
self._bytes_since_update += byte_count
579
if self._first_byte_time is None:
580
# Note that this isn't great, as technically it should be the time
581
# when the bytes started transferring, not when they completed.
582
# However, we usually start with a small request anyway.
583
self._first_byte_time = time.time()
584
if direction in self._bytes_by_direction:
585
self._bytes_by_direction[direction] += byte_count
587
self._bytes_by_direction['unknown'] += byte_count
588
if 'no_activity' in debug.debug_flags:
589
# Can be used as a workaround if
590
# <https://launchpad.net/bugs/321935> reappears and transport
591
# activity is cluttering other output. However, thanks to
592
# TextUIOutputStream this shouldn't be a problem any more.
595
if self._total_byte_count < 2000:
596
# a little resistance at first, so it doesn't stay stuck at 0
597
# while connecting...
599
if self._transport_update_time is None:
600
self._transport_update_time = now
601
elif now >= (self._transport_update_time + 0.5):
602
# guard against clock stepping backwards, and don't update too
604
rate = (self._bytes_since_update /
605
(now - self._transport_update_time))
606
# using base-10 units (see HACKING.txt).
607
msg = ("%6dkB %5dkB/s " %
608
(self._total_byte_count / 1000, int(rate) / 1000,))
609
self._transport_update_time = now
610
self._last_repaint = now
611
self._bytes_since_update = 0
612
self._last_transport_msg = msg
615
def _format_bytes_by_direction(self):
616
if self._first_byte_time is None:
619
transfer_time = time.time() - self._first_byte_time
620
if transfer_time < 0.001:
621
transfer_time = 0.001
622
bps = self._total_byte_count / transfer_time
624
# using base-10 units (see HACKING.txt).
625
msg = ('Transferred: %.0fkB'
626
' (%.1fkB/s r:%.0fkB w:%.0fkB'
627
% (self._total_byte_count / 1000.,
629
self._bytes_by_direction['read'] / 1000.,
630
self._bytes_by_direction['write'] / 1000.,
632
if self._bytes_by_direction['unknown'] > 0:
633
msg += ' u:%.0fkB)' % (
634
self._bytes_by_direction['unknown'] / 1000.
640
def log_transport_activity(self, display=False):
641
msg = self._format_bytes_by_direction()
643
if display and self._total_byte_count > 0:
645
self._term_file.write(msg + '\n')
648
def _get_stream_encoding(stream):
649
encoding = config.GlobalStack().get('output_encoding')
651
encoding = getattr(stream, "encoding", None)
653
encoding = osutils.get_terminal_encoding(trace=True)
657
def _unwrap_stream(stream):
658
inner = getattr(stream, "buffer", None)
660
inner = getattr(stream, "stream", stream)
664
def _wrap_in_stream(stream, encoding=None, errors='replace'):
666
encoding = _get_stream_encoding(stream)
667
# Attempt to wrap using io.open if possible, since that can do
670
fileno = stream.fileno()
671
except io.UnsupportedOperation:
672
encoded_stream = codecs.getreader(encoding)(stream, errors=errors)
673
encoded_stream.encoding = encoding
674
return encoded_stream
676
return io.open(fileno, encoding=encoding, errors=errors, mode='r', buffering=1)
679
def _wrap_out_stream(stream, encoding=None, errors='replace'):
681
encoding = _get_stream_encoding(stream)
682
encoded_stream = codecs.getwriter(encoding)(stream, errors=errors)
683
encoded_stream.encoding = encoding
684
return encoded_stream
687
class TextUIOutputStream(object):
688
"""Decorates stream to interact better with progress and change encoding.
690
Before writing to the wrapped stream, progress is cleared. Callers must
691
ensure bulk output is terminated with a newline so progress won't overwrite
694
Additionally, the encoding and errors behaviour of the underlying stream
695
can be changed at this point. If errors is set to 'exact' raw bytes may be
696
written to the underlying stream.
699
def __init__(self, ui_factory, stream, encoding=None, errors='strict'):
700
self.ui_factory = ui_factory
701
# GZ 2017-05-21: Clean up semantics when callers are made saner.
702
inner = _unwrap_stream(stream)
703
self.raw_stream = None
704
if errors == "exact":
706
self.raw_stream = inner
708
self.wrapped_stream = stream
710
encoding = _get_stream_encoding(stream)
712
self.wrapped_stream = _wrap_out_stream(inner, encoding, errors)
714
encoding = self.wrapped_stream.encoding
715
self.encoding = encoding
718
def _write(self, to_write):
719
if isinstance(to_write, bytes):
721
to_write = to_write.decode(self.encoding, self.errors)
722
except UnicodeDecodeError:
723
self.raw_stream.write(to_write)
725
self.wrapped_stream.write(to_write)
728
self.ui_factory.clear_term()
729
self.wrapped_stream.flush()
731
def write(self, to_write):
    """Clear the terminal via the UI factory, then write ``to_write``.

    :param to_write: The data to hand to ``_write``.
    """
    factory = self.ui_factory
    factory.clear_term()
    self._write(to_write)
735
def writelines(self, lines):
736
self.ui_factory.clear_term()