# Copyright (C) 2010 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Author: Mattias Eriksson

"""Implementation of Transport over gio.

Written by Mattias Eriksson <snaggen@acc.umu.se> based on the ftp transport.

It provides the gio+XXX:// protocols where XXX is any of the protocols
supported by gio.
"""

from io import BytesIO
import os
import random
import stat
import time
from urllib.parse import (
    urlparse,
    urlunparse,
    )

from .. import (
    config,
    errors,
    osutils,
    urlutils,
    debug,
    ui,
    )
from ..trace import mutter
from . import (
    FileStream,
    ConnectedTransport,
    _file_streams,
    )

from ..tests.test_server import TestServer

try:
    import glib
except ImportError as e:
    raise errors.DependencyNotPresent('glib', e)
try:
    import gio
except ImportError as e:
    raise errors.DependencyNotPresent('gio', e)


class GioLocalURLServer(TestServer):
    """A pretend server for local transports, using file:// urls.

    Of course no actual server is required to access the local filesystem, so
    this just exists to tell the test code how to get to it.
    """

    def start_server(self):
        pass

    def get_url(self):
        """See Transport.Server.get_url."""
        return "gio+" + urlutils.local_path_to_url('')


class GioFileStream(FileStream):
    """A file stream object returned by open_write_stream.

    This version uses GIO to perform writes.
    """

    def __init__(self, transport, relpath):
        FileStream.__init__(self, transport, relpath)
        self.gio_file = transport._get_GIO(relpath)
        self.stream = self.gio_file.create()

    def _close(self):
        self.stream.close()

    def write(self, bytes):
        try:
            # Using pump_string_file seems to make things crash
            osutils.pumpfile(BytesIO(bytes), self.stream)
        except gio.Error as e:
            # self.transport._translate_gio_error(e,self.relpath)
            raise errors.BzrError(str(e))


class GioStatResult(object):

    def __init__(self, f):
        info = f.query_info('standard::size,standard::type')
        self.st_size = info.get_size()
        type = info.get_file_type()
        if (type == gio.FILE_TYPE_REGULAR):
            self.st_mode = stat.S_IFREG
        elif type == gio.FILE_TYPE_DIRECTORY:
            self.st_mode = stat.S_IFDIR


class GioTransport(ConnectedTransport):
    """This is the transport agent for gio+XXX:// access."""

    def __init__(self, base, _from_transport=None):
        """Initialize the GIO transport and make sure the url is correct."""

        if not base.startswith('gio+'):
            raise ValueError(base)

        (scheme, netloc, path, params, query, fragment) = \
            urlparse(base[len('gio+'):], allow_fragments=False)
        if '@' in netloc:
            user, netloc = netloc.rsplit('@', 1)
        # Seems it is not possible to list supported backends for GIO
        # so a hardcoded list it is then.
        gio_backends = ['dav', 'file', 'ftp', 'obex', 'sftp', 'ssh', 'smb']
        if scheme not in gio_backends:
            raise urlutils.InvalidURL(base,
                                      extra="GIO support is only available for " +
                                      ', '.join(gio_backends))

        # Remove the username and password from the url we send to GIO
        # by rebuilding the url again.
        u = (scheme, netloc, path, '', '', '')
        self.url = urlunparse(u)

        # And finally initialize super
        super(GioTransport, self).__init__(base,
                                           _from_transport=_from_transport)

    def _relpath_to_url(self, relpath):
        full_url = urlutils.join(self.url, relpath)
        if isinstance(full_url, str):
            raise urlutils.InvalidURL(full_url)
        return full_url

    def _get_GIO(self, relpath):
        """Return the ftplib.GIO instance for this object."""
        # Ensures that a connection is established
        connection = self._get_connection()
        if connection is None:
            # First connection ever
            connection, credentials = self._create_connection()
            self._set_connection(connection, credentials)
        fileurl = self._relpath_to_url(relpath)
        file = gio.File(fileurl)
        return file

    def _auth_cb(self, op, message, default_user, default_domain, flags):
        # really use breezy.auth get_password for this
        # or possibly better gnome-keyring?
        auth = config.AuthenticationConfig()
        parsed_url = urlutils.URL.from_string(self.url)
        user = None
        if (flags & gio.ASK_PASSWORD_NEED_USERNAME and
                flags & gio.ASK_PASSWORD_NEED_DOMAIN):
            prompt = (u'%s' % (parsed_url.scheme.upper(),) +
                      u' %(host)s DOMAIN\\username')
            user_and_domain = auth.get_user(parsed_url.scheme,
                                            parsed_url.host, port=parsed_url.port, ask=True,
                                            prompt=prompt)
            (domain, user) = user_and_domain.split('\\', 1)
            op.set_username(user)
            op.set_domain(domain)
        elif flags & gio.ASK_PASSWORD_NEED_USERNAME:
            user = auth.get_user(parsed_url.scheme, parsed_url.host,
                                 port=parsed_url.port, ask=True)
            op.set_username(user)
        elif flags & gio.ASK_PASSWORD_NEED_DOMAIN:
            # Don't know how common this case is, but anyway
            # a DOMAIN and a username prompt should be the
            # same so I will missuse the ui_factory get_username
            # a little bit here.
            prompt = (u'%s' % (parsed_url.scheme.upper(),) +
                      u' %(host)s DOMAIN')
            domain = ui.ui_factory.get_username(prompt=prompt)
            op.set_domain(domain)

        if flags & gio.ASK_PASSWORD_NEED_PASSWORD:
            if user is None:
                user = op.get_username()
            password = auth.get_password(parsed_url.scheme, parsed_url.host,
                                         user, port=parsed_url.port)
            op.set_password(password)
        op.reply(gio.MOUNT_OPERATION_HANDLED)

    def _mount_done_cb(self, obj, res):
        try:
            obj.mount_enclosing_volume_finish(res)
            self.loop.quit()
        except gio.Error as e:
            self.loop.quit()
            raise errors.BzrError(
                "Failed to mount the given location: " + str(e))

    def _create_connection(self, credentials=None):
        if credentials is None:
            user, password = self._parsed_url.user, self._parsed_url.password
        else:
            user, password = credentials

        try:
            connection = gio.File(self.url)
            mount = None
            try:
                mount = connection.find_enclosing_mount()
            except gio.Error as e:
                if (e.code == gio.ERROR_NOT_MOUNTED):
                    self.loop = glib.MainLoop()
                    ui.ui_factory.show_message('Mounting %s using GIO' %
                                               self.url)
                    op = gio.MountOperation()
                    if user:
                        op.set_username(user)
                    if password:
                        op.set_password(password)
                    op.connect('ask-password', self._auth_cb)
                    m = connection.mount_enclosing_volume(op,
                                                          self._mount_done_cb)
                    self.loop.run()
        except gio.Error as e:
            raise errors.TransportError(msg="Error setting up connection:"
                                        " %s" % str(e), orig_error=e)
        return connection, (user, password)

    def disconnect(self):
        # FIXME: Nothing seems to be necessary here, which sounds a bit strange
        # -- vila 20100601
        pass

    def _reconnect(self):
        # FIXME: This doesn't seem to be used -- vila 20100601
        """Create a new connection with the previously used credentials"""
        credentials = self._get_credentials()
        connection, credentials = self._create_connection(credentials)
        self._set_connection(connection, credentials)

    def _remote_path(self, relpath):
        return self._parsed_url.clone(relpath).path

    def has(self, relpath):
        """Does the target location exist?"""
        try:
            if 'gio' in debug.debug_flags:
                mutter('GIO has check: %s' % relpath)
            f = self._get_GIO(relpath)
            st = GioStatResult(f)
            if stat.S_ISREG(st.st_mode) or stat.S_ISDIR(st.st_mode):
                return True
            return False
        except gio.Error as e:
            if e.code == gio.ERROR_NOT_FOUND:
                return False
            else:
                self._translate_gio_error(e, relpath)

    def get(self, relpath, retries=0):
        """Get the file at the given relative path.

        :param relpath: The relative path to the file
        :param retries: Number of retries after temporary failures so far
                        for this operation.

        We're meant to return a file-like object which bzr will
        then read from. For now we do this via the magic of BytesIO
        """
        try:
            if 'gio' in debug.debug_flags:
                mutter("GIO get: %s" % relpath)
            f = self._get_GIO(relpath)
            fin = f.read()
            buf = fin.read()
            fin.close()
            return BytesIO(buf)
        except gio.Error as e:
            # If we get a not mounted here it might mean
            # that a bad path has been entered (or that mount failed)
            if (e.code == gio.ERROR_NOT_MOUNTED):
                raise errors.PathError(relpath,
                                       extra='Failed to get file, make sure the path is correct. '
                                       + str(e))
            else:
                self._translate_gio_error(e, relpath)

    def put_file(self, relpath, fp, mode=None):
        """Copy the file-like object into the location.

        :param relpath: Location to put the contents, relative to base.
        :param fp:       File-like or string object.
        """
        if 'gio' in debug.debug_flags:
            mutter("GIO put_file %s" % relpath)
        tmppath = '%s.tmp.%.9f.%d.%d' % (relpath, time.time(),
                                         os.getpid(), random.randint(0, 0x7FFFFFFF))
        f = None
        fout = None
        try:
            closed = True
            try:
                f = self._get_GIO(tmppath)
                fout = f.create()
                closed = False
                length = self._pump(fp, fout)
                fout.close()
                closed = True
                self.stat(tmppath)
                dest = self._get_GIO(relpath)
                f.move(dest, flags=gio.FILE_COPY_OVERWRITE)
                f = None
                if mode is not None:
                    self._setmode(relpath, mode)
                return length
            except gio.Error as e:
                self._translate_gio_error(e, relpath)
        finally:
            if not closed and fout is not None:
                fout.close()
            if f is not None and f.query_exists():
                f.delete()

    def mkdir(self, relpath, mode=None):
        """Create a directory at the given path."""
        try:
            if 'gio' in debug.debug_flags:
                mutter("GIO mkdir: %s" % relpath)
            f = self._get_GIO(relpath)
            f.make_directory()
            self._setmode(relpath, mode)
        except gio.Error as e:
            self._translate_gio_error(e, relpath)

    def open_write_stream(self, relpath, mode=None):
        """See Transport.open_write_stream."""
        if 'gio' in debug.debug_flags:
            mutter("GIO open_write_stream %s" % relpath)
        if mode is not None:
            self._setmode(relpath, mode)
        result = GioFileStream(self, relpath)
        _file_streams[self.abspath(relpath)] = result
        return result

    def recommended_page_size(self):
        """See Transport.recommended_page_size().

        For FTP we suggest a large page size to reduce the overhead
        introduced by latency.
        """
        if 'gio' in debug.debug_flags:
            mutter("GIO recommended_page")
        return 64 * 1024

    def rmdir(self, relpath):
        """Delete the directory at rel_path"""
        try:
            if 'gio' in debug.debug_flags:
                mutter("GIO rmdir %s" % relpath)
            st = self.stat(relpath)
            if stat.S_ISDIR(st.st_mode):
                f = self._get_GIO(relpath)
                f.delete()
            else:
                raise errors.NotADirectory(relpath)
        except gio.Error as e:
            self._translate_gio_error(e, relpath)
        except errors.NotADirectory as e:
            # just pass it forward
            raise e
        except Exception as e:
            mutter('failed to rmdir %s: %s' % (relpath, e))
            raise errors.PathError(relpath)

    def append_file(self, relpath, file, mode=None):
        """Append the text in the file-like object into the final
        location.
        """
        # GIO append_to seems not to append but to truncate
        # Work around this.
        if 'gio' in debug.debug_flags:
            mutter("GIO append_file: %s" % relpath)
        tmppath = '%s.tmp.%.9f.%d.%d' % (relpath, time.time(),
                                         os.getpid(), random.randint(0, 0x7FFFFFFF))
        try:
            result = 0
            fo = self._get_GIO(tmppath)
            fi = self._get_GIO(relpath)
            fout = fo.create()
            try:
                info = GioStatResult(fi)
                result = info.st_size
                fin = fi.read()
                self._pump(fin, fout)
                fin.close()
            # This separate except is to catch and ignore the
            # gio.ERROR_NOT_FOUND for the already existing file.
            # It is valid to open a non-existing file for append.
            # This is caused by the broken gio append_to...
            except gio.Error as e:
                if e.code != gio.ERROR_NOT_FOUND:
                    self._translate_gio_error(e, relpath)
            length = self._pump(file, fout)
            fout.close()
            info = GioStatResult(fo)
            if info.st_size != result + length:
                raise errors.BzrError("Failed to append size after "
                                      "(%d) is not original (%d) + written (%d) total (%d)" %
                                      (info.st_size, result, length, result + length))
            fo.move(fi, flags=gio.FILE_COPY_OVERWRITE)
            return result
        except gio.Error as e:
            self._translate_gio_error(e, relpath)

    def _setmode(self, relpath, mode):
        """Set permissions on a path.

        Only set permissions on Unix systems
        """
        if 'gio' in debug.debug_flags:
            mutter("GIO _setmode %s" % relpath)
        if mode:
            try:
                f = self._get_GIO(relpath)
                f.set_attribute_uint32(gio.FILE_ATTRIBUTE_UNIX_MODE, mode)
            except gio.Error as e:
                if e.code == gio.ERROR_NOT_SUPPORTED:
                    # Command probably not available on this server
                    mutter("GIO Could not set permissions to %s on %s. %s",
                           oct(mode), self._remote_path(relpath), str(e))
                else:
                    self._translate_gio_error(e, relpath)

    def rename(self, rel_from, rel_to):
        """Rename without special overwriting"""
        try:
            if 'gio' in debug.debug_flags:
                mutter("GIO move (rename): %s => %s", rel_from, rel_to)
            f = self._get_GIO(rel_from)
            t = self._get_GIO(rel_to)
            f.move(t)
        except gio.Error as e:
            self._translate_gio_error(e, rel_from)

    def move(self, rel_from, rel_to):
        """Move the item at rel_from to the location at rel_to"""
        try:
            if 'gio' in debug.debug_flags:
                mutter("GIO move: %s => %s", rel_from, rel_to)
            f = self._get_GIO(rel_from)
            t = self._get_GIO(rel_to)
            f.move(t, flags=gio.FILE_COPY_OVERWRITE)
        except gio.Error as e:
            self._translate_gio_error(e, relfrom)

    def delete(self, relpath):
        """Delete the item at relpath"""
        try:
            if 'gio' in debug.debug_flags:
                mutter("GIO delete: %s", relpath)
            f = self._get_GIO(relpath)
            f.delete()
        except gio.Error as e:
            self._translate_gio_error(e, relpath)

    def external_url(self):
        """See breezy.transport.Transport.external_url."""
        if 'gio' in debug.debug_flags:
            mutter("GIO external_url", self.base)
        # GIO external url
        return self.base

    def listable(self):
        """See Transport.listable."""
        if 'gio' in debug.debug_flags:
            mutter("GIO listable")
        return True

    def list_dir(self, relpath):
        """See Transport.list_dir."""
        if 'gio' in debug.debug_flags:
            mutter("GIO list_dir")
        try:
            entries = []
            f = self._get_GIO(relpath)
            children = f.enumerate_children(gio.FILE_ATTRIBUTE_STANDARD_NAME)
            for child in children:
                entries.append(urlutils.escape(child.get_name()))
            return entries
        except gio.Error as e:
            self._translate_gio_error(e, relpath)

    def iter_files_recursive(self):
        """See Transport.iter_files_recursive.

        This is cargo-culted from the SFTP transport"""
        if 'gio' in debug.debug_flags:
            mutter("GIO iter_files_recursive")
        queue = list(self.list_dir("."))
        while queue:
            relpath = queue.pop(0)
            st = self.stat(relpath)
            if stat.S_ISDIR(st.st_mode):
                for i, basename in enumerate(self.list_dir(relpath)):
                    queue.insert(i, relpath + "/" + basename)
            else:
                yield relpath

    def stat(self, relpath):
        """Return the stat information for a file."""
        try:
            if 'gio' in debug.debug_flags:
                mutter("GIO stat: %s", relpath)
            f = self._get_GIO(relpath)
            return GioStatResult(f)
        except gio.Error as e:
            self._translate_gio_error(e, relpath, extra='error w/ stat')

    def lock_read(self, relpath):
        """Lock the given file for shared (read) access.
        :return: A lock object, which should be passed to Transport.unlock()
        """
        if 'gio' in debug.debug_flags:
            mutter("GIO lock_read", relpath)

        class BogusLock(object):
            # The old RemoteBranch ignore lock for reading, so we will
            # continue that tradition and return a bogus lock object.

            def __init__(self, path):
                self.path = path

            def unlock(self):
                pass

        return BogusLock(relpath)

    def lock_write(self, relpath):
        """Lock the given file for exclusive (write) access.
        WARNING: many transports do not support this, so trying avoid using it

        :return: A lock object, whichshould be passed to Transport.unlock()
        """
        if 'gio' in debug.debug_flags:
            mutter("GIO lock_write", relpath)
        return self.lock_read(relpath)

    def _translate_gio_error(self, err, path, extra=None):
        if 'gio' in debug.debug_flags:
            mutter("GIO Error: %s %s" % (str(err), path))
        if extra is None:
            extra = str(err)
        if err.code == gio.ERROR_NOT_FOUND:
            raise errors.NoSuchFile(path, extra=extra)
        elif err.code == gio.ERROR_EXISTS:
            raise errors.FileExists(path, extra=extra)
        elif err.code == gio.ERROR_NOT_DIRECTORY:
            raise errors.NotADirectory(path, extra=extra)
        elif err.code == gio.ERROR_NOT_EMPTY:
            raise errors.DirectoryNotEmpty(path, extra=extra)
        elif err.code == gio.ERROR_BUSY:
            raise errors.ResourceBusy(path, extra=extra)
        elif err.code == gio.ERROR_PERMISSION_DENIED:
            raise errors.PermissionDenied(path, extra=extra)
        elif err.code == gio.ERROR_HOST_NOT_FOUND:
            raise errors.PathError(path, extra=extra)
        elif err.code == gio.ERROR_IS_DIRECTORY:
            raise errors.PathError(path, extra=extra)
        else:
            mutter('unable to understand error for path: %s: %s', path, err)
            raise errors.PathError(path,
                                   extra="Unhandled gio error: " + str(err))


def get_test_permutations():
    """Return the permutations to be used in testing."""
    from breezy.tests import test_server
    return [(GioTransport, GioLocalURLServer)]
