1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Text UI, write output to the console."""
19
from __future__ import absolute_import
26
from ..lazy_import import lazy_import
27
lazy_import(globals(), """
48
class _ChooseUI(object):
50
""" Helper class for choose implementation.
53
def __init__(self, ui, msg, choices, default):
56
self._build_alternatives(msg, choices, default)
58
def _setup_mode(self):
59
"""Setup input mode (line-based, char-based) and echo-back.
61
Line-based input is used if the BRZ_TEXTUI_INPUT environment
62
variable is set to 'line-based', or if there is no controlling
65
if os.environ.get('BRZ_TEXTUI_INPUT') != 'line-based' and \
66
self.ui.stdin == sys.stdin and self.ui.stdin.isatty():
67
self.line_based = False
70
self.line_based = True
71
self.echo_back = not self.ui.stdin.isatty()
73
def _build_alternatives(self, msg, choices, default):
74
"""Parse choices string.
76
Setup final prompt and the lists of choices and associated
81
self.alternatives = {}
82
choices = choices.split('\n')
83
if default is not None and default not in range(0, len(choices)):
84
raise ValueError("invalid default index")
86
name = c.replace('&', '').lower()
87
choice = (name, index)
88
if name in self.alternatives:
89
raise ValueError("duplicated choice: %s" % name)
90
self.alternatives[name] = choice
91
shortcut = c.find('&')
92
if -1 != shortcut and (shortcut + 1) < len(c):
94
help += '[' + c[shortcut + 1] + ']'
95
help += c[(shortcut + 2):]
96
shortcut = c[shortcut + 1]
98
c = c.replace('&', '')
100
help = '[%s]%s' % (shortcut, c[1:])
101
shortcut = shortcut.lower()
102
if shortcut in self.alternatives:
103
raise ValueError("duplicated shortcut: %s" % shortcut)
104
self.alternatives[shortcut] = choice
105
# Add redirections for default.
107
self.alternatives[''] = choice
108
self.alternatives['\r'] = choice
109
help_list.append(help)
112
self.prompt = u'%s (%s): ' % (msg, ', '.join(help_list))
115
line = self.ui.stdin.readline()
121
char = osutils.getchar()
122
if char == chr(3): # INTR
123
raise KeyboardInterrupt
124
if char == chr(4): # EOF (^d, C-d)
126
return char.decode("ascii", "replace")
129
"""Keep asking the user until a valid choice is made.
132
getchoice = self._getline
134
getchoice = self._getchar
138
if 1 == iter or self.line_based:
139
self.ui.prompt(self.prompt)
143
self.ui.stderr.write(u'\n')
145
except KeyboardInterrupt:
146
self.ui.stderr.write(u'\n')
148
choice = choice.lower()
149
if choice not in self.alternatives:
150
# Not a valid choice, keep on asking.
152
name, index = self.alternatives[choice]
154
self.ui.stderr.write(name + u'\n')
158
opt_progress_bar = config.Option(
159
'progress_bar', help='Progress bar type.',
160
default_from_env=['BRZ_PROGRESS_BAR'], default=None,
164
class TextUIFactory(UIFactory):
165
"""A UI factory for Text user interfaces."""
167
def __init__(self, stdin, stdout, stderr):
168
"""Create a TextUIFactory."""
169
super(TextUIFactory, self).__init__()
173
# paints progress, network activity, etc
174
self._progress_view = self.make_progress_view()
176
def choose(self, msg, choices, default=None):
    """Prompt the user to pick one of several alternatives.

    Both line-based and char-based input are supported.

    In line-based mode, either the shortcut or the full choice name is a
    valid answer, e.g. for choose('prompt', '&yes\\n&no'): 'y', ' Y ',
    ' yes', 'YES ' are all valid input lines for choosing 'yes'.

    An empty line (line-based mode) or pressing enter (char-based mode)
    selects the default choice, if there is one.

    The choice is echoed back if:
    - input is char-based; which means a controlling terminal is available,
      and osutils.getchar is used
    - input is line-based, and no controlling terminal is available

    :param msg: Message placed in front of the list of alternatives in the
        prompt.
    :param choices: Newline-separated alternatives; an '&' marks the
        shortcut character of an alternative.
    :param default: Index of the default alternative, or None for no
        default.
    :return: The result of _ChooseUI.interact() for the user's answer.
    """
    # All parsing, prompting and input handling lives in the helper class.
    return _ChooseUI(self, msg, choices, default).interact()
197
def be_quiet(self, state):
198
if state and not self._quiet:
200
UIFactory.be_quiet(self, state)
201
self._progress_view = self.make_progress_view()
203
def clear_term(self):
    """Prepare the terminal for output.

    This clears any progress bar by asking the progress view to clear
    itself, which is documented to leave the cursor at the leftmost
    position.
    """
    # XXX: If this is preparing to write to stdout, but that's for example
    # directed into a file rather than to the terminal, and the progress
    # bar _is_ going to the terminal, we shouldn't need to clear it.
    # (The original note goes on "We might need to separately check for the
    # case of" and is cut off there.)
    self._progress_view.clear()
214
def get_integer(self, prompt):
217
line = self.stdin.readline()
223
def get_non_echoed_password(self):
224
isatty = getattr(self.stdin, 'isatty', None)
225
if isatty is not None and isatty():
226
# getpass() ensure the password is not echoed and other
227
# cross-platform niceties
228
password = getpass.getpass('')
230
# echo doesn't make sense without a terminal
231
password = self.stdin.readline()
235
if password[-1] == '\n':
236
password = password[:-1]
239
def get_password(self, prompt=u'', **kwargs):
240
"""Prompt the user for a password.
242
:param prompt: The prompt to present the user
243
:param kwargs: Arguments which will be expanded into the prompt.
244
This lets front ends display different things if
246
:return: The password string, return None if the user
247
canceled the request.
250
self.prompt(prompt, **kwargs)
251
# There's currently no way to say 'i decline to enter a password'
252
# as opposed to 'my password is empty' -- does it matter?
253
return self.get_non_echoed_password()
255
def get_username(self, prompt, **kwargs):
256
"""Prompt the user for a username.
258
:param prompt: The prompt to present the user
259
:param kwargs: Arguments which will be expanded into the prompt.
260
This lets front ends display different things if
262
:return: The username string, return None if the user
263
canceled the request.
266
self.prompt(prompt, **kwargs)
267
username = self.stdin.readline()
271
if username[-1] == '\n':
272
username = username[:-1]
275
def make_progress_view(self):
276
"""Construct and return a new ProgressView subclass for this UI.
278
# with --quiet, never any progress view
279
# <https://bugs.launchpad.net/bzr/+bug/320035>. Otherwise if the
280
# user specifically requests either text or no progress bars, always
281
# do that. otherwise, guess based on $TERM and tty presence.
283
return NullProgressView()
284
pb_type = config.GlobalStack().get('progress_bar')
285
if pb_type == 'none': # Explicit requirement
286
return NullProgressView()
287
if (pb_type == 'text' # Explicit requirement
288
or progress._supports_progress(self.stderr)): # Guess
289
return TextProgressView(self.stderr)
290
# No explicit requirement and no successful guess
291
return NullProgressView()
293
def _make_output_stream_explicit(self, encoding, encoding_type):
    """Return a TextUIOutputStream wrapping this UI's stdout.

    :param encoding: Encoding handed to TextUIOutputStream.
    :param encoding_type: Handed to TextUIOutputStream as its ``errors``
        argument.
    :return: The new TextUIOutputStream, bound to this UI factory.
    """
    return TextUIOutputStream(self, self.stdout, encoding, encoding_type)
297
"""Write an already-formatted message, clearing the progress bar if necessary."""
299
self.stdout.write(msg + '\n')
301
def prompt(self, prompt, **kwargs):
302
"""Emit prompt on the CLI.
304
:param kwargs: Dictionary of arguments to insert into the prompt,
305
to allow UIs to reformat the prompt.
307
if not isinstance(prompt, unicode):
308
raise ValueError("prompt %r not a unicode string" % prompt)
310
# See <https://launchpad.net/bugs/365891>
311
prompt = prompt % kwargs
314
self.stderr.write(prompt)
316
def report_transport_activity(self, transport, byte_count, direction):
    """Called by transports as they do IO.

    The activity is forwarded to the progress view, which may update a
    progress bar, spinner, or similar display.

    :param transport: The transport reporting the activity.
    :param byte_count: Number of bytes involved in the activity.
    :param direction: Direction of the transfer, as reported by the
        transport.
    """
    # The progress view takes (direction, byte_count), i.e. the reverse of
    # this method's own parameter order.
    self._progress_view.show_transport_activity(
        transport, direction, byte_count)
325
def log_transport_activity(self, display=False):
326
"""See UIFactory.log_transport_activity()"""
327
log = getattr(self._progress_view, 'log_transport_activity', None)
331
def show_error(self, msg):
333
self.stderr.write("bzr: error: %s\n" % msg)
335
def show_message(self, msg):
338
def show_warning(self, msg):
340
self.stderr.write("bzr: warning: %s\n" % msg)
342
def _progress_updated(self, task):
343
"""A task has been updated and wants to be displayed.
345
if not self._task_stack:
346
warnings.warn("%r updated but no tasks are active" %
348
elif task != self._task_stack[-1]:
349
# We used to check it was the top task, but it's hard to always
350
# get this right and it's not necessarily useful: any actual
351
# problems will be evident in use
352
#warnings.warn("%r is not the top progress task %r" %
353
# (task, self._task_stack[-1]))
355
self._progress_view.show_progress(task)
357
def _progress_all_finished(self):
    """Clear the progress view.

    NOTE(review): presumably invoked once no progress tasks remain;
    confirm against the UIFactory base class.
    """
    self._progress_view.clear()
360
def show_user_warning(self, warning_id, **message_args):
    """Show a text message to the user.

    Explicitly not for warnings about bzr apis, deprecations or internals.

    :param warning_id: Identifier of the warning; nothing is written when
        it is listed in ``self.suppressed_warnings``.
    :param message_args: Keyword arguments passed, as a dict, to
        ``format_user_warning`` to build the message text.
    """
    # eventually trace.warning should migrate here, to avoid logging and
    # be easier to test; that has a lot of test fallout so for now just
    # new code can call this
    if warning_id in self.suppressed_warnings:
        return
    warning = self.format_user_warning(warning_id, message_args)
    self.stderr.write(warning + '\n')
373
def pad_to_width(line, width, encoding_hint='ascii'):
    """Truncate or pad unicode line to width.

    This is best-effort for now, and strings containing control codes or
    non-ascii text may be cut and padded incorrectly: the width is applied
    to the encoded bytes, not to the displayed characters.

    :param line: Unicode text to fit.
    :param width: Target width, applied as both the minimum field width
        and the maximum length of the encoded text.
    :param encoding_hint: Encoding used for the round trip through bytes;
        characters it cannot represent are replaced.
    :return: The fitted text as a unicode string.
    """
    encoded = line.encode(encoding_hint, 'replace')
    # '%-*.*s' left-justifies and pads to ``width`` while the precision
    # truncates to ``width`` bytes.
    fitted = b'%-*.*s' % (width, width, encoded)
    # Truncation may split a multibyte character; decode leniently so a
    # half character becomes a replacement mark instead of raising
    # UnicodeDecodeError in the middle of painting progress.
    return fitted.decode(encoding_hint, 'replace')
383
class TextProgressView(object):
384
"""Display of progress bar and other information on a tty.
386
This shows one line of text, including possibly a network indicator,
387
spinner, progress bar, message, etc.
389
One instance of this is created and held by the UI, and fed updates when a
390
task wants to be painted.
392
Transports feed data to this through the ui_factory object.
394
The Progress views can comprise a tree with _parent_task pointers, but
395
this only prints the stack from the nominated current task up to the root.
398
def __init__(self, term_file, encoding=None, errors=None):
399
self._term_file = term_file
401
self._encoding = getattr(term_file, "encoding", None) or "ascii"
403
self._encoding = encoding
404
# true when there's output on the screen we may need to clear
405
self._have_output = False
406
self._last_transport_msg = ''
408
# time we last repainted the screen
409
self._last_repaint = 0
410
# time we last got information about transport activity
411
self._transport_update_time = 0
412
self._last_task = None
413
self._total_byte_count = 0
414
self._bytes_since_update = 0
415
self._bytes_by_direction = {'unknown': 0, 'read': 0, 'write': 0}
416
self._first_byte_time = None
418
# force the progress bar to be off, as at the moment it doesn't
419
# correspond reliably to overall command progress
420
self.enable_bar = False
422
def _avail_width(self):
423
# we need one extra space for terminals that wrap on last char
424
w = osutils.terminal_width()
430
def _show_line(self, u):
    """Write ``u`` to the terminal file as a single overwritable line.

    When the available width is known, the text is first truncated or
    padded to that width. The text is wrapped in carriage returns rather
    than followed by a newline.

    :param u: Unicode text to display.
    """
    avail = self._avail_width()
    if avail is not None:
        u = pad_to_width(u, avail, encoding_hint=self._encoding)
    self._term_file.write('\r' + u + '\r')
437
if self._have_output:
439
self._have_output = False
441
def _render_bar(self):
442
# return a string for the progress bar itself
443
if self.enable_bar and (
444
(self._last_task is None) or self._last_task.show_bar):
445
# If there's no task object, we show space for the bar anyhow.
446
# That's because most invocations of bzr will end showing progress
447
# at some point, though perhaps only after doing some initial IO.
448
# It looks better to draw the progress bar initially rather than
449
# to have what looks like an incomplete progress bar.
450
spin_str = r'/-\|'[self._spin_pos % 4]
453
if self._last_task is None:
454
completion_fraction = 0
457
completion_fraction = \
458
self._last_task._overall_completion_fraction() or 0
459
if (completion_fraction < self._fraction and 'progress' in
462
self._fraction = completion_fraction
463
markers = int(round(float(cols) * completion_fraction)) - 1
464
bar_str = '[' + ('#' * markers + spin_str).ljust(cols) + '] '
466
elif (self._last_task is None) or self._last_task.show_spinner:
467
# The last task wanted just a spinner, no bar
468
spin_str = r'/-\|'[self._spin_pos % 4]
470
return spin_str + ' '
474
def _format_task(self, task):
475
"""Format task-specific parts of progress bar.
477
:returns: (text_part, counter_part) both unicode strings.
479
if not task.show_count:
481
elif task.current_cnt is not None and task.total_cnt is not None:
482
s = ' %d/%d' % (task.current_cnt, task.total_cnt)
483
elif task.current_cnt is not None:
484
s = ' %d' % (task.current_cnt)
487
# compose all the parent messages
490
while t._parent_task:
496
def _render_line(self):
497
bar_string = self._render_bar()
499
task_part, counter_part = self._format_task(self._last_task)
501
task_part = counter_part = ''
502
if self._last_task and not self._last_task.show_transport_activity:
505
trans = self._last_transport_msg
506
# the bar separates the transport activity from the message, so even
507
# if there's no bar or spinner, we must show something if both those
509
if (task_part or trans) and not bar_string:
511
# preferentially truncate the task message if we don't have enough
513
avail_width = self._avail_width()
514
if avail_width is not None:
515
# if terminal avail_width is unknown, don't truncate
516
current_len = len(bar_string) + len(trans) + len(task_part) + len(counter_part)
517
# GZ 2017-04-22: Should measure and truncate task_part properly
518
gap = current_len - avail_width
520
task_part = task_part[:-gap-2] + '..'
521
s = trans + bar_string + task_part + counter_part
522
if avail_width is not None:
523
if len(s) < avail_width:
524
s = s.ljust(avail_width)
525
elif len(s) > avail_width:
530
s = self._render_line()
532
self._have_output = True
534
def show_progress(self, task):
535
"""Called by the task object when it has changed.
537
:param task: The top task object; its parents are also included
540
must_update = task is not self._last_task
541
self._last_task = task
543
if (not must_update) and (now < self._last_repaint + task.update_latency):
545
if now > self._transport_update_time + 10:
546
# no recent activity; expire it
547
self._last_transport_msg = ''
548
self._last_repaint = now
551
def show_transport_activity(self, transport, direction, byte_count):
552
"""Called by transports via the ui_factory, as they do IO.
554
This may update a progress bar, spinner, or similar display.
555
By default it does nothing.
557
# XXX: there should be a transport activity model, and that too should
558
# be seen by the progress view, rather than being poked in here.
559
self._total_byte_count += byte_count
560
self._bytes_since_update += byte_count
561
if self._first_byte_time is None:
562
# Note that this isn't great, as technically it should be the time
563
# when the bytes started transferring, not when they completed.
564
# However, we usually start with a small request anyway.
565
self._first_byte_time = time.time()
566
if direction in self._bytes_by_direction:
567
self._bytes_by_direction[direction] += byte_count
569
self._bytes_by_direction['unknown'] += byte_count
570
if 'no_activity' in debug.debug_flags:
571
# Can be used as a workaround if
572
# <https://launchpad.net/bugs/321935> reappears and transport
573
# activity is cluttering other output. However, thanks to
574
# TextUIOutputStream this shouldn't be a problem any more.
577
if self._total_byte_count < 2000:
578
# a little resistance at first, so it doesn't stay stuck at 0
579
# while connecting...
581
if self._transport_update_time is None:
582
self._transport_update_time = now
583
elif now >= (self._transport_update_time + 0.5):
584
# guard against clock stepping backwards, and don't update too
586
rate = (self._bytes_since_update
587
/ (now - self._transport_update_time))
588
# using base-10 units (see HACKING.txt).
589
msg = ("%6dkB %5dkB/s " %
590
(self._total_byte_count / 1000, int(rate) / 1000,))
591
self._transport_update_time = now
592
self._last_repaint = now
593
self._bytes_since_update = 0
594
self._last_transport_msg = msg
597
def _format_bytes_by_direction(self):
598
if self._first_byte_time is None:
601
transfer_time = time.time() - self._first_byte_time
602
if transfer_time < 0.001:
603
transfer_time = 0.001
604
bps = self._total_byte_count / transfer_time
606
# using base-10 units (see HACKING.txt).
607
msg = ('Transferred: %.0fkB'
608
' (%.1fkB/s r:%.0fkB w:%.0fkB'
609
% (self._total_byte_count / 1000.,
611
self._bytes_by_direction['read'] / 1000.,
612
self._bytes_by_direction['write'] / 1000.,
614
if self._bytes_by_direction['unknown'] > 0:
615
msg += ' u:%.0fkB)' % (
616
self._bytes_by_direction['unknown'] / 1000.
622
def log_transport_activity(self, display=False):
623
msg = self._format_bytes_by_direction()
625
if display and self._total_byte_count > 0:
627
self._term_file.write(msg + '\n')
630
def _get_stream_encoding(stream):
631
encoding = config.GlobalStack().get('output_encoding')
633
encoding = getattr(stream, "encoding", None)
635
encoding = osutils.get_terminal_encoding(trace=True)
639
def _unwrap_stream(stream):
640
inner = getattr(stream, "buffer", None)
642
inner = getattr(stream, "stream", None)
646
def _wrap_in_stream(stream, encoding=None, errors='replace'):
648
encoding = _get_stream_encoding(stream)
649
encoded_stream = codecs.getreader(encoding)(stream, errors=errors)
650
encoded_stream.encoding = encoding
651
return encoded_stream
654
def _wrap_out_stream(stream, encoding=None, errors='replace'):
656
encoding = _get_stream_encoding(stream)
657
encoded_stream = codecs.getwriter(encoding)(stream, errors=errors)
658
encoded_stream.encoding = encoding
659
return encoded_stream
662
class TextUIOutputStream(object):
663
"""Decorates stream to interact better with progress and change encoding.
665
Before writing to the wrapped stream, progress is cleared. Callers must
666
ensure bulk output is terminated with a newline so progress won't overwrite
669
Additionally, the encoding and errors behaviour of the underlying stream
670
can be changed at this point. If errors is set to 'exact' raw bytes may be
671
written to the underlying stream.
674
def __init__(self, ui_factory, stream, encoding=None, errors='strict'):
675
self.ui_factory = ui_factory
676
# GZ 2017-05-21: Clean up semantics when callers are made saner.
677
inner = _unwrap_stream(stream)
678
self.raw_stream = None
679
if errors == "exact":
681
self.raw_stream = inner
683
self.wrapped_stream = stream
685
encoding = _get_stream_encoding(stream)
687
self.wrapped_stream = _wrap_out_stream(inner, encoding, errors)
689
encoding = self.wrapped_stream.encoding
690
self.encoding = encoding
693
def _write(self, to_write):
694
if isinstance(to_write, bytes):
696
to_write = to_write.decode(self.encoding, self.errors)
697
except UnicodeDecodeError:
698
self.raw_stream.write(to_write)
700
self.wrapped_stream.write(to_write)
703
self.ui_factory.clear_term()
704
self.wrapped_stream.flush()
706
def write(self, to_write):
    """Clear the terminal via the UI factory, then write ``to_write``.

    :param to_write: Data handed unchanged to ``self._write``.
    """
    # Clear first so progress output is not left mixed into the text.
    self.ui_factory.clear_term()
    self._write(to_write)
710
def writelines(self, lines):
711
self.ui_factory.clear_term()