# Copyright (C) 2005-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Text UI, write output to the console."""
19
from __future__ import absolute_import
27
from ..lazy_import import lazy_import
28
lazy_import(globals(), """
49
class _ChooseUI(object):
51
""" Helper class for choose implementation.
54
def __init__(self, ui, msg, choices, default):
57
self._build_alternatives(msg, choices, default)
59
def _setup_mode(self):
60
"""Setup input mode (line-based, char-based) and echo-back.
62
Line-based input is used if the BRZ_TEXTUI_INPUT environment
63
variable is set to 'line-based', or if there is no controlling
66
is_tty = self.ui.raw_stdin.isatty()
67
if (os.environ.get('BRZ_TEXTUI_INPUT') != 'line-based' and
68
self.ui.raw_stdin == _unwrap_stream(sys.stdin) and is_tty):
69
self.line_based = False
72
self.line_based = True
73
self.echo_back = not is_tty
75
def _build_alternatives(self, msg, choices, default):
76
"""Parse choices string.
78
Setup final prompt and the lists of choices and associated
83
self.alternatives = {}
84
choices = choices.split('\n')
85
if default is not None and default not in range(0, len(choices)):
86
raise ValueError("invalid default index")
88
name = c.replace('&', '').lower()
89
choice = (name, index)
90
if name in self.alternatives:
91
raise ValueError("duplicated choice: %s" % name)
92
self.alternatives[name] = choice
93
shortcut = c.find('&')
94
if -1 != shortcut and (shortcut + 1) < len(c):
96
help += '[' + c[shortcut + 1] + ']'
97
help += c[(shortcut + 2):]
98
shortcut = c[shortcut + 1]
100
c = c.replace('&', '')
102
help = '[%s]%s' % (shortcut, c[1:])
103
shortcut = shortcut.lower()
104
if shortcut in self.alternatives:
105
raise ValueError("duplicated shortcut: %s" % shortcut)
106
self.alternatives[shortcut] = choice
107
# Add redirections for default.
109
self.alternatives[''] = choice
110
self.alternatives['\r'] = choice
111
help_list.append(help)
114
self.prompt = u'%s (%s): ' % (msg, ', '.join(help_list))
117
line = self.ui.stdin.readline()
123
char = osutils.getchar()
124
if char == chr(3): # INTR
125
raise KeyboardInterrupt
126
if char == chr(4): # EOF (^d, C-d)
128
if isinstance(char, bytes):
129
return char.decode('ascii', 'replace')
133
"""Keep asking the user until a valid choice is made.
136
getchoice = self._getline
138
getchoice = self._getchar
142
if 1 == iter or self.line_based:
143
self.ui.prompt(self.prompt)
147
self.ui.stderr.write(u'\n')
149
except KeyboardInterrupt:
150
self.ui.stderr.write(u'\n')
152
choice = choice.lower()
153
if choice not in self.alternatives:
154
# Not a valid choice, keep on asking.
156
name, index = self.alternatives[choice]
158
self.ui.stderr.write(name + u'\n')
162
opt_progress_bar = config.Option(
163
'progress_bar', help='Progress bar type.',
164
default_from_env=['BRZ_PROGRESS_BAR'], default=None,
168
class TextUIFactory(UIFactory):
169
"""A UI factory for Text user interfaces."""
171
def __init__(self, stdin, stdout, stderr):
172
"""Create a TextUIFactory."""
173
super(TextUIFactory, self).__init__()
177
self._progress_view = NullProgressView()
180
# Choose default encoding and handle py2/3 differences
181
self._setup_streams()
182
# paints progress, network activity, etc
183
self._progress_view = self.make_progress_view()
186
def _setup_streams(self):
    """Rebind stdin, stdout and stderr to encoding-aware wrappers.

    For each of the three streams currently stored on the instance, the
    result of ``_unwrap_stream`` is kept as ``raw_<name>`` and the public
    attribute is replaced by a wrapper around that raw stream:
    ``_wrap_in_stream`` for stdin, ``_wrap_out_stream`` for stdout and
    stderr.
    """
    # Input side.
    self.raw_stdin = _unwrap_stream(self.stdin)
    self.stdin = _wrap_in_stream(self.raw_stdin)
    # Output side: stdout and stderr are treated identically.
    self.raw_stdout = _unwrap_stream(self.stdout)
    self.stdout = _wrap_out_stream(self.raw_stdout)
    self.raw_stderr = _unwrap_stream(self.stderr)
    self.stderr = _wrap_out_stream(self.raw_stderr)
194
def choose(self, msg, choices, default=None):
    """Prompt the user to pick one of a list of alternatives.

    Both line-based and char-based input are supported.

    In line-based mode, both the shortcut and the full choice name are
    valid answers, e.g. for choose('prompt', '&yes\\n&no'): 'y', ' Y ',
    ' yes', 'YES ' are all valid input lines for choosing 'yes'.

    An empty line, when in line-based mode, or pressing enter in
    char-based mode will select the default choice (if any).

    The choice is echoed back if:
    - input is char-based; which means a controlling terminal is
      available, and osutils.getchar is used
    - input is line-based, and no controlling terminal is available

    :param msg: Message shown as the start of the prompt.
    :param choices: Newline-separated choice names; an '&' marks the
        character that follows it as that choice's shortcut.
    :param default: Index into the choices of the default one, or None
        for no default.
    :return: Whatever ``_ChooseUI.interact()`` returns for the selected
        choice.
    """
    choose_ui = _ChooseUI(self, msg, choices, default)
    return choose_ui.interact()
215
def be_quiet(self, state):
216
if state and not self._quiet:
218
UIFactory.be_quiet(self, state)
219
self._progress_view = self.make_progress_view()
221
def clear_term(self):
    """Prepare the terminal for output.

    This clears any progress display by asking the current progress view
    to clear itself, which is intended to leave the cursor at the
    leftmost position.
    """
    # XXX: If this is preparing to write to stdout, but that's for example
    # directed into a file rather than to the terminal, and the progress
    # bar _is_ going to the terminal, we shouldn't need to clear it.
    # That case may need a separate check.
    self._progress_view.clear()
232
def get_integer(self, prompt):
235
line = self.stdin.readline()
241
def get_non_echoed_password(self):
242
isatty = getattr(self.stdin, 'isatty', None)
243
if isatty is not None and isatty():
244
# getpass() ensure the password is not echoed and other
245
# cross-platform niceties
246
password = getpass.getpass('')
248
# echo doesn't make sense without a terminal
249
password = self.stdin.readline()
253
if password[-1] == '\n':
254
password = password[:-1]
257
def get_password(self, prompt=u'', **kwargs):
258
"""Prompt the user for a password.
260
:param prompt: The prompt to present the user
261
:param kwargs: Arguments which will be expanded into the prompt.
262
This lets front ends display different things if
264
:return: The password string, return None if the user
265
canceled the request.
268
self.prompt(prompt, **kwargs)
269
# There's currently no way to say 'i decline to enter a password'
270
# as opposed to 'my password is empty' -- does it matter?
271
return self.get_non_echoed_password()
273
def get_username(self, prompt, **kwargs):
274
"""Prompt the user for a username.
276
:param prompt: The prompt to present the user
277
:param kwargs: Arguments which will be expanded into the prompt.
278
This lets front ends display different things if
280
:return: The username string, return None if the user
281
canceled the request.
284
self.prompt(prompt, **kwargs)
285
username = self.stdin.readline()
289
if username[-1] == '\n':
290
username = username[:-1]
293
def make_progress_view(self):
294
"""Construct and return a new ProgressView subclass for this UI.
296
# with --quiet, never any progress view
297
# <https://bugs.launchpad.net/bzr/+bug/320035>. Otherwise if the
298
# user specifically requests either text or no progress bars, always
299
# do that. otherwise, guess based on $TERM and tty presence.
301
return NullProgressView()
302
pb_type = config.GlobalStack().get('progress_bar')
303
if pb_type == 'none': # Explicit requirement
304
return NullProgressView()
305
if (pb_type == 'text' or # Explicit requirement
306
progress._supports_progress(self.stderr)): # Guess
307
return TextProgressView(self.stderr)
308
# No explicit requirement and no successful guess
309
return NullProgressView()
311
def _make_output_stream_explicit(self, encoding, encoding_type):
    """Return a TextUIOutputStream wrapping this factory's stdout.

    :param encoding: Encoding handed to the output stream.
    :param encoding_type: Passed positionally as the stream's ``errors``
        argument.
    """
    return TextUIOutputStream(self, self.stdout, encoding, encoding_type)
315
"""Write an already-formatted message, clearing the progress bar if necessary."""
317
self.stdout.write(msg + '\n')
319
def prompt(self, prompt, **kwargs):
320
"""Emit prompt on the CLI.
322
:param kwargs: Dictionary of arguments to insert into the prompt,
323
to allow UIs to reformat the prompt.
325
if not isinstance(prompt, str):
326
raise ValueError("prompt %r not a unicode string" % prompt)
328
# See <https://launchpad.net/bugs/365891>
329
prompt = prompt % kwargs
332
self.stderr.write(prompt)
335
def report_transport_activity(self, transport, byte_count, direction):
    """Called by transports as they do IO.

    The activity is forwarded to the current progress view, which may
    update a progress bar, spinner, or similar display.

    :param transport: The transport reporting the activity.
    :param byte_count: Number of bytes involved in this activity.
    :param direction: Direction label passed through to the view.
    """
    # Note the argument order: the view takes direction before byte_count.
    self._progress_view.show_transport_activity(
        transport, direction, byte_count)
344
def log_transport_activity(self, display=False):
345
"""See UIFactory.log_transport_activity()"""
346
log = getattr(self._progress_view, 'log_transport_activity', None)
350
def show_error(self, msg):
352
self.stderr.write("bzr: error: %s\n" % msg)
354
def show_message(self, msg):
357
def show_warning(self, msg):
359
self.stderr.write("bzr: warning: %s\n" % msg)
361
def _progress_updated(self, task):
362
"""A task has been updated and wants to be displayed.
364
if not self._task_stack:
365
warnings.warn("%r updated but no tasks are active" %
367
elif task != self._task_stack[-1]:
368
# We used to check it was the top task, but it's hard to always
369
# get this right and it's not necessarily useful: any actual
370
# problems will be evident in use
371
# warnings.warn("%r is not the top progress task %r" %
372
# (task, self._task_stack[-1]))
374
self._progress_view.show_progress(task)
376
def _progress_all_finished(self):
    """Clear the progress view."""
    self._progress_view.clear()
379
def show_user_warning(self, warning_id, **message_args):
    """Show a text message to the user.

    Explicitly not for warnings about bzr apis, deprecations or internals.

    Nothing is written when ``warning_id`` is listed in
    ``self.suppressed_warnings``; otherwise the formatted warning is
    written to stderr followed by a newline.

    :param warning_id: Identifier of the warning to show.
    :param message_args: Values passed to ``format_user_warning`` to build
        the message text.
    """
    # eventually trace.warning should migrate here, to avoid logging and
    # be easier to test; that has a lot of test fallout so for now just
    # new code can call this
    if warning_id not in self.suppressed_warnings:
        warning = self.format_user_warning(warning_id, message_args)
        self.stderr.write(warning + '\n')
392
def pad_to_width(line, width, encoding_hint='ascii'):
    """Truncate or pad unicode line to width.

    This is best-effort for now, and strings containing control codes or
    non-ascii text may be cut and padded incorrectly.

    :param line: Unicode string to fit.
    :param width: Target width, measured in encoded bytes.
    :param encoding_hint: Encoding used to measure the line; characters it
        cannot represent are substituted by the codec's replacement.
    :return: A unicode string of ``line`` cut or space-padded to ``width``
        encoded bytes.
    """
    s = line.encode(encoding_hint, 'replace')
    fitted = b'%-*.*s' % (width, width, s)
    # Cutting at a byte offset can split a multi-byte character; decode
    # leniently so a partial trailing sequence becomes a replacement
    # character instead of raising UnicodeDecodeError.
    return fitted.decode(encoding_hint, 'replace')
402
class TextProgressView(object):
403
"""Display of progress bar and other information on a tty.
405
This shows one line of text, including possibly a network indicator,
406
spinner, progress bar, message, etc.
408
One instance of this is created and held by the UI, and fed updates when a
409
task wants to be painted.
411
Transports feed data to this through the ui_factory object.
413
The Progress views can comprise a tree with _parent_task pointers, but
414
this only prints the stack from the nominated current task up to the root.
417
def __init__(self, term_file, encoding=None, errors=None):
418
self._term_file = term_file
420
self._encoding = getattr(term_file, "encoding", None) or "ascii"
422
self._encoding = encoding
423
# true when there's output on the screen we may need to clear
424
self._have_output = False
425
self._last_transport_msg = ''
427
# time we last repainted the screen
428
self._last_repaint = 0
429
# time we last got information about transport activity
430
self._transport_update_time = 0
431
self._last_task = None
432
self._total_byte_count = 0
433
self._bytes_since_update = 0
434
self._bytes_by_direction = {'unknown': 0, 'read': 0, 'write': 0}
435
self._first_byte_time = None
437
# force the progress bar to be off, as at the moment it doesn't
438
# correspond reliably to overall command progress
439
self.enable_bar = False
441
def _avail_width(self):
442
# we need one extra space for terminals that wrap on last char
443
w = osutils.terminal_width()
449
def _show_line(self, u):
    """Write one line of text to the terminal file.

    When the available width is known, the text is first cut or padded
    to that width using this view's encoding. The text is written with a
    carriage return before and after it.

    :param u: Unicode text to display.
    """
    width = self._avail_width()
    if width is not None:
        u = pad_to_width(u, width, encoding_hint=self._encoding)
    self._term_file.write('\r' + u + '\r')
456
if self._have_output:
458
self._have_output = False
460
def _render_bar(self):
461
# return a string for the progress bar itself
462
if self.enable_bar and (
463
(self._last_task is None) or self._last_task.show_bar):
464
# If there's no task object, we show space for the bar anyhow.
465
# That's because most invocations of bzr will end showing progress
466
# at some point, though perhaps only after doing some initial IO.
467
# It looks better to draw the progress bar initially rather than
468
# to have what looks like an incomplete progress bar.
469
spin_str = r'/-\|'[self._spin_pos % 4]
472
if self._last_task is None:
473
completion_fraction = 0
476
completion_fraction = \
477
self._last_task._overall_completion_fraction() or 0
478
if (completion_fraction < self._fraction and 'progress' in
481
self._fraction = completion_fraction
482
markers = int(round(float(cols) * completion_fraction)) - 1
483
bar_str = '[' + ('#' * markers + spin_str).ljust(cols) + '] '
485
elif (self._last_task is None) or self._last_task.show_spinner:
486
# The last task wanted just a spinner, no bar
487
spin_str = r'/-\|'[self._spin_pos % 4]
489
return spin_str + ' '
493
def _format_task(self, task):
494
"""Format task-specific parts of progress bar.
496
:returns: (text_part, counter_part) both unicode strings.
498
if not task.show_count:
500
elif task.current_cnt is not None and task.total_cnt is not None:
501
s = ' %d/%d' % (task.current_cnt, task.total_cnt)
502
elif task.current_cnt is not None:
503
s = ' %d' % (task.current_cnt)
506
# compose all the parent messages
509
while t._parent_task:
515
def _render_line(self):
516
bar_string = self._render_bar()
518
task_part, counter_part = self._format_task(self._last_task)
520
task_part = counter_part = ''
521
if self._last_task and not self._last_task.show_transport_activity:
524
trans = self._last_transport_msg
525
# the bar separates the transport activity from the message, so even
526
# if there's no bar or spinner, we must show something if both those
528
if (task_part or trans) and not bar_string:
530
# preferentially truncate the task message if we don't have enough
532
avail_width = self._avail_width()
533
if avail_width is not None:
534
# if terminal avail_width is unknown, don't truncate
535
current_len = len(bar_string) + len(trans) + \
536
len(task_part) + len(counter_part)
537
# GZ 2017-04-22: Should measure and truncate task_part properly
538
gap = current_len - avail_width
540
task_part = task_part[:-gap - 2] + '..'
541
s = trans + bar_string + task_part + counter_part
542
if avail_width is not None:
543
if len(s) < avail_width:
544
s = s.ljust(avail_width)
545
elif len(s) > avail_width:
550
s = self._render_line()
552
self._have_output = True
554
def show_progress(self, task):
555
"""Called by the task object when it has changed.
557
:param task: The top task object; its parents are also included
560
must_update = task is not self._last_task
561
self._last_task = task
563
if (not must_update) and (now < self._last_repaint + task.update_latency):
565
if now > self._transport_update_time + 10:
566
# no recent activity; expire it
567
self._last_transport_msg = ''
568
self._last_repaint = now
571
def show_transport_activity(self, transport, direction, byte_count):
572
"""Called by transports via the ui_factory, as they do IO.
574
This may update a progress bar, spinner, or similar display.
575
By default it does nothing.
577
# XXX: there should be a transport activity model, and that too should
578
# be seen by the progress view, rather than being poked in here.
579
self._total_byte_count += byte_count
580
self._bytes_since_update += byte_count
581
if self._first_byte_time is None:
582
# Note that this isn't great, as technically it should be the time
583
# when the bytes started transferring, not when they completed.
584
# However, we usually start with a small request anyway.
585
self._first_byte_time = time.time()
586
if direction in self._bytes_by_direction:
587
self._bytes_by_direction[direction] += byte_count
589
self._bytes_by_direction['unknown'] += byte_count
590
if 'no_activity' in debug.debug_flags:
591
# Can be used as a workaround if
592
# <https://launchpad.net/bugs/321935> reappears and transport
593
# activity is cluttering other output. However, thanks to
594
# TextUIOutputStream this shouldn't be a problem any more.
597
if self._total_byte_count < 2000:
598
# a little resistance at first, so it doesn't stay stuck at 0
599
# while connecting...
601
if self._transport_update_time is None:
602
self._transport_update_time = now
603
elif now >= (self._transport_update_time + 0.5):
604
# guard against clock stepping backwards, and don't update too
606
rate = (self._bytes_since_update /
607
(now - self._transport_update_time))
608
# using base-10 units (see HACKING.txt).
609
msg = ("%6dkB %5dkB/s " %
610
(self._total_byte_count / 1000, int(rate) / 1000,))
611
self._transport_update_time = now
612
self._last_repaint = now
613
self._bytes_since_update = 0
614
self._last_transport_msg = msg
617
def _format_bytes_by_direction(self):
618
if self._first_byte_time is None:
621
transfer_time = time.time() - self._first_byte_time
622
if transfer_time < 0.001:
623
transfer_time = 0.001
624
bps = self._total_byte_count / transfer_time
626
# using base-10 units (see HACKING.txt).
627
msg = ('Transferred: %.0fkB'
628
' (%.1fkB/s r:%.0fkB w:%.0fkB'
629
% (self._total_byte_count / 1000.,
631
self._bytes_by_direction['read'] / 1000.,
632
self._bytes_by_direction['write'] / 1000.,
634
if self._bytes_by_direction['unknown'] > 0:
635
msg += ' u:%.0fkB)' % (
636
self._bytes_by_direction['unknown'] / 1000.
642
def log_transport_activity(self, display=False):
643
msg = self._format_bytes_by_direction()
645
if display and self._total_byte_count > 0:
647
self._term_file.write(msg + '\n')
650
def _get_stream_encoding(stream):
651
encoding = config.GlobalStack().get('output_encoding')
653
encoding = getattr(stream, "encoding", None)
655
encoding = osutils.get_terminal_encoding(trace=True)
659
def _unwrap_stream(stream):
660
inner = getattr(stream, "buffer", None)
662
inner = getattr(stream, "stream", stream)
666
def _wrap_in_stream(stream, encoding=None, errors='replace'):
668
encoding = _get_stream_encoding(stream)
669
# Attempt to wrap using io.open if possible, since that can do
672
fileno = stream.fileno()
673
except io.UnsupportedOperation:
674
encoded_stream = codecs.getreader(encoding)(stream, errors=errors)
675
encoded_stream.encoding = encoding
676
return encoded_stream
678
return io.open(fileno, encoding=encoding, errors=errors, mode='r', buffering=1)
681
def _wrap_out_stream(stream, encoding=None, errors='replace'):
683
encoding = _get_stream_encoding(stream)
684
encoded_stream = codecs.getwriter(encoding)(stream, errors=errors)
685
encoded_stream.encoding = encoding
686
return encoded_stream
689
class TextUIOutputStream(object):
690
"""Decorates stream to interact better with progress and change encoding.
692
Before writing to the wrapped stream, progress is cleared. Callers must
693
ensure bulk output is terminated with a newline so progress won't overwrite
696
Additionally, the encoding and errors behaviour of the underlying stream
697
can be changed at this point. If errors is set to 'exact' raw bytes may be
698
written to the underlying stream.
701
def __init__(self, ui_factory, stream, encoding=None, errors='strict'):
702
self.ui_factory = ui_factory
703
# GZ 2017-05-21: Clean up semantics when callers are made saner.
704
inner = _unwrap_stream(stream)
705
self.raw_stream = None
706
if errors == "exact":
708
self.raw_stream = inner
710
self.wrapped_stream = stream
712
encoding = _get_stream_encoding(stream)
714
self.wrapped_stream = _wrap_out_stream(inner, encoding, errors)
716
encoding = self.wrapped_stream.encoding
717
self.encoding = encoding
720
def _write(self, to_write):
721
if isinstance(to_write, bytes):
723
to_write = to_write.decode(self.encoding, self.errors)
724
except UnicodeDecodeError:
725
self.raw_stream.write(to_write)
727
self.wrapped_stream.write(to_write)
730
self.ui_factory.clear_term()
731
self.wrapped_stream.flush()
733
def write(self, to_write):
    """Clear the terminal via the UI factory, then write ``to_write``.

    :param to_write: Data handed on to ``self._write``.
    """
    self.ui_factory.clear_term()
    self._write(to_write)
737
def writelines(self, lines):
738
self.ui_factory.clear_term()