1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Text UI, write output to the console."""
19
from __future__ import absolute_import
26
from ..lazy_import import lazy_import
27
lazy_import(globals(), """
42
from ..sixish import (
51
class _ChooseUI(object):
53
""" Helper class for choose implementation.
56
def __init__(self, ui, msg, choices, default):
59
self._build_alternatives(msg, choices, default)
61
def _setup_mode(self):
62
"""Setup input mode (line-based, char-based) and echo-back.
64
Line-based input is used if the BRZ_TEXTUI_INPUT environment
65
variable is set to 'line-based', or if there is no controlling
68
if os.environ.get('BRZ_TEXTUI_INPUT') != 'line-based' and \
69
self.ui.stdin == sys.stdin and self.ui.stdin.isatty():
70
self.line_based = False
73
self.line_based = True
74
self.echo_back = not self.ui.stdin.isatty()
76
def _build_alternatives(self, msg, choices, default):
77
"""Parse choices string.
79
Setup final prompt and the lists of choices and associated
84
self.alternatives = {}
85
choices = choices.split('\n')
86
if default is not None and default not in range(0, len(choices)):
87
raise ValueError("invalid default index")
89
name = c.replace('&', '').lower()
90
choice = (name, index)
91
if name in self.alternatives:
92
raise ValueError("duplicated choice: %s" % name)
93
self.alternatives[name] = choice
94
shortcut = c.find('&')
95
if -1 != shortcut and (shortcut + 1) < len(c):
97
help += '[' + c[shortcut + 1] + ']'
98
help += c[(shortcut + 2):]
99
shortcut = c[shortcut + 1]
101
c = c.replace('&', '')
103
help = '[%s]%s' % (shortcut, c[1:])
104
shortcut = shortcut.lower()
105
if shortcut in self.alternatives:
106
raise ValueError("duplicated shortcut: %s" % shortcut)
107
self.alternatives[shortcut] = choice
108
# Add redirections for default.
110
self.alternatives[''] = choice
111
self.alternatives['\r'] = choice
112
help_list.append(help)
115
self.prompt = u'%s (%s): ' % (msg, ', '.join(help_list))
118
line = self.ui.stdin.readline()
124
char = osutils.getchar()
125
if char == chr(3): # INTR
126
raise KeyboardInterrupt
127
if char == chr(4): # EOF (^d, C-d)
129
return char.decode("ascii", "replace")
132
"""Keep asking the user until a valid choice is made.
135
getchoice = self._getline
137
getchoice = self._getchar
141
if 1 == iter or self.line_based:
142
self.ui.prompt(self.prompt)
146
self.ui.stderr.write(u'\n')
148
except KeyboardInterrupt:
149
self.ui.stderr.write(u'\n')
151
choice = choice.lower()
152
if choice not in self.alternatives:
153
# Not a valid choice, keep on asking.
155
name, index = self.alternatives[choice]
157
self.ui.stderr.write(name + u'\n')
161
opt_progress_bar = config.Option(
162
'progress_bar', help='Progress bar type.',
163
default_from_env=['BRZ_PROGRESS_BAR'], default=None,
167
class TextUIFactory(UIFactory):
168
"""A UI factory for Text user interfaces."""
170
def __init__(self, stdin, stdout, stderr):
171
"""Create a TextUIFactory."""
172
super(TextUIFactory, self).__init__()
176
# paints progress, network activity, etc
177
self._progress_view = self.make_progress_view()
179
def choose(self, msg, choices, default=None):
180
"""Prompt the user for a list of alternatives.
182
Support both line-based and char-based editing.
184
In line-based mode, both the shortcut and full choice name are valid
185
answers, e.g. for choose('prompt', '&yes\n&no'): 'y', ' Y ', ' yes',
186
'YES ' are all valid input lines for choosing 'yes'.
188
An empty line, when in line-based mode, or pressing enter in char-based
189
mode will select the default choice (if any).
191
Choice is echoed back if:
192
- input is char-based; which means a controlling terminal is available,
193
and osutils.getchar is used
194
- input is line-based, and no controlling terminal is available
197
choose_ui = _ChooseUI(self, msg, choices, default)
198
return choose_ui.interact()
200
def be_quiet(self, state):
201
if state and not self._quiet:
203
UIFactory.be_quiet(self, state)
204
self._progress_view = self.make_progress_view()
206
def clear_term(self):
    """Get the terminal ready for fresh output.

    Whatever the progress view currently has painted is cleared, so that
    the cursor is left at the leftmost position before anything else is
    written.
    """
    # XXX: If this is preparing to write to stdout, but that's for example
    # directed into a file rather than to the terminal, and the progress
    # bar _is_ going to the terminal, we shouldn't need to clear it.
    # That case is not distinguished here; the view is always cleared.
    view = self._progress_view
    view.clear()
217
def get_integer(self, prompt):
220
line = self.stdin.readline()
226
def get_non_echoed_password(self):
227
isatty = getattr(self.stdin, 'isatty', None)
228
if isatty is not None and isatty():
229
# getpass() ensure the password is not echoed and other
230
# cross-platform niceties
231
password = getpass.getpass('')
233
# echo doesn't make sense without a terminal
234
password = self.stdin.readline()
238
if password[-1] == '\n':
239
password = password[:-1]
242
def get_password(self, prompt=u'', **kwargs):
243
"""Prompt the user for a password.
245
:param prompt: The prompt to present the user
246
:param kwargs: Arguments which will be expanded into the prompt.
247
This lets front ends display different things if
249
:return: The password string, return None if the user
250
canceled the request.
253
self.prompt(prompt, **kwargs)
254
# There's currently no way to say 'i decline to enter a password'
255
# as opposed to 'my password is empty' -- does it matter?
256
return self.get_non_echoed_password()
258
def get_username(self, prompt, **kwargs):
259
"""Prompt the user for a username.
261
:param prompt: The prompt to present the user
262
:param kwargs: Arguments which will be expanded into the prompt.
263
This lets front ends display different things if
265
:return: The username string, return None if the user
266
canceled the request.
269
self.prompt(prompt, **kwargs)
270
username = self.stdin.readline()
274
if username[-1] == '\n':
275
username = username[:-1]
278
def make_progress_view(self):
279
"""Construct and return a new ProgressView subclass for this UI.
281
# with --quiet, never any progress view
282
# <https://bugs.launchpad.net/bzr/+bug/320035>. Otherwise if the
283
# user specifically requests either text or no progress bars, always
284
# do that. otherwise, guess based on $TERM and tty presence.
286
return NullProgressView()
287
pb_type = config.GlobalStack().get('progress_bar')
288
if pb_type == 'none': # Explicit requirement
289
return NullProgressView()
290
if (pb_type == 'text' # Explicit requirement
291
or progress._supports_progress(self.stderr)): # Guess
292
return TextProgressView(self.stderr)
293
# No explicit requirement and no successful guess
294
return NullProgressView()
296
def _make_output_stream_explicit(self, encoding, encoding_type):
    """Build the output stream for this UI around ``self.stdout``.

    :param encoding: Encoding handed to TextUIOutputStream.
    :param encoding_type: Handed to TextUIOutputStream as its ``errors``
        argument.
    :return: A TextUIOutputStream bound to this UI factory.
    """
    stream = TextUIOutputStream(self, self.stdout, encoding, encoding_type)
    return stream
300
"""Write an already-formatted message, clearing the progress bar if necessary."""
302
self.stdout.write(msg + '\n')
304
def prompt(self, prompt, **kwargs):
305
"""Emit prompt on the CLI.
307
:param kwargs: Dictionary of arguments to insert into the prompt,
308
to allow UIs to reformat the prompt.
310
if not isinstance(prompt, text_type):
311
raise ValueError("prompt %r not a unicode string" % prompt)
313
# See <https://launchpad.net/bugs/365891>
314
prompt = prompt % kwargs
317
self.stderr.write(prompt)
319
def report_transport_activity(self, transport, byte_count, direction):
    """Called by transports as they do IO.

    The report is handed straight to the current progress view, which may
    use it to update a progress bar, spinner, or similar display.

    :param transport: The transport reporting the activity.
    :param byte_count: Amount of data involved in this report.
    :param direction: Direction of the IO, as understood by the view.
    """
    # Note the argument order: the view expects direction before
    # byte_count, the reverse of this method's own signature.
    view = self._progress_view
    view.show_transport_activity(transport, direction, byte_count)
328
def log_transport_activity(self, display=False):
329
"""See UIFactory.log_transport_activity()"""
330
log = getattr(self._progress_view, 'log_transport_activity', None)
334
def show_error(self, msg):
336
self.stderr.write("bzr: error: %s\n" % msg)
338
def show_message(self, msg):
341
def show_warning(self, msg):
343
self.stderr.write("bzr: warning: %s\n" % msg)
345
def _progress_updated(self, task):
346
"""A task has been updated and wants to be displayed.
348
if not self._task_stack:
349
warnings.warn("%r updated but no tasks are active" %
351
elif task != self._task_stack[-1]:
352
# We used to check it was the top task, but it's hard to always
353
# get this right and it's not necessarily useful: any actual
354
# problems will be evident in use
355
#warnings.warn("%r is not the top progress task %r" %
356
# (task, self._task_stack[-1]))
358
self._progress_view.show_progress(task)
360
def _progress_all_finished(self):
361
self._progress_view.clear()
363
def show_user_warning(self, warning_id, **message_args):
    """Write a user-facing warning to stderr unless it is suppressed.

    Explicitly not for warnings about bzr apis, deprecations or internals.

    :param warning_id: Identifier of the warning; nothing is written when
        it is listed in ``self.suppressed_warnings``.
    :param message_args: Values passed to ``format_user_warning`` to build
        the message text.
    """
    # eventually trace.warning should migrate here, to avoid logging and
    # be easier to test; that has a lot of test fallout so for now just
    # new code can call this
    if warning_id in self.suppressed_warnings:
        return
    text = self.format_user_warning(warning_id, message_args)
    self.stderr.write(text + '\n')
376
def pad_to_width(line, width, encoding_hint='ascii'):
    """Truncate or pad unicode line to width.

    This is best-effort for now, and strings containing control codes or
    non-ascii text may be cut and padded incorrectly. The width is applied
    to the encoded form of the line, so a multi-byte character may be
    split by the truncation; the dangling bytes are then turned into a
    replacement character rather than raising UnicodeDecodeError.

    :param line: Unicode string to fit.
    :param width: Exact width wanted, measured in encoded bytes.
    :param encoding_hint: Codec used to measure the line. Characters it
        cannot represent are substituted while encoding.
    :return: A unicode string truncated or space-padded on the right.
    """
    encoded = line.encode(encoding_hint, 'replace')
    # '%-*.*s' left-justifies to at least `width` and cuts at `width`.
    fitted = b'%-*.*s' % (width, width, encoded)
    # Decode leniently: the cut above may have landed inside a character.
    return fitted.decode(encoding_hint, 'replace')
386
class TextProgressView(object):
387
"""Display of progress bar and other information on a tty.
389
This shows one line of text, including possibly a network indicator,
390
spinner, progress bar, message, etc.
392
One instance of this is created and held by the UI, and fed updates when a
393
task wants to be painted.
395
Transports feed data to this through the ui_factory object.
397
The Progress views can comprise a tree with _parent_task pointers, but
398
this only prints the stack from the nominated current task up to the root.
401
def __init__(self, term_file, encoding=None, errors=None):
402
self._term_file = term_file
404
self._encoding = getattr(term_file, "encoding", None) or "ascii"
406
self._encoding = encoding
407
# true when there's output on the screen we may need to clear
408
self._have_output = False
409
self._last_transport_msg = ''
411
# time we last repainted the screen
412
self._last_repaint = 0
413
# time we last got information about transport activity
414
self._transport_update_time = 0
415
self._last_task = None
416
self._total_byte_count = 0
417
self._bytes_since_update = 0
418
self._bytes_by_direction = {'unknown': 0, 'read': 0, 'write': 0}
419
self._first_byte_time = None
421
# force the progress bar to be off, as at the moment it doesn't
422
# correspond reliably to overall command progress
423
self.enable_bar = False
425
def _avail_width(self):
426
# we need one extra space for terminals that wrap on last char
427
w = osutils.terminal_width()
433
def _show_line(self, u):
    """Paint ``u`` on the terminal line, returning the cursor to its start.

    When the available width is known the text is first truncated or
    padded to that width; when it is unknown the text is written as given.

    :param u: Unicode text to display.
    """
    avail = self._avail_width()
    if avail is None:
        text = u
    else:
        text = pad_to_width(u, avail, encoding_hint=self._encoding)
    # Carriage returns on both sides: start at the left edge and leave
    # the cursor there afterwards.
    self._term_file.write('\r' + text + '\r')
440
if self._have_output:
442
self._have_output = False
444
def _render_bar(self):
445
# return a string for the progress bar itself
446
if self.enable_bar and (
447
(self._last_task is None) or self._last_task.show_bar):
448
# If there's no task object, we show space for the bar anyhow.
449
# That's because most invocations of bzr will end showing progress
450
# at some point, though perhaps only after doing some initial IO.
451
# It looks better to draw the progress bar initially rather than
452
# to have what looks like an incomplete progress bar.
453
spin_str = r'/-\|'[self._spin_pos % 4]
456
if self._last_task is None:
457
completion_fraction = 0
460
completion_fraction = \
461
self._last_task._overall_completion_fraction() or 0
462
if (completion_fraction < self._fraction and 'progress' in
465
self._fraction = completion_fraction
466
markers = int(round(float(cols) * completion_fraction)) - 1
467
bar_str = '[' + ('#' * markers + spin_str).ljust(cols) + '] '
469
elif (self._last_task is None) or self._last_task.show_spinner:
470
# The last task wanted just a spinner, no bar
471
spin_str = r'/-\|'[self._spin_pos % 4]
473
return spin_str + ' '
477
def _format_task(self, task):
478
"""Format task-specific parts of progress bar.
480
:returns: (text_part, counter_part) both unicode strings.
482
if not task.show_count:
484
elif task.current_cnt is not None and task.total_cnt is not None:
485
s = ' %d/%d' % (task.current_cnt, task.total_cnt)
486
elif task.current_cnt is not None:
487
s = ' %d' % (task.current_cnt)
490
# compose all the parent messages
493
while t._parent_task:
499
def _render_line(self):
500
bar_string = self._render_bar()
502
task_part, counter_part = self._format_task(self._last_task)
504
task_part = counter_part = ''
505
if self._last_task and not self._last_task.show_transport_activity:
508
trans = self._last_transport_msg
509
# the bar separates the transport activity from the message, so even
510
# if there's no bar or spinner, we must show something if both those
512
if (task_part or trans) and not bar_string:
514
# preferentially truncate the task message if we don't have enough
516
avail_width = self._avail_width()
517
if avail_width is not None:
518
# if terminal avail_width is unknown, don't truncate
519
current_len = len(bar_string) + len(trans) + len(task_part) + len(counter_part)
520
# GZ 2017-04-22: Should measure and truncate task_part properly
521
gap = current_len - avail_width
523
task_part = task_part[:-gap-2] + '..'
524
s = trans + bar_string + task_part + counter_part
525
if avail_width is not None:
526
if len(s) < avail_width:
527
s = s.ljust(avail_width)
528
elif len(s) > avail_width:
533
s = self._render_line()
535
self._have_output = True
537
def show_progress(self, task):
538
"""Called by the task object when it has changed.
540
:param task: The top task object; its parents are also included
543
must_update = task is not self._last_task
544
self._last_task = task
546
if (not must_update) and (now < self._last_repaint + task.update_latency):
548
if now > self._transport_update_time + 10:
549
# no recent activity; expire it
550
self._last_transport_msg = ''
551
self._last_repaint = now
554
def show_transport_activity(self, transport, direction, byte_count):
555
"""Called by transports via the ui_factory, as they do IO.
557
This may update a progress bar, spinner, or similar display.
558
By default it does nothing.
560
# XXX: there should be a transport activity model, and that too should
561
# be seen by the progress view, rather than being poked in here.
562
self._total_byte_count += byte_count
563
self._bytes_since_update += byte_count
564
if self._first_byte_time is None:
565
# Note that this isn't great, as technically it should be the time
566
# when the bytes started transferring, not when they completed.
567
# However, we usually start with a small request anyway.
568
self._first_byte_time = time.time()
569
if direction in self._bytes_by_direction:
570
self._bytes_by_direction[direction] += byte_count
572
self._bytes_by_direction['unknown'] += byte_count
573
if 'no_activity' in debug.debug_flags:
574
# Can be used as a workaround if
575
# <https://launchpad.net/bugs/321935> reappears and transport
576
# activity is cluttering other output. However, thanks to
577
# TextUIOutputStream this shouldn't be a problem any more.
580
if self._total_byte_count < 2000:
581
# a little resistance at first, so it doesn't stay stuck at 0
582
# while connecting...
584
if self._transport_update_time is None:
585
self._transport_update_time = now
586
elif now >= (self._transport_update_time + 0.5):
587
# guard against clock stepping backwards, and don't update too
589
rate = (self._bytes_since_update
590
/ (now - self._transport_update_time))
591
# using base-10 units (see HACKING.txt).
592
msg = ("%6dkB %5dkB/s " %
593
(self._total_byte_count / 1000, int(rate) / 1000,))
594
self._transport_update_time = now
595
self._last_repaint = now
596
self._bytes_since_update = 0
597
self._last_transport_msg = msg
600
def _format_bytes_by_direction(self):
601
if self._first_byte_time is None:
604
transfer_time = time.time() - self._first_byte_time
605
if transfer_time < 0.001:
606
transfer_time = 0.001
607
bps = self._total_byte_count / transfer_time
609
# using base-10 units (see HACKING.txt).
610
msg = ('Transferred: %.0fkB'
611
' (%.1fkB/s r:%.0fkB w:%.0fkB'
612
% (self._total_byte_count / 1000.,
614
self._bytes_by_direction['read'] / 1000.,
615
self._bytes_by_direction['write'] / 1000.,
617
if self._bytes_by_direction['unknown'] > 0:
618
msg += ' u:%.0fkB)' % (
619
self._bytes_by_direction['unknown'] / 1000.
625
def log_transport_activity(self, display=False):
626
msg = self._format_bytes_by_direction()
628
if display and self._total_byte_count > 0:
630
self._term_file.write(msg + '\n')
633
def _get_stream_encoding(stream):
634
encoding = config.GlobalStack().get('output_encoding')
636
encoding = getattr(stream, "encoding", None)
638
encoding = osutils.get_terminal_encoding(trace=True)
642
def _unwrap_stream(stream):
643
inner = getattr(stream, "buffer", None)
645
inner = getattr(stream, "stream", None)
649
def _wrap_in_stream(stream, encoding=None, errors='replace'):
651
encoding = _get_stream_encoding(stream)
652
encoded_stream = codecs.getreader(encoding)(stream, errors=errors)
653
encoded_stream.encoding = encoding
654
return encoded_stream
657
def _wrap_out_stream(stream, encoding=None, errors='replace'):
659
encoding = _get_stream_encoding(stream)
660
encoded_stream = codecs.getwriter(encoding)(stream, errors=errors)
661
encoded_stream.encoding = encoding
662
return encoded_stream
665
class TextUIOutputStream(object):
666
"""Decorates stream to interact better with progress and change encoding.
668
Before writing to the wrapped stream, progress is cleared. Callers must
669
ensure bulk output is terminated with a newline so progress won't overwrite
672
Additionally, the encoding and errors behaviour of the underlying stream
673
can be changed at this point. If errors is set to 'exact' raw bytes may be
674
written to the underlying stream.
677
def __init__(self, ui_factory, stream, encoding=None, errors='strict'):
678
self.ui_factory = ui_factory
679
# GZ 2017-05-21: Clean up semantics when callers are made saner.
680
inner = _unwrap_stream(stream)
681
self.raw_stream = None
682
if errors == "exact":
684
self.raw_stream = inner
686
self.wrapped_stream = stream
688
encoding = _get_stream_encoding(stream)
690
self.wrapped_stream = _wrap_out_stream(inner, encoding, errors)
692
encoding = self.wrapped_stream.encoding
693
self.encoding = encoding
696
def _write(self, to_write):
697
if isinstance(to_write, bytes):
699
to_write = to_write.decode(self.encoding, self.errors)
700
except UnicodeDecodeError:
701
self.raw_stream.write(to_write)
703
self.wrapped_stream.write(to_write)
706
self.ui_factory.clear_term()
707
self.wrapped_stream.flush()
709
def write(self, to_write):
    """Clear the terminal via the UI factory, then write ``to_write``.

    :param to_write: Data passed unchanged to ``self._write``.
    """
    # The clear has to come first so the new output does not land on top
    # of whatever the UI currently has on screen.
    factory = self.ui_factory
    factory.clear_term()
    self._write(to_write)
713
def writelines(self, lines):
714
self.ui_factory.clear_term()