1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Text UI, write output to the console."""
19
from __future__ import absolute_import
26
from ..lazy_import import lazy_import
27
lazy_import(globals(), """
42
from ..sixish import (
51
class _ChooseUI(object):
53
""" Helper class for choose implementation.
56
def __init__(self, ui, msg, choices, default):
59
self._build_alternatives(msg, choices, default)
61
def _setup_mode(self):
62
"""Setup input mode (line-based, char-based) and echo-back.
64
Line-based input is used if the BRZ_TEXTUI_INPUT environment
65
variable is set to 'line-based', or if there is no controlling
68
is_tty = self.ui.raw_stdin.isatty()
69
if (os.environ.get('BRZ_TEXTUI_INPUT') != 'line-based' and
70
self.ui.raw_stdin == _unwrap_stream(sys.stdin) and is_tty):
71
self.line_based = False
74
self.line_based = True
75
self.echo_back = not is_tty
77
def _build_alternatives(self, msg, choices, default):
78
"""Parse choices string.
80
Setup final prompt and the lists of choices and associated
85
self.alternatives = {}
86
choices = choices.split('\n')
87
if default is not None and default not in range(0, len(choices)):
88
raise ValueError("invalid default index")
90
name = c.replace('&', '').lower()
91
choice = (name, index)
92
if name in self.alternatives:
93
raise ValueError("duplicated choice: %s" % name)
94
self.alternatives[name] = choice
95
shortcut = c.find('&')
96
if -1 != shortcut and (shortcut + 1) < len(c):
98
help += '[' + c[shortcut + 1] + ']'
99
help += c[(shortcut + 2):]
100
shortcut = c[shortcut + 1]
102
c = c.replace('&', '')
104
help = '[%s]%s' % (shortcut, c[1:])
105
shortcut = shortcut.lower()
106
if shortcut in self.alternatives:
107
raise ValueError("duplicated shortcut: %s" % shortcut)
108
self.alternatives[shortcut] = choice
109
# Add redirections for default.
111
self.alternatives[''] = choice
112
self.alternatives['\r'] = choice
113
help_list.append(help)
116
self.prompt = u'%s (%s): ' % (msg, ', '.join(help_list))
119
line = self.ui.stdin.readline()
125
char = osutils.getchar()
126
if char == chr(3): # INTR
127
raise KeyboardInterrupt
128
if char == chr(4): # EOF (^d, C-d)
130
if isinstance(char, bytes):
131
return char.decode('ascii', 'replace')
135
"""Keep asking the user until a valid choice is made.
138
getchoice = self._getline
140
getchoice = self._getchar
144
if 1 == iter or self.line_based:
145
self.ui.prompt(self.prompt)
149
self.ui.stderr.write(u'\n')
151
except KeyboardInterrupt:
152
self.ui.stderr.write(u'\n')
154
choice = choice.lower()
155
if choice not in self.alternatives:
156
# Not a valid choice, keep on asking.
158
name, index = self.alternatives[choice]
160
self.ui.stderr.write(name + u'\n')
164
opt_progress_bar = config.Option(
165
'progress_bar', help='Progress bar type.',
166
default_from_env=['BRZ_PROGRESS_BAR'], default=None,
170
class TextUIFactory(UIFactory):
171
"""A UI factory for Text user interfaces."""
173
def __init__(self, stdin, stdout, stderr):
174
"""Create a TextUIFactory."""
175
super(TextUIFactory, self).__init__()
179
self._progress_view = NullProgressView()
182
# Choose default encoding and handle py2/3 differences
183
self._setup_streams()
184
# paints progress, network activity, etc
185
self._progress_view = self.make_progress_view()
188
def _setup_streams(self):
    """Re-wrap the three standard streams held by this factory.

    For each of ``stdin``, ``stdout`` and ``stderr`` the current attribute
    is passed through ``_unwrap_stream`` and the result is stored as
    ``raw_<name>``.  The public attribute is then replaced by that raw
    stream wrapped with ``_wrap_in_stream`` (for input) or
    ``_wrap_out_stream`` (for output).
    """
    # Input side.
    raw_in = _unwrap_stream(self.stdin)
    self.raw_stdin = raw_in
    self.stdin = _wrap_in_stream(raw_in)
    # Output side: stdout first, then stderr, each handled the same way.
    for attr in ('stdout', 'stderr'):
        raw_out = _unwrap_stream(getattr(self, attr))
        setattr(self, 'raw_' + attr, raw_out)
        setattr(self, attr, _wrap_out_stream(raw_out))
196
def choose(self, msg, choices, default=None):
    """Ask the user to pick one of several alternatives.

    The work is delegated to a ``_ChooseUI`` helper built from this
    factory and the arguments; whatever its ``interact()`` returns is
    returned unchanged.

    Both line-based and char-based input are supported.  In line-based
    mode either the shortcut or the full choice name is accepted, with
    surrounding whitespace and letter case ignored: for the two choices
    ``&yes`` and ``&no`` the lines 'y', ' Y ', ' yes' and 'YES ' all
    select 'yes'.

    An empty line (line-based mode) or the enter key (char-based mode)
    selects the default choice, if there is one.

    The choice is echoed back when:
    - input is char-based, which means a controlling terminal is
      available and osutils.getchar is used
    - input is line-based and no controlling terminal is available

    :param msg: Message shown in front of the list of choices.
    :param choices: The alternatives as a single newline-separated string;
        an ``&`` marks the shortcut character of a choice.
    :param default: Index of the default choice, or None for no default.
    :return: The result of ``_ChooseUI.interact()``.
    """
    return _ChooseUI(self, msg, choices, default).interact()
217
def be_quiet(self, state):
218
if state and not self._quiet:
220
UIFactory.be_quiet(self, state)
221
self._progress_view = self.make_progress_view()
223
def clear_term(self):
    """Get the terminal ready for other output.

    Asks the current progress view to clear itself, so that any progress
    bar is removed and the cursor is left at the leftmost position.
    """
    # XXX: When the pending output goes to stdout and stdout is redirected
    # (for example into a file) while the progress bar is drawn on the
    # terminal, clearing should not really be necessary.  That case is not
    # detected here; the view is always cleared.
    view = self._progress_view
    view.clear()
234
def get_integer(self, prompt):
237
line = self.stdin.readline()
243
def get_non_echoed_password(self):
244
isatty = getattr(self.stdin, 'isatty', None)
245
if isatty is not None and isatty():
246
# getpass() ensure the password is not echoed and other
247
# cross-platform niceties
248
password = getpass.getpass('')
250
# echo doesn't make sense without a terminal
251
password = self.stdin.readline()
255
if password[-1] == '\n':
256
password = password[:-1]
259
def get_password(self, prompt=u'', **kwargs):
260
"""Prompt the user for a password.
262
:param prompt: The prompt to present the user
263
:param kwargs: Arguments which will be expanded into the prompt.
264
This lets front ends display different things if
266
:return: The password string, return None if the user
267
canceled the request.
270
self.prompt(prompt, **kwargs)
271
# There's currently no way to say 'i decline to enter a password'
272
# as opposed to 'my password is empty' -- does it matter?
273
return self.get_non_echoed_password()
275
def get_username(self, prompt, **kwargs):
276
"""Prompt the user for a username.
278
:param prompt: The prompt to present the user
279
:param kwargs: Arguments which will be expanded into the prompt.
280
This lets front ends display different things if
282
:return: The username string, return None if the user
283
canceled the request.
286
self.prompt(prompt, **kwargs)
287
username = self.stdin.readline()
291
if username[-1] == '\n':
292
username = username[:-1]
295
def make_progress_view(self):
296
"""Construct and return a new ProgressView subclass for this UI.
298
# with --quiet, never any progress view
299
# <https://bugs.launchpad.net/bzr/+bug/320035>. Otherwise if the
300
# user specifically requests either text or no progress bars, always
301
# do that. otherwise, guess based on $TERM and tty presence.
303
return NullProgressView()
304
pb_type = config.GlobalStack().get('progress_bar')
305
if pb_type == 'none': # Explicit requirement
306
return NullProgressView()
307
if (pb_type == 'text' or # Explicit requirement
308
progress._supports_progress(self.stderr)): # Guess
309
return TextProgressView(self.stderr)
310
# No explicit requirement and no successful guess
311
return NullProgressView()
313
def _make_output_stream_explicit(self, encoding, encoding_type):
    """Build the output stream object for this UI.

    :param encoding: Passed to ``TextUIOutputStream`` as its encoding.
    :param encoding_type: Passed to ``TextUIOutputStream`` as its errors
        argument.
    :return: A ``TextUIOutputStream`` bound to this factory and wrapping
        ``self.stdout``.
    """
    out_stream = TextUIOutputStream(
        self, self.stdout, encoding, encoding_type)
    return out_stream
317
"""Write an already-formatted message, clearing the progress bar if necessary."""
319
self.stdout.write(msg + '\n')
321
def prompt(self, prompt, **kwargs):
322
"""Emit prompt on the CLI.
324
:param kwargs: Dictionary of arguments to insert into the prompt,
325
to allow UIs to reformat the prompt.
327
if not isinstance(prompt, text_type):
328
raise ValueError("prompt %r not a unicode string" % prompt)
330
# See <https://launchpad.net/bugs/365891>
331
prompt = prompt % kwargs
334
self.stderr.write(prompt)
337
def report_transport_activity(self, transport, byte_count, direction):
    """Hook invoked by transports whenever they perform IO.

    The notification is forwarded to the current progress view, which may
    use it to refresh a progress bar, spinner or similar display.

    :param transport: The transport reporting the activity.
    :param byte_count: Number of bytes involved.
    :param direction: Direction of the transfer.
    """
    # The view takes direction before byte_count, unlike this method.
    notify = self._progress_view.show_transport_activity
    notify(transport, direction, byte_count)
346
def log_transport_activity(self, display=False):
347
"""See UIFactory.log_transport_activity()"""
348
log = getattr(self._progress_view, 'log_transport_activity', None)
352
def show_error(self, msg):
354
self.stderr.write("bzr: error: %s\n" % msg)
356
def show_message(self, msg):
359
def show_warning(self, msg):
361
self.stderr.write("bzr: warning: %s\n" % msg)
363
def _progress_updated(self, task):
364
"""A task has been updated and wants to be displayed.
366
if not self._task_stack:
367
warnings.warn("%r updated but no tasks are active" %
369
elif task != self._task_stack[-1]:
370
# We used to check it was the top task, but it's hard to always
371
# get this right and it's not necessarily useful: any actual
372
# problems will be evident in use
373
# warnings.warn("%r is not the top progress task %r" %
374
# (task, self._task_stack[-1]))
376
self._progress_view.show_progress(task)
378
def _progress_all_finished(self):
379
self._progress_view.clear()
381
def show_user_warning(self, warning_id, **message_args):
    """Write a user-facing warning to stderr unless it is suppressed.

    Explicitly not for warnings about bzr apis, deprecations or internals.

    :param warning_id: Identifier of the warning.  Nothing is written if
        it is listed in ``self.suppressed_warnings``.
    :param message_args: Keyword arguments handed to
        ``format_user_warning`` together with the identifier.
    """
    # trace.warning should eventually move here (no logging, easier to
    # test), but that causes a lot of test fallout, so for the moment only
    # new code is expected to call this.
    if warning_id in self.suppressed_warnings:
        return
    text = self.format_user_warning(warning_id, message_args)
    self.stderr.write(text + '\n')
394
def pad_to_width(line, width, encoding_hint='ascii'):
    """Truncate or pad unicode line to width.

    The line is encoded with ``encoding_hint`` (unencodable characters are
    replaced), cut or space-padded on the right to ``width`` *bytes*, and
    decoded again.

    This is best-effort for now, and strings containing control codes or
    non-ascii text may be cut and padded incorrectly.

    :param line: The unicode string to fit.
    :param width: Target width, counted in encoded bytes.
    :param encoding_hint: Codec used for the round trip.
    :return: The fitted unicode string.
    """
    encoded = line.encode(encoding_hint, 'replace')
    fitted = b'%-*.*s' % (width, width, encoded)
    # Truncation works on bytes, so with a multi-byte codec it can split a
    # character in half.  Decode leniently so that a broken tail becomes a
    # replacement character instead of raising UnicodeDecodeError.
    return fitted.decode(encoding_hint, 'replace')
404
class TextProgressView(object):
405
"""Display of progress bar and other information on a tty.
407
This shows one line of text, including possibly a network indicator,
408
spinner, progress bar, message, etc.
410
One instance of this is created and held by the UI, and fed updates when a
411
task wants to be painted.
413
Transports feed data to this through the ui_factory object.
415
The Progress views can comprise a tree with _parent_task pointers, but
416
this only prints the stack from the nominated current task up to the root.
419
def __init__(self, term_file, encoding=None, errors=None):
420
self._term_file = term_file
422
self._encoding = getattr(term_file, "encoding", None) or "ascii"
424
self._encoding = encoding
425
# true when there's output on the screen we may need to clear
426
self._have_output = False
427
self._last_transport_msg = ''
429
# time we last repainted the screen
430
self._last_repaint = 0
431
# time we last got information about transport activity
432
self._transport_update_time = 0
433
self._last_task = None
434
self._total_byte_count = 0
435
self._bytes_since_update = 0
436
self._bytes_by_direction = {'unknown': 0, 'read': 0, 'write': 0}
437
self._first_byte_time = None
439
# force the progress bar to be off, as at the moment it doesn't
440
# correspond reliably to overall command progress
441
self.enable_bar = False
443
def _avail_width(self):
444
# we need one extra space for terminals that wrap on last char
445
w = osutils.terminal_width()
451
def _show_line(self, u):
452
width = self._avail_width()
453
if width is not None:
454
u = pad_to_width(u, width, encoding_hint=self._encoding)
455
self._term_file.write('\r' + u + '\r')
458
if self._have_output:
460
self._have_output = False
462
def _render_bar(self):
463
# return a string for the progress bar itself
464
if self.enable_bar and (
465
(self._last_task is None) or self._last_task.show_bar):
466
# If there's no task object, we show space for the bar anyhow.
467
# That's because most invocations of bzr will end showing progress
468
# at some point, though perhaps only after doing some initial IO.
469
# It looks better to draw the progress bar initially rather than
470
# to have what looks like an incomplete progress bar.
471
spin_str = r'/-\|'[self._spin_pos % 4]
474
if self._last_task is None:
475
completion_fraction = 0
478
completion_fraction = \
479
self._last_task._overall_completion_fraction() or 0
480
if (completion_fraction < self._fraction and 'progress' in
483
self._fraction = completion_fraction
484
markers = int(round(float(cols) * completion_fraction)) - 1
485
bar_str = '[' + ('#' * markers + spin_str).ljust(cols) + '] '
487
elif (self._last_task is None) or self._last_task.show_spinner:
488
# The last task wanted just a spinner, no bar
489
spin_str = r'/-\|'[self._spin_pos % 4]
491
return spin_str + ' '
495
def _format_task(self, task):
496
"""Format task-specific parts of progress bar.
498
:returns: (text_part, counter_part) both unicode strings.
500
if not task.show_count:
502
elif task.current_cnt is not None and task.total_cnt is not None:
503
s = ' %d/%d' % (task.current_cnt, task.total_cnt)
504
elif task.current_cnt is not None:
505
s = ' %d' % (task.current_cnt)
508
# compose all the parent messages
511
while t._parent_task:
517
def _render_line(self):
518
bar_string = self._render_bar()
520
task_part, counter_part = self._format_task(self._last_task)
522
task_part = counter_part = ''
523
if self._last_task and not self._last_task.show_transport_activity:
526
trans = self._last_transport_msg
527
# the bar separates the transport activity from the message, so even
528
# if there's no bar or spinner, we must show something if both those
530
if (task_part or trans) and not bar_string:
532
# preferentially truncate the task message if we don't have enough
534
avail_width = self._avail_width()
535
if avail_width is not None:
536
# if terminal avail_width is unknown, don't truncate
537
current_len = len(bar_string) + len(trans) + \
538
len(task_part) + len(counter_part)
539
# GZ 2017-04-22: Should measure and truncate task_part properly
540
gap = current_len - avail_width
542
task_part = task_part[:-gap - 2] + '..'
543
s = trans + bar_string + task_part + counter_part
544
if avail_width is not None:
545
if len(s) < avail_width:
546
s = s.ljust(avail_width)
547
elif len(s) > avail_width:
552
s = self._render_line()
554
self._have_output = True
556
def show_progress(self, task):
557
"""Called by the task object when it has changed.
559
:param task: The top task object; its parents are also included
562
must_update = task is not self._last_task
563
self._last_task = task
565
if (not must_update) and (now < self._last_repaint + task.update_latency):
567
if now > self._transport_update_time + 10:
568
# no recent activity; expire it
569
self._last_transport_msg = ''
570
self._last_repaint = now
573
def show_transport_activity(self, transport, direction, byte_count):
574
"""Called by transports via the ui_factory, as they do IO.
576
This may update a progress bar, spinner, or similar display.
577
By default it does nothing.
579
# XXX: there should be a transport activity model, and that too should
580
# be seen by the progress view, rather than being poked in here.
581
self._total_byte_count += byte_count
582
self._bytes_since_update += byte_count
583
if self._first_byte_time is None:
584
# Note that this isn't great, as technically it should be the time
585
# when the bytes started transferring, not when they completed.
586
# However, we usually start with a small request anyway.
587
self._first_byte_time = time.time()
588
if direction in self._bytes_by_direction:
589
self._bytes_by_direction[direction] += byte_count
591
self._bytes_by_direction['unknown'] += byte_count
592
if 'no_activity' in debug.debug_flags:
593
# Can be used as a workaround if
594
# <https://launchpad.net/bugs/321935> reappears and transport
595
# activity is cluttering other output. However, thanks to
596
# TextUIOutputStream this shouldn't be a problem any more.
599
if self._total_byte_count < 2000:
600
# a little resistance at first, so it doesn't stay stuck at 0
601
# while connecting...
603
if self._transport_update_time is None:
604
self._transport_update_time = now
605
elif now >= (self._transport_update_time + 0.5):
606
# guard against clock stepping backwards, and don't update too
608
rate = (self._bytes_since_update /
609
(now - self._transport_update_time))
610
# using base-10 units (see HACKING.txt).
611
msg = ("%6dkB %5dkB/s " %
612
(self._total_byte_count / 1000, int(rate) / 1000,))
613
self._transport_update_time = now
614
self._last_repaint = now
615
self._bytes_since_update = 0
616
self._last_transport_msg = msg
619
def _format_bytes_by_direction(self):
620
if self._first_byte_time is None:
623
transfer_time = time.time() - self._first_byte_time
624
if transfer_time < 0.001:
625
transfer_time = 0.001
626
bps = self._total_byte_count / transfer_time
628
# using base-10 units (see HACKING.txt).
629
msg = ('Transferred: %.0fkB'
630
' (%.1fkB/s r:%.0fkB w:%.0fkB'
631
% (self._total_byte_count / 1000.,
633
self._bytes_by_direction['read'] / 1000.,
634
self._bytes_by_direction['write'] / 1000.,
636
if self._bytes_by_direction['unknown'] > 0:
637
msg += ' u:%.0fkB)' % (
638
self._bytes_by_direction['unknown'] / 1000.
644
def log_transport_activity(self, display=False):
645
msg = self._format_bytes_by_direction()
647
if display and self._total_byte_count > 0:
649
self._term_file.write(msg + '\n')
652
def _get_stream_encoding(stream):
653
encoding = config.GlobalStack().get('output_encoding')
655
encoding = getattr(stream, "encoding", None)
657
encoding = osutils.get_terminal_encoding(trace=True)
661
def _unwrap_stream(stream):
662
inner = getattr(stream, "buffer", None)
664
inner = getattr(stream, "stream", stream)
668
def _wrap_in_stream(stream, encoding=None, errors='replace'):
670
encoding = _get_stream_encoding(stream)
671
encoded_stream = codecs.getreader(encoding)(stream, errors=errors)
672
encoded_stream.encoding = encoding
673
return encoded_stream
676
def _wrap_out_stream(stream, encoding=None, errors='replace'):
678
encoding = _get_stream_encoding(stream)
679
encoded_stream = codecs.getwriter(encoding)(stream, errors=errors)
680
encoded_stream.encoding = encoding
681
return encoded_stream
684
class TextUIOutputStream(object):
685
"""Decorates stream to interact better with progress and change encoding.
687
Before writing to the wrapped stream, progress is cleared. Callers must
688
ensure bulk output is terminated with a newline so progress won't overwrite
691
Additionally, the encoding and errors behaviour of the underlying stream
692
can be changed at this point. If errors is set to 'exact' raw bytes may be
693
written to the underlying stream.
696
def __init__(self, ui_factory, stream, encoding=None, errors='strict'):
697
self.ui_factory = ui_factory
698
# GZ 2017-05-21: Clean up semantics when callers are made saner.
699
inner = _unwrap_stream(stream)
700
self.raw_stream = None
701
if errors == "exact":
703
self.raw_stream = inner
705
self.wrapped_stream = stream
707
encoding = _get_stream_encoding(stream)
709
self.wrapped_stream = _wrap_out_stream(inner, encoding, errors)
711
encoding = self.wrapped_stream.encoding
712
self.encoding = encoding
715
def _write(self, to_write):
716
if isinstance(to_write, bytes):
718
to_write = to_write.decode(self.encoding, self.errors)
719
except UnicodeDecodeError:
720
self.raw_stream.write(to_write)
722
self.wrapped_stream.write(to_write)
725
self.ui_factory.clear_term()
726
self.wrapped_stream.flush()
728
def write(self, to_write):
729
self.ui_factory.clear_term()
730
self._write(to_write)
732
def writelines(self, lines):
733
self.ui_factory.clear_term()