1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Text UI, write output to the console."""
19
from __future__ import absolute_import
27
from ..lazy_import import lazy_import
28
lazy_import(globals(), """
43
from ..sixish import (
52
class _ChooseUI(object):
54
""" Helper class for choose implementation.
57
def __init__(self, ui, msg, choices, default):
60
self._build_alternatives(msg, choices, default)
62
def _setup_mode(self):
63
"""Setup input mode (line-based, char-based) and echo-back.
65
Line-based input is used if the BRZ_TEXTUI_INPUT environment
66
variable is set to 'line-based', or if there is no controlling
69
is_tty = self.ui.raw_stdin.isatty()
70
if (os.environ.get('BRZ_TEXTUI_INPUT') != 'line-based' and
71
self.ui.raw_stdin == _unwrap_stream(sys.stdin) and is_tty):
72
self.line_based = False
75
self.line_based = True
76
self.echo_back = not is_tty
78
def _build_alternatives(self, msg, choices, default):
79
"""Parse choices string.
81
Setup final prompt and the lists of choices and associated
86
self.alternatives = {}
87
choices = choices.split('\n')
88
if default is not None and default not in range(0, len(choices)):
89
raise ValueError("invalid default index")
91
name = c.replace('&', '').lower()
92
choice = (name, index)
93
if name in self.alternatives:
94
raise ValueError("duplicated choice: %s" % name)
95
self.alternatives[name] = choice
96
shortcut = c.find('&')
97
if -1 != shortcut and (shortcut + 1) < len(c):
99
help += '[' + c[shortcut + 1] + ']'
100
help += c[(shortcut + 2):]
101
shortcut = c[shortcut + 1]
103
c = c.replace('&', '')
105
help = '[%s]%s' % (shortcut, c[1:])
106
shortcut = shortcut.lower()
107
if shortcut in self.alternatives:
108
raise ValueError("duplicated shortcut: %s" % shortcut)
109
self.alternatives[shortcut] = choice
110
# Add redirections for default.
112
self.alternatives[''] = choice
113
self.alternatives['\r'] = choice
114
help_list.append(help)
117
self.prompt = u'%s (%s): ' % (msg, ', '.join(help_list))
120
line = self.ui.stdin.readline()
126
char = osutils.getchar()
127
if char == chr(3): # INTR
128
raise KeyboardInterrupt
129
if char == chr(4): # EOF (^d, C-d)
131
if isinstance(char, bytes):
132
return char.decode('ascii', 'replace')
136
"""Keep asking the user until a valid choice is made.
139
getchoice = self._getline
141
getchoice = self._getchar
145
if 1 == iter or self.line_based:
146
self.ui.prompt(self.prompt)
150
self.ui.stderr.write(u'\n')
152
except KeyboardInterrupt:
153
self.ui.stderr.write(u'\n')
155
choice = choice.lower()
156
if choice not in self.alternatives:
157
# Not a valid choice, keep on asking.
159
name, index = self.alternatives[choice]
161
self.ui.stderr.write(name + u'\n')
165
opt_progress_bar = config.Option(
166
'progress_bar', help='Progress bar type.',
167
default_from_env=['BRZ_PROGRESS_BAR'], default=None,
171
class TextUIFactory(UIFactory):
172
"""A UI factory for Text user interfaces."""
174
def __init__(self, stdin, stdout, stderr):
175
"""Create a TextUIFactory."""
176
super(TextUIFactory, self).__init__()
180
self._progress_view = NullProgressView()
183
# Choose default encoding and handle py2/3 differences
184
self._setup_streams()
185
# paints progress, network activity, etc
186
self._progress_view = self.make_progress_view()
189
def _setup_streams(self):
    """Rebind stdin, stdout and stderr to encoding-aware wrappers.

    For each of the three streams currently stored on the factory, the
    result of ``_unwrap_stream`` is kept as ``raw_<name>`` and the public
    attribute is replaced by a wrapper built around that raw stream
    (``_wrap_in_stream`` for input, ``_wrap_out_stream`` for output).
    """
    # Input side.
    raw_in = _unwrap_stream(self.stdin)
    self.raw_stdin = raw_in
    self.stdin = _wrap_in_stream(raw_in)

    # Regular output.
    raw_out = _unwrap_stream(self.stdout)
    self.raw_stdout = raw_out
    self.stdout = _wrap_out_stream(raw_out)

    # Diagnostics / prompts.
    raw_err = _unwrap_stream(self.stderr)
    self.raw_stderr = raw_err
    self.stderr = _wrap_out_stream(raw_err)
197
def choose(self, msg, choices, default=None):
    """Ask the user to pick one of several alternatives.

    Both line-based and char-based input are supported.

    In line-based mode the shortcut and the full choice name are both
    accepted, and surrounding whitespace and case are ignored: with the
    choices '&yes' and '&no' (one per line), the input lines 'y', ' Y ',
    ' yes' and 'YES ' all select 'yes'.

    An empty line (line-based mode) or the enter key (char-based mode)
    selects the default choice, if there is one.

    The choice is echoed back when:
    - input is char-based, i.e. a controlling terminal is available and
      osutils.getchar is used;
    - input is line-based and no controlling terminal is available.

    :param msg: Message shown in front of the list of choices.
    :param choices: Newline-separated choices; '&' marks the shortcut
        character of a choice.
    :param default: Index of the default choice, or None for no default.
    :return: Whatever ``_ChooseUI.interact()`` returns for the selection.
    """
    return _ChooseUI(self, msg, choices, default).interact()
218
def be_quiet(self, state):
219
if state and not self._quiet:
221
UIFactory.be_quiet(self, state)
222
self._progress_view = self.make_progress_view()
224
def clear_term(self):
    """Prepare the terminal for output.

    This clears any progress bar and leaves the cursor at the leftmost
    position.
    """
    # XXX: If the caller is about to write to stdout and stdout is, for
    # example, redirected into a file while the progress bar _is_ going to
    # the terminal, clearing should not be necessary.  That case is not
    # detected here; the view is always cleared.
    view = self._progress_view
    view.clear()
235
def get_integer(self, prompt):
238
line = self.stdin.readline()
244
def get_non_echoed_password(self):
245
isatty = getattr(self.stdin, 'isatty', None)
246
if isatty is not None and isatty():
247
# getpass() ensure the password is not echoed and other
248
# cross-platform niceties
249
password = getpass.getpass('')
251
# echo doesn't make sense without a terminal
252
password = self.stdin.readline()
256
if password[-1] == '\n':
257
password = password[:-1]
260
def get_password(self, prompt=u'', **kwargs):
261
"""Prompt the user for a password.
263
:param prompt: The prompt to present the user
264
:param kwargs: Arguments which will be expanded into the prompt.
265
This lets front ends display different things if
267
:return: The password string, return None if the user
268
canceled the request.
271
self.prompt(prompt, **kwargs)
272
# There's currently no way to say 'i decline to enter a password'
273
# as opposed to 'my password is empty' -- does it matter?
274
return self.get_non_echoed_password()
276
def get_username(self, prompt, **kwargs):
277
"""Prompt the user for a username.
279
:param prompt: The prompt to present the user
280
:param kwargs: Arguments which will be expanded into the prompt.
281
This lets front ends display different things if
283
:return: The username string, return None if the user
284
canceled the request.
287
self.prompt(prompt, **kwargs)
288
username = self.stdin.readline()
292
if username[-1] == '\n':
293
username = username[:-1]
296
def make_progress_view(self):
297
"""Construct and return a new ProgressView subclass for this UI.
299
# with --quiet, never any progress view
300
# <https://bugs.launchpad.net/bzr/+bug/320035>. Otherwise if the
301
# user specifically requests either text or no progress bars, always
302
# do that. otherwise, guess based on $TERM and tty presence.
304
return NullProgressView()
305
pb_type = config.GlobalStack().get('progress_bar')
306
if pb_type == 'none': # Explicit requirement
307
return NullProgressView()
308
if (pb_type == 'text' or # Explicit requirement
309
progress._supports_progress(self.stderr)): # Guess
310
return TextProgressView(self.stderr)
311
# No explicit requirement and no successful guess
312
return NullProgressView()
314
def _make_output_stream_explicit(self, encoding, encoding_type):
    """Return a TextUIOutputStream around this factory's stdout.

    :param encoding: Encoding handed to the output stream.
    :param encoding_type: Passed as the stream's ``errors`` argument.
    """
    return TextUIOutputStream(
        self, self.stdout, encoding=encoding, errors=encoding_type)
318
"""Write an already-formatted message, clearing the progress bar if necessary."""
320
self.stdout.write(msg + '\n')
322
def prompt(self, prompt, **kwargs):
323
"""Emit prompt on the CLI.
325
:param kwargs: Dictionary of arguments to insert into the prompt,
326
to allow UIs to reformat the prompt.
328
if not isinstance(prompt, text_type):
329
raise ValueError("prompt %r not a unicode string" % prompt)
331
# See <https://launchpad.net/bugs/365891>
332
prompt = prompt % kwargs
335
self.stderr.write(prompt)
338
def report_transport_activity(self, transport, byte_count, direction):
    """Called by transports as they do IO.

    The report is forwarded to the current progress view, which may use
    it to update a progress bar, spinner, or similar display.

    :param transport: The transport reporting the activity.
    :param byte_count: Number of bytes reported.
    :param direction: Direction of the transfer as given by the caller.
    """
    view = self._progress_view
    # The view takes direction before byte_count, unlike this method.
    view.show_transport_activity(transport, direction, byte_count)
347
def log_transport_activity(self, display=False):
348
"""See UIFactory.log_transport_activity()"""
349
log = getattr(self._progress_view, 'log_transport_activity', None)
353
def show_error(self, msg):
355
self.stderr.write("bzr: error: %s\n" % msg)
357
def show_message(self, msg):
360
def show_warning(self, msg):
362
self.stderr.write("bzr: warning: %s\n" % msg)
364
def _progress_updated(self, task):
365
"""A task has been updated and wants to be displayed.
367
if not self._task_stack:
368
warnings.warn("%r updated but no tasks are active" %
370
elif task != self._task_stack[-1]:
371
# We used to check it was the top task, but it's hard to always
372
# get this right and it's not necessarily useful: any actual
373
# problems will be evident in use
374
# warnings.warn("%r is not the top progress task %r" %
375
# (task, self._task_stack[-1]))
377
self._progress_view.show_progress(task)
379
def _progress_all_finished(self):
380
self._progress_view.clear()
382
def show_user_warning(self, warning_id, **message_args):
    """Show a text warning to the user on stderr.

    Explicitly not for warnings about bzr apis, deprecations or internals.

    :param warning_id: Identifier of the warning; nothing is written when
        it is listed in ``self.suppressed_warnings``.
    :param message_args: Values passed to ``format_user_warning`` to build
        the message text.
    """
    # eventually trace.warning should migrate here, to avoid logging and
    # be easier to test; that has a lot of test fallout so for now just
    # new code can call this
    if warning_id in self.suppressed_warnings:
        return
    text = self.format_user_warning(warning_id, message_args)
    self.stderr.write(text + '\n')
395
def pad_to_width(line, width, encoding_hint='ascii'):
    """Truncate or pad unicode line to width.

    The line is encoded with ``encoding_hint`` (characters the codec cannot
    represent are replaced), cut or right-padded with spaces to exactly
    ``width`` bytes, and decoded again.

    This is best-effort for now, and strings containing control codes or
    non-ascii text may be cut and padded incorrectly.

    :param line: Unicode text to fit.
    :param width: Target width, measured in encoded bytes.
    :param encoding_hint: Codec used for the encode/decode round trip.
    :return: The fitted unicode string.
    """
    encoded = line.encode(encoding_hint, 'replace')
    fitted = b'%-*.*s' % (width, width, encoded)
    # The cut happens on bytes, so with a multi-byte codec it can land in
    # the middle of a character.  Decode leniently so a split character
    # becomes a replacement marker instead of raising UnicodeDecodeError.
    return fitted.decode(encoding_hint, 'replace')
405
class TextProgressView(object):
406
"""Display of progress bar and other information on a tty.
408
This shows one line of text, including possibly a network indicator,
409
spinner, progress bar, message, etc.
411
One instance of this is created and held by the UI, and fed updates when a
412
task wants to be painted.
414
Transports feed data to this through the ui_factory object.
416
The Progress views can comprise a tree with _parent_task pointers, but
417
this only prints the stack from the nominated current task up to the root.
420
def __init__(self, term_file, encoding=None, errors=None):
421
self._term_file = term_file
423
self._encoding = getattr(term_file, "encoding", None) or "ascii"
425
self._encoding = encoding
426
# true when there's output on the screen we may need to clear
427
self._have_output = False
428
self._last_transport_msg = ''
430
# time we last repainted the screen
431
self._last_repaint = 0
432
# time we last got information about transport activity
433
self._transport_update_time = 0
434
self._last_task = None
435
self._total_byte_count = 0
436
self._bytes_since_update = 0
437
self._bytes_by_direction = {'unknown': 0, 'read': 0, 'write': 0}
438
self._first_byte_time = None
440
# force the progress bar to be off, as at the moment it doesn't
441
# correspond reliably to overall command progress
442
self.enable_bar = False
444
def _avail_width(self):
445
# we need one extra space for terminals that wrap on last char
446
w = osutils.terminal_width()
452
def _show_line(self, u):
453
width = self._avail_width()
454
if width is not None:
455
u = pad_to_width(u, width, encoding_hint=self._encoding)
456
self._term_file.write('\r' + u + '\r')
459
if self._have_output:
461
self._have_output = False
463
def _render_bar(self):
464
# return a string for the progress bar itself
465
if self.enable_bar and (
466
(self._last_task is None) or self._last_task.show_bar):
467
# If there's no task object, we show space for the bar anyhow.
468
# That's because most invocations of bzr will end showing progress
469
# at some point, though perhaps only after doing some initial IO.
470
# It looks better to draw the progress bar initially rather than
471
# to have what looks like an incomplete progress bar.
472
spin_str = r'/-\|'[self._spin_pos % 4]
475
if self._last_task is None:
476
completion_fraction = 0
479
completion_fraction = \
480
self._last_task._overall_completion_fraction() or 0
481
if (completion_fraction < self._fraction and 'progress' in
484
self._fraction = completion_fraction
485
markers = int(round(float(cols) * completion_fraction)) - 1
486
bar_str = '[' + ('#' * markers + spin_str).ljust(cols) + '] '
488
elif (self._last_task is None) or self._last_task.show_spinner:
489
# The last task wanted just a spinner, no bar
490
spin_str = r'/-\|'[self._spin_pos % 4]
492
return spin_str + ' '
496
def _format_task(self, task):
497
"""Format task-specific parts of progress bar.
499
:returns: (text_part, counter_part) both unicode strings.
501
if not task.show_count:
503
elif task.current_cnt is not None and task.total_cnt is not None:
504
s = ' %d/%d' % (task.current_cnt, task.total_cnt)
505
elif task.current_cnt is not None:
506
s = ' %d' % (task.current_cnt)
509
# compose all the parent messages
512
while t._parent_task:
518
def _render_line(self):
519
bar_string = self._render_bar()
521
task_part, counter_part = self._format_task(self._last_task)
523
task_part = counter_part = ''
524
if self._last_task and not self._last_task.show_transport_activity:
527
trans = self._last_transport_msg
528
# the bar separates the transport activity from the message, so even
529
# if there's no bar or spinner, we must show something if both those
531
if (task_part or trans) and not bar_string:
533
# preferentially truncate the task message if we don't have enough
535
avail_width = self._avail_width()
536
if avail_width is not None:
537
# if terminal avail_width is unknown, don't truncate
538
current_len = len(bar_string) + len(trans) + \
539
len(task_part) + len(counter_part)
540
# GZ 2017-04-22: Should measure and truncate task_part properly
541
gap = current_len - avail_width
543
task_part = task_part[:-gap - 2] + '..'
544
s = trans + bar_string + task_part + counter_part
545
if avail_width is not None:
546
if len(s) < avail_width:
547
s = s.ljust(avail_width)
548
elif len(s) > avail_width:
553
s = self._render_line()
555
self._have_output = True
557
def show_progress(self, task):
558
"""Called by the task object when it has changed.
560
:param task: The top task object; its parents are also included
563
must_update = task is not self._last_task
564
self._last_task = task
566
if (not must_update) and (now < self._last_repaint + task.update_latency):
568
if now > self._transport_update_time + 10:
569
# no recent activity; expire it
570
self._last_transport_msg = ''
571
self._last_repaint = now
574
def show_transport_activity(self, transport, direction, byte_count):
575
"""Called by transports via the ui_factory, as they do IO.
577
This may update a progress bar, spinner, or similar display.
578
By default it does nothing.
580
# XXX: there should be a transport activity model, and that too should
581
# be seen by the progress view, rather than being poked in here.
582
self._total_byte_count += byte_count
583
self._bytes_since_update += byte_count
584
if self._first_byte_time is None:
585
# Note that this isn't great, as technically it should be the time
586
# when the bytes started transferring, not when they completed.
587
# However, we usually start with a small request anyway.
588
self._first_byte_time = time.time()
589
if direction in self._bytes_by_direction:
590
self._bytes_by_direction[direction] += byte_count
592
self._bytes_by_direction['unknown'] += byte_count
593
if 'no_activity' in debug.debug_flags:
594
# Can be used as a workaround if
595
# <https://launchpad.net/bugs/321935> reappears and transport
596
# activity is cluttering other output. However, thanks to
597
# TextUIOutputStream this shouldn't be a problem any more.
600
if self._total_byte_count < 2000:
601
# a little resistance at first, so it doesn't stay stuck at 0
602
# while connecting...
604
if self._transport_update_time is None:
605
self._transport_update_time = now
606
elif now >= (self._transport_update_time + 0.5):
607
# guard against clock stepping backwards, and don't update too
609
rate = (self._bytes_since_update /
610
(now - self._transport_update_time))
611
# using base-10 units (see HACKING.txt).
612
msg = ("%6dkB %5dkB/s " %
613
(self._total_byte_count / 1000, int(rate) / 1000,))
614
self._transport_update_time = now
615
self._last_repaint = now
616
self._bytes_since_update = 0
617
self._last_transport_msg = msg
620
def _format_bytes_by_direction(self):
621
if self._first_byte_time is None:
624
transfer_time = time.time() - self._first_byte_time
625
if transfer_time < 0.001:
626
transfer_time = 0.001
627
bps = self._total_byte_count / transfer_time
629
# using base-10 units (see HACKING.txt).
630
msg = ('Transferred: %.0fkB'
631
' (%.1fkB/s r:%.0fkB w:%.0fkB'
632
% (self._total_byte_count / 1000.,
634
self._bytes_by_direction['read'] / 1000.,
635
self._bytes_by_direction['write'] / 1000.,
637
if self._bytes_by_direction['unknown'] > 0:
638
msg += ' u:%.0fkB)' % (
639
self._bytes_by_direction['unknown'] / 1000.
645
def log_transport_activity(self, display=False):
646
msg = self._format_bytes_by_direction()
648
if display and self._total_byte_count > 0:
650
self._term_file.write(msg + '\n')
653
def _get_stream_encoding(stream):
654
encoding = config.GlobalStack().get('output_encoding')
656
encoding = getattr(stream, "encoding", None)
658
encoding = osutils.get_terminal_encoding(trace=True)
662
def _unwrap_stream(stream):
663
inner = getattr(stream, "buffer", None)
665
inner = getattr(stream, "stream", stream)
669
def _wrap_in_stream(stream, encoding=None, errors='replace'):
671
encoding = _get_stream_encoding(stream)
672
# Attempt to wrap using io.open if possible, since that can do
675
fileno = stream.fileno()
676
except io.UnsupportedOperation:
677
encoded_stream = codecs.getreader(encoding)(stream, errors=errors)
678
encoded_stream.encoding = encoding
679
return encoded_stream
681
return io.open(fileno, encoding=encoding, errors=errors, mode='r', buffering=1)
684
def _wrap_out_stream(stream, encoding=None, errors='replace'):
686
encoding = _get_stream_encoding(stream)
687
encoded_stream = codecs.getwriter(encoding)(stream, errors=errors)
688
encoded_stream.encoding = encoding
689
return encoded_stream
692
class TextUIOutputStream(object):
693
"""Decorates stream to interact better with progress and change encoding.
695
Before writing to the wrapped stream, progress is cleared. Callers must
696
ensure bulk output is terminated with a newline so progress won't overwrite
699
Additionally, the encoding and errors behaviour of the underlying stream
700
can be changed at this point. If errors is set to 'exact' raw bytes may be
701
written to the underlying stream.
704
def __init__(self, ui_factory, stream, encoding=None, errors='strict'):
705
self.ui_factory = ui_factory
706
# GZ 2017-05-21: Clean up semantics when callers are made saner.
707
inner = _unwrap_stream(stream)
708
self.raw_stream = None
709
if errors == "exact":
711
self.raw_stream = inner
713
self.wrapped_stream = stream
715
encoding = _get_stream_encoding(stream)
717
self.wrapped_stream = _wrap_out_stream(inner, encoding, errors)
719
encoding = self.wrapped_stream.encoding
720
self.encoding = encoding
723
def _write(self, to_write):
724
if isinstance(to_write, bytes):
726
to_write = to_write.decode(self.encoding, self.errors)
727
except UnicodeDecodeError:
728
self.raw_stream.write(to_write)
730
self.wrapped_stream.write(to_write)
733
self.ui_factory.clear_term()
734
self.wrapped_stream.flush()
736
def write(self, to_write):
737
self.ui_factory.clear_term()
738
self._write(to_write)
740
def writelines(self, lines):
741
self.ui_factory.clear_term()