1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Text UI, write output to the console."""
19
from __future__ import absolute_import
26
from ..lazy_import import lazy_import
27
lazy_import(globals(), """
42
from ..sixish import (
51
class _ChooseUI(object):
53
""" Helper class for choose implementation.
56
def __init__(self, ui, msg, choices, default):
59
self._build_alternatives(msg, choices, default)
61
def _setup_mode(self):
62
"""Setup input mode (line-based, char-based) and echo-back.
64
Line-based input is used if the BRZ_TEXTUI_INPUT environment
65
variable is set to 'line-based', or if there is no controlling
68
is_tty = self.ui.raw_stdin.isatty()
69
if (os.environ.get('BRZ_TEXTUI_INPUT') != 'line-based' and
70
self.ui.raw_stdin == sys.stdin and is_tty):
71
self.line_based = False
74
self.line_based = True
75
self.echo_back = not is_tty
77
def _build_alternatives(self, msg, choices, default):
78
"""Parse choices string.
80
Setup final prompt and the lists of choices and associated
85
self.alternatives = {}
86
choices = choices.split('\n')
87
if default is not None and default not in range(0, len(choices)):
88
raise ValueError("invalid default index")
90
name = c.replace('&', '').lower()
91
choice = (name, index)
92
if name in self.alternatives:
93
raise ValueError("duplicated choice: %s" % name)
94
self.alternatives[name] = choice
95
shortcut = c.find('&')
96
if -1 != shortcut and (shortcut + 1) < len(c):
98
help += '[' + c[shortcut + 1] + ']'
99
help += c[(shortcut + 2):]
100
shortcut = c[shortcut + 1]
102
c = c.replace('&', '')
104
help = '[%s]%s' % (shortcut, c[1:])
105
shortcut = shortcut.lower()
106
if shortcut in self.alternatives:
107
raise ValueError("duplicated shortcut: %s" % shortcut)
108
self.alternatives[shortcut] = choice
109
# Add redirections for default.
111
self.alternatives[''] = choice
112
self.alternatives['\r'] = choice
113
help_list.append(help)
116
self.prompt = u'%s (%s): ' % (msg, ', '.join(help_list))
119
line = self.ui.stdin.readline()
125
char = osutils.getchar()
126
if char == chr(3): # INTR
127
raise KeyboardInterrupt
128
if char == chr(4): # EOF (^d, C-d)
130
if isinstance(char, bytes):
131
return char.decode('ascii', 'replace')
135
"""Keep asking the user until a valid choice is made.
138
getchoice = self._getline
140
getchoice = self._getchar
144
if 1 == iter or self.line_based:
145
self.ui.prompt(self.prompt)
149
self.ui.stderr.write(u'\n')
151
except KeyboardInterrupt:
152
self.ui.stderr.write(u'\n')
154
choice = choice.lower()
155
if choice not in self.alternatives:
156
# Not a valid choice, keep on asking.
158
name, index = self.alternatives[choice]
160
self.ui.stderr.write(name + u'\n')
164
opt_progress_bar = config.Option(
165
'progress_bar', help='Progress bar type.',
166
default_from_env=['BRZ_PROGRESS_BAR'], default=None,
170
class TextUIFactory(UIFactory):
171
"""A UI factory for Text user interfaces."""
173
def __init__(self, stdin, stdout, stderr):
174
"""Create a TextUIFactory."""
175
super(TextUIFactory, self).__init__()
179
self._progress_view = NullProgressView()
182
# Choose default encoding and handle py2/3 differences
183
self._setup_streams()
184
# paints progress, network activity, etc
185
self._progress_view = self.make_progress_view()
188
def _setup_streams(self):
189
self.raw_stdin = _unwrap_stream(self.stdin)
190
self.stdin = _wrap_in_stream(self.raw_stdin)
191
self.raw_stdout = _unwrap_stream(self.stdout)
192
self.stdout = _wrap_out_stream(self.raw_stdout)
193
self.raw_stderr = _unwrap_stream(self.stderr)
194
self.stderr = _wrap_out_stream(self.raw_stderr)
196
def choose(self, msg, choices, default=None):
197
"""Prompt the user for a list of alternatives.
199
Support both line-based and char-based editing.
201
In line-based mode, both the shortcut and full choice name are valid
202
answers, e.g. for choose('prompt', '&yes\n&no'): 'y', ' Y ', ' yes',
203
'YES ' are all valid input lines for choosing 'yes'.
205
An empty line, when in line-based mode, or pressing enter in char-based
206
mode will select the default choice (if any).
208
Choice is echoed back if:
209
- input is char-based; which means a controlling terminal is available,
210
and osutils.getchar is used
211
- input is line-based, and no controlling terminal is available
214
choose_ui = _ChooseUI(self, msg, choices, default)
215
return choose_ui.interact()
217
def be_quiet(self, state):
218
if state and not self._quiet:
220
UIFactory.be_quiet(self, state)
221
self._progress_view = self.make_progress_view()
223
def clear_term(self):
224
"""Prepare the terminal for output.
226
This will, clear any progress bars, and leave the cursor at the
227
leftmost position."""
228
# XXX: If this is preparing to write to stdout, but that's for example
229
# directed into a file rather than to the terminal, and the progress
230
# bar _is_ going to the terminal, we shouldn't need
231
# to clear it. We might need to separately check for the case of
232
self._progress_view.clear()
234
def get_integer(self, prompt):
237
line = self.stdin.readline()
243
def get_non_echoed_password(self):
244
isatty = getattr(self.stdin, 'isatty', None)
245
if isatty is not None and isatty():
246
# getpass() ensure the password is not echoed and other
247
# cross-platform niceties
248
password = getpass.getpass('')
250
# echo doesn't make sense without a terminal
251
password = self.stdin.readline()
255
if password[-1] == '\n':
256
password = password[:-1]
259
def get_password(self, prompt=u'', **kwargs):
260
"""Prompt the user for a password.
262
:param prompt: The prompt to present the user
263
:param kwargs: Arguments which will be expanded into the prompt.
264
This lets front ends display different things if
266
:return: The password string, return None if the user
267
canceled the request.
270
self.prompt(prompt, **kwargs)
271
# There's currently no way to say 'i decline to enter a password'
272
# as opposed to 'my password is empty' -- does it matter?
273
return self.get_non_echoed_password()
275
def get_username(self, prompt, **kwargs):
276
"""Prompt the user for a username.
278
:param prompt: The prompt to present the user
279
:param kwargs: Arguments which will be expanded into the prompt.
280
This lets front ends display different things if
282
:return: The username string, return None if the user
283
canceled the request.
286
self.prompt(prompt, **kwargs)
287
username = self.stdin.readline()
291
if username[-1] == '\n':
292
username = username[:-1]
295
def make_progress_view(self):
296
"""Construct and return a new ProgressView subclass for this UI.
298
# with --quiet, never any progress view
299
# <https://bugs.launchpad.net/bzr/+bug/320035>. Otherwise if the
300
# user specifically requests either text or no progress bars, always
301
# do that. otherwise, guess based on $TERM and tty presence.
303
return NullProgressView()
304
pb_type = config.GlobalStack().get('progress_bar')
305
if pb_type == 'none': # Explicit requirement
306
return NullProgressView()
307
if (pb_type == 'text' # Explicit requirement
308
or progress._supports_progress(self.stderr)): # Guess
309
return TextProgressView(self.stderr)
310
# No explicit requirement and no successful guess
311
return NullProgressView()
313
def _make_output_stream_explicit(self, encoding, encoding_type):
314
return TextUIOutputStream(self, self.stdout, encoding, encoding_type)
317
"""Write an already-formatted message, clearing the progress bar if necessary."""
319
self.stdout.write(msg + '\n')
321
def prompt(self, prompt, **kwargs):
322
"""Emit prompt on the CLI.
324
:param kwargs: Dictionary of arguments to insert into the prompt,
325
to allow UIs to reformat the prompt.
327
if not isinstance(prompt, text_type):
328
raise ValueError("prompt %r not a unicode string" % prompt)
330
# See <https://launchpad.net/bugs/365891>
331
prompt = prompt % kwargs
334
self.stderr.write(prompt)
336
def report_transport_activity(self, transport, byte_count, direction):
337
"""Called by transports as they do IO.
339
This may update a progress bar, spinner, or similar display.
340
By default it does nothing.
342
self._progress_view.show_transport_activity(transport,
343
direction, byte_count)
345
def log_transport_activity(self, display=False):
346
"""See UIFactory.log_transport_activity()"""
347
log = getattr(self._progress_view, 'log_transport_activity', None)
351
def show_error(self, msg):
353
self.stderr.write("bzr: error: %s\n" % msg)
355
def show_message(self, msg):
358
def show_warning(self, msg):
360
self.stderr.write("bzr: warning: %s\n" % msg)
362
def _progress_updated(self, task):
363
"""A task has been updated and wants to be displayed.
365
if not self._task_stack:
366
warnings.warn("%r updated but no tasks are active" %
368
elif task != self._task_stack[-1]:
369
# We used to check it was the top task, but it's hard to always
370
# get this right and it's not necessarily useful: any actual
371
# problems will be evident in use
372
#warnings.warn("%r is not the top progress task %r" %
373
# (task, self._task_stack[-1]))
375
self._progress_view.show_progress(task)
377
def _progress_all_finished(self):
378
self._progress_view.clear()
380
def show_user_warning(self, warning_id, **message_args):
381
"""Show a text message to the user.
383
Explicitly not for warnings about bzr apis, deprecations or internals.
385
# eventually trace.warning should migrate here, to avoid logging and
386
# be easier to test; that has a lot of test fallout so for now just
387
# new code can call this
388
if warning_id not in self.suppressed_warnings:
389
warning = self.format_user_warning(warning_id, message_args)
390
self.stderr.write(warning + '\n')
393
def pad_to_width(line, width, encoding_hint='ascii'):
394
"""Truncate or pad unicode line to width.
396
This is best-effort for now, and strings containing control codes or
397
non-ascii text may be cut and padded incorrectly.
399
s = line.encode(encoding_hint, 'replace')
400
return (b'%-*.*s' % (width, width, s)).decode(encoding_hint)
403
class TextProgressView(object):
404
"""Display of progress bar and other information on a tty.
406
This shows one line of text, including possibly a network indicator,
407
spinner, progress bar, message, etc.
409
One instance of this is created and held by the UI, and fed updates when a
410
task wants to be painted.
412
Transports feed data to this through the ui_factory object.
414
The Progress views can comprise a tree with _parent_task pointers, but
415
this only prints the stack from the nominated current task up to the root.
418
def __init__(self, term_file, encoding=None, errors=None):
419
self._term_file = term_file
421
self._encoding = getattr(term_file, "encoding", None) or "ascii"
423
self._encoding = encoding
424
# true when there's output on the screen we may need to clear
425
self._have_output = False
426
self._last_transport_msg = ''
428
# time we last repainted the screen
429
self._last_repaint = 0
430
# time we last got information about transport activity
431
self._transport_update_time = 0
432
self._last_task = None
433
self._total_byte_count = 0
434
self._bytes_since_update = 0
435
self._bytes_by_direction = {'unknown': 0, 'read': 0, 'write': 0}
436
self._first_byte_time = None
438
# force the progress bar to be off, as at the moment it doesn't
439
# correspond reliably to overall command progress
440
self.enable_bar = False
442
def _avail_width(self):
443
# we need one extra space for terminals that wrap on last char
444
w = osutils.terminal_width()
450
def _show_line(self, u):
451
width = self._avail_width()
452
if width is not None:
453
u = pad_to_width(u, width, encoding_hint=self._encoding)
454
self._term_file.write('\r' + u + '\r')
457
if self._have_output:
459
self._have_output = False
461
def _render_bar(self):
462
# return a string for the progress bar itself
463
if self.enable_bar and (
464
(self._last_task is None) or self._last_task.show_bar):
465
# If there's no task object, we show space for the bar anyhow.
466
# That's because most invocations of bzr will end showing progress
467
# at some point, though perhaps only after doing some initial IO.
468
# It looks better to draw the progress bar initially rather than
469
# to have what looks like an incomplete progress bar.
470
spin_str = r'/-\|'[self._spin_pos % 4]
473
if self._last_task is None:
474
completion_fraction = 0
477
completion_fraction = \
478
self._last_task._overall_completion_fraction() or 0
479
if (completion_fraction < self._fraction and 'progress' in
482
self._fraction = completion_fraction
483
markers = int(round(float(cols) * completion_fraction)) - 1
484
bar_str = '[' + ('#' * markers + spin_str).ljust(cols) + '] '
486
elif (self._last_task is None) or self._last_task.show_spinner:
487
# The last task wanted just a spinner, no bar
488
spin_str = r'/-\|'[self._spin_pos % 4]
490
return spin_str + ' '
494
def _format_task(self, task):
495
"""Format task-specific parts of progress bar.
497
:returns: (text_part, counter_part) both unicode strings.
499
if not task.show_count:
501
elif task.current_cnt is not None and task.total_cnt is not None:
502
s = ' %d/%d' % (task.current_cnt, task.total_cnt)
503
elif task.current_cnt is not None:
504
s = ' %d' % (task.current_cnt)
507
# compose all the parent messages
510
while t._parent_task:
516
def _render_line(self):
517
bar_string = self._render_bar()
519
task_part, counter_part = self._format_task(self._last_task)
521
task_part = counter_part = ''
522
if self._last_task and not self._last_task.show_transport_activity:
525
trans = self._last_transport_msg
526
# the bar separates the transport activity from the message, so even
527
# if there's no bar or spinner, we must show something if both those
529
if (task_part or trans) and not bar_string:
531
# preferentially truncate the task message if we don't have enough
533
avail_width = self._avail_width()
534
if avail_width is not None:
535
# if terminal avail_width is unknown, don't truncate
536
current_len = len(bar_string) + len(trans) + len(task_part) + len(counter_part)
537
# GZ 2017-04-22: Should measure and truncate task_part properly
538
gap = current_len - avail_width
540
task_part = task_part[:-gap-2] + '..'
541
s = trans + bar_string + task_part + counter_part
542
if avail_width is not None:
543
if len(s) < avail_width:
544
s = s.ljust(avail_width)
545
elif len(s) > avail_width:
550
s = self._render_line()
552
self._have_output = True
554
def show_progress(self, task):
555
"""Called by the task object when it has changed.
557
:param task: The top task object; its parents are also included
560
must_update = task is not self._last_task
561
self._last_task = task
563
if (not must_update) and (now < self._last_repaint + task.update_latency):
565
if now > self._transport_update_time + 10:
566
# no recent activity; expire it
567
self._last_transport_msg = ''
568
self._last_repaint = now
571
def show_transport_activity(self, transport, direction, byte_count):
572
"""Called by transports via the ui_factory, as they do IO.
574
This may update a progress bar, spinner, or similar display.
575
By default it does nothing.
577
# XXX: there should be a transport activity model, and that too should
578
# be seen by the progress view, rather than being poked in here.
579
self._total_byte_count += byte_count
580
self._bytes_since_update += byte_count
581
if self._first_byte_time is None:
582
# Note that this isn't great, as technically it should be the time
583
# when the bytes started transferring, not when they completed.
584
# However, we usually start with a small request anyway.
585
self._first_byte_time = time.time()
586
if direction in self._bytes_by_direction:
587
self._bytes_by_direction[direction] += byte_count
589
self._bytes_by_direction['unknown'] += byte_count
590
if 'no_activity' in debug.debug_flags:
591
# Can be used as a workaround if
592
# <https://launchpad.net/bugs/321935> reappears and transport
593
# activity is cluttering other output. However, thanks to
594
# TextUIOutputStream this shouldn't be a problem any more.
597
if self._total_byte_count < 2000:
598
# a little resistance at first, so it doesn't stay stuck at 0
599
# while connecting...
601
if self._transport_update_time is None:
602
self._transport_update_time = now
603
elif now >= (self._transport_update_time + 0.5):
604
# guard against clock stepping backwards, and don't update too
606
rate = (self._bytes_since_update
607
/ (now - self._transport_update_time))
608
# using base-10 units (see HACKING.txt).
609
msg = ("%6dkB %5dkB/s " %
610
(self._total_byte_count / 1000, int(rate) / 1000,))
611
self._transport_update_time = now
612
self._last_repaint = now
613
self._bytes_since_update = 0
614
self._last_transport_msg = msg
617
def _format_bytes_by_direction(self):
618
if self._first_byte_time is None:
621
transfer_time = time.time() - self._first_byte_time
622
if transfer_time < 0.001:
623
transfer_time = 0.001
624
bps = self._total_byte_count / transfer_time
626
# using base-10 units (see HACKING.txt).
627
msg = ('Transferred: %.0fkB'
628
' (%.1fkB/s r:%.0fkB w:%.0fkB'
629
% (self._total_byte_count / 1000.,
631
self._bytes_by_direction['read'] / 1000.,
632
self._bytes_by_direction['write'] / 1000.,
634
if self._bytes_by_direction['unknown'] > 0:
635
msg += ' u:%.0fkB)' % (
636
self._bytes_by_direction['unknown'] / 1000.
642
def log_transport_activity(self, display=False):
643
msg = self._format_bytes_by_direction()
645
if display and self._total_byte_count > 0:
647
self._term_file.write(msg + '\n')
650
def _get_stream_encoding(stream):
651
encoding = config.GlobalStack().get('output_encoding')
653
encoding = getattr(stream, "encoding", None)
655
encoding = osutils.get_terminal_encoding(trace=True)
659
def _unwrap_stream(stream):
660
inner = getattr(stream, "buffer", None)
662
inner = getattr(stream, "stream", stream)
666
def _wrap_in_stream(stream, encoding=None, errors='replace'):
668
encoding = _get_stream_encoding(stream)
669
encoded_stream = codecs.getreader(encoding)(stream, errors=errors)
670
encoded_stream.encoding = encoding
671
return encoded_stream
674
def _wrap_out_stream(stream, encoding=None, errors='replace'):
676
encoding = _get_stream_encoding(stream)
677
encoded_stream = codecs.getwriter(encoding)(stream, errors=errors)
678
encoded_stream.encoding = encoding
679
return encoded_stream
682
class TextUIOutputStream(object):
683
"""Decorates stream to interact better with progress and change encoding.
685
Before writing to the wrapped stream, progress is cleared. Callers must
686
ensure bulk output is terminated with a newline so progress won't overwrite
689
Additionally, the encoding and errors behaviour of the underlying stream
690
can be changed at this point. If errors is set to 'exact' raw bytes may be
691
written to the underlying stream.
694
def __init__(self, ui_factory, stream, encoding=None, errors='strict'):
695
self.ui_factory = ui_factory
696
# GZ 2017-05-21: Clean up semantics when callers are made saner.
697
inner = _unwrap_stream(stream)
698
self.raw_stream = None
699
if errors == "exact":
701
self.raw_stream = inner
703
self.wrapped_stream = stream
705
encoding = _get_stream_encoding(stream)
707
self.wrapped_stream = _wrap_out_stream(inner, encoding, errors)
709
encoding = self.wrapped_stream.encoding
710
self.encoding = encoding
713
def _write(self, to_write):
714
if isinstance(to_write, bytes):
716
to_write = to_write.decode(self.encoding, self.errors)
717
except UnicodeDecodeError:
718
self.raw_stream.write(to_write)
720
self.wrapped_stream.write(to_write)
723
self.ui_factory.clear_term()
724
self.wrapped_stream.flush()
726
def write(self, to_write):
727
self.ui_factory.clear_term()
728
self._write(to_write)
730
def writelines(self, lines):
731
self.ui_factory.clear_term()