#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Dumps CKAN instance data: metadata plus entire datasets.
#
# Copyright (C) 2019 Silvio Rhatto <rhatto@riseup.net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# Dependencies
import time
import datetime
import random
import asyncio
import argparse
import sys, os, subprocess, json
from urllib.parse import urlencode
from tqdm import tqdm

class DownloadMultiple:
    """Downloads multiple files simultaneously with error logging and fancy output"""

    def __init__(self, limit_rate, limit_concurrent=20, progress=True, debug=False, wget='/usr/bin/wget'):
        # Check that wget exists and is runnable before anything else
        wget_bin = wget.split(' ')[0]

        if '/' in wget_bin and not os.path.exists(wget_bin):
            raise FileNotFoundError('Wget not found in path ' + wget_bin + '; please install it first.')

        subprocess.check_call(wget_bin + ' --help', stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True)

        self.limit_rate       = limit_rate
        self.limit_concurrent = asyncio.Semaphore(int(limit_concurrent))
        self.progress         = progress
        self.debug            = debug
        self.wget             = wget
        self.globalstats      = { 'exitstatus': {} }

    def ensuredir(self, dest):
        """Ensures that the destination folder exists"""
        if not os.path.exists(dest):
            os.makedirs(dest, 0o755)
        elif not os.path.isdir(dest):
            raise ValueError('File exists and is not a folder: ' + dest)

    async def download_file(self, url, local_filename, semaphore, progress_bar):
        """Downloads a file.

        Using wget as it is more reliable

        Straight from https://docs.python.org/3/library/asyncio-subprocess.html
        """

        async with semaphore:
            if self.debug:
                print('Downloading ' + url + '...')

            self.ensuredir(os.path.dirname(local_filename))

            # Other wget options that might be useful here: -q --show-progress
            cmd  = self.wget + ' ' + self.limit_rate + ' -c -O "' + local_filename + '" "' + url + '"'
            proc = await asyncio.create_subprocess_shell(cmd,
                                                         stdout=asyncio.subprocess.PIPE,
                                                         stderr=asyncio.subprocess.PIPE)

            stdout, stderr = await proc.communicate()

            if stdout:
                with open(local_filename + '.stdout', 'w') as output:
                    output.write(stdout.decode())

                if self.debug:
                    print(f'[stdout] {url} {stdout.decode()}')

            if stderr:
                with open(local_filename + '.stderr', 'w') as output:
                    output.write(stderr.decode())

                if self.debug:
                    print(f'[stderr] {url} {stderr.decode()}')

            with open(local_filename + '.returncode', 'w') as output:
                output.write(str(proc.returncode) + '\n')

            with open(local_filename + '.date', 'w') as output:
                output.write(str(datetime.datetime.now()) + '\n')

            if os.path.exists(local_filename):
                # The file might be too big to hash in-process, so shell out
                # to sha256sum instead of reading it all into memory
                hasher  = 'cd "' + os.path.dirname(local_filename) + '" && /usr/bin/sha256sum '
                hasher += '"' + os.path.basename(local_filename) + '" > "' + os.path.basename(local_filename) + '.sha256"'

                hash_proc = await asyncio.create_subprocess_shell(hasher,
                                                                  stdout=asyncio.subprocess.PIPE,
                                                                  stderr=asyncio.subprocess.PIPE)

                # Wait for the checksum file to be written before moving on
                await hash_proc.communicate()

            # Record the exit status, both globally and for the current batch
            status = str(proc.returncode)
            self.globalstats['exitstatus'].setdefault(status, []).append(url)
            self.stats['exitstatus'].setdefault(status, []).append(url)

            if self.debug:
                print(f'[{cmd!r} exited with {proc.returncode}]')

            if hasattr(progress_bar, 'update'):
                progress_bar.update(1)

            return proc.returncode

    async def gather(self, filepairs):
        """Gather all files to be downloaded

        See https://stackoverflow.com/questions/50308812/is-it-possible-to-limit-the-number-of-coroutines-running-corcurrently-in-asyncio#50328882
        """

        jobs         = []
        progress_bar = tqdm(total=len(filepairs)) if self.progress and len(filepairs) > 1 else False

        for url, filename in filepairs:
            jobs.append(asyncio.ensure_future(self.download_file(url, filename, self.limit_concurrent, progress_bar)))

        await asyncio.gather(*jobs)

        if hasattr(progress_bar, 'close'):
            progress_bar.close()

    def get(self, filepairs):
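        """Runs one download batch and returns its statistics.

        The returned dict maps each wget exit status to the URLs that ended
        with it, e.g. { 'exitstatus': { '0': [...], '4': [...] } }.
        """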
        self.stats = { 'exitstatus': {} }
        loop       = asyncio.get_event_loop()

        loop.set_debug(self.debug)
        loop.run_until_complete(self.gather(filepairs))

        if self.debug:
            print(self.stats)

        return self.stats

class CkanDumper:
    """Dumps CKAN data: metadata plus entire datasets"""

    def __init__(self, args):
        self.url          = args.url[0]
        self.dest         = ''.join(args.dest)
        self.package_list = '/api/3/action/package_list'
        self.package_show = '/api/3/action/package_show?'
        self.group_list   = '/api/3/action/group_list'
        self.group_show   = '/api/3/action/group_show?'
        self.tag_list     = '/api/3/action/tag_list'
        self.tag_show     = '/api/3/action/tag_show?'
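
        # Every endpoint above returns a JSON envelope of the form
        # {"help": ..., "success": true, "result": ...}; only "result" is
        # consumed by dump() below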

        self.debug     = args.debug
        self.progress  = args.progress
        self.randomize = args.randomize

        if args.limit_rate is not None:
            self.limit_rate = '--limit-rate=' + args.limit_rate
        else:
            self.limit_rate = ''

        if args.limit_concurrent is not None:
            self.limit_concurrent = args.limit_concurrent
        else:
            self.limit_concurrent = '20'

        if args.wget is None:
            args.wget = '/usr/bin/wget'

        self.download = DownloadMultiple(self.limit_rate, self.limit_concurrent, self.progress, self.debug, args.wget)

    def load_json(self, file):
        """Loads a file with contents serialized as JSON"""
        with open(file) as descriptor:
            return json.load(descriptor)

    def dump_stats(self, stats):
        """Dump download batch statistics"""
        if stats is not None:
            print('Statistics (exit status / total downloads): ', end='')

            for status in stats['exitstatus']:
                print(status + ': ' + str(len(stats['exitstatus'][status])), end='; ')

            print('')

    def write_stats(self):
        """Write global statistics to file"""
        with open(self.dest + os.sep + 'ckandumper.stats.json', 'w') as stats:
            stats.write(json.dumps(self.download.globalstats, indent=2) + '\n')

    def process_stats(self, stats):
        """Process stats at each run"""
        self.dump_stats(stats)
        self.write_stats()

    def dump(self):
        """Downloads all content listed in a CKAN repository"""
        package_list = self.dest + os.sep + 'package_list.json'
        group_list   = self.dest + os.sep + 'group_list.json'
        tag_list     = self.dest + os.sep + 'tag_list.json'
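
        # The dump proceeds in four passes: group metadata, tag metadata,
        # package metadata and, finally, the data files of every package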

        print('Starting at ' + str(datetime.datetime.now()))

        #
        # Groups
        #
        print(f'Downloading {self.url}{self.group_list}...')
        stats = self.download.get([[self.url + self.group_list, group_list]])

        groups          = self.load_json(group_list)
        group_downloads = []

        print('Downloading info for each group...')
        for group in groups['result']:
            group_folder = self.dest + os.sep + 'groups' + os.sep + group
            group_file   = group_folder + os.sep + 'group.json'

            group_downloads.append([self.url + self.group_show + urlencode({ 'id': group }, False, '', 'utf-8'), group_file])

        stats = self.download.get(group_downloads)
        self.process_stats(stats)

        #
        # Tags
        #
        print(f'Downloading {self.url}{self.tag_list}...')
        stats = self.download.get([[self.url + self.tag_list, tag_list]])

        tags           = self.load_json(tag_list)
        tags_downloads = []

        print('Downloading info for each tag...')
        for tag in tags['result']:
            tag_folder = self.dest  + os.sep + 'tags' + os.sep + tag
            tag_file   = tag_folder + os.sep + 'tag.json'

            tags_downloads.append([self.url + self.tag_show + urlencode({ 'id': tag }, False, '', 'utf-8'), tag_file])

        stats = self.download.get(tags_downloads)
        self.process_stats(stats)

        #
        # Packages
        #
        print(f'Downloading {self.url}{self.package_list}...')
        stats = self.download.get([[self.url + self.package_list, package_list]])

        packages           = self.load_json(package_list)
        packages_downloads = []

        print('Downloading info for each package...')

        for package in packages['result']:
            package_folder = self.dest      + os.sep + 'packages' + os.sep + package
            package_file   = package_folder + os.sep + 'package.json'

            packages_downloads.append([self.url + self.package_show + urlencode({ 'id': package }, False, '', 'utf-8'), package_file])

        stats = self.download.get(packages_downloads)
        self.process_stats(stats)

        print('Downloading contents of all packages...')
        package_downloads = []

        #
        # Package contents
        #
        for package in packages['result']:
            package_folder = self.dest      + os.sep + 'packages' + os.sep + package
            package_file   = package_folder + os.sep + 'package.json'
            contents       = self.load_json(package_file)

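            # Each resource entry is expected to carry the 'id', 'url' and
            # 'format' fields used below ('format' may be null)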
            for resource in contents['result']['resources']:
                # Resources are saved under their immutable id rather than
                # their name, which may be missing
                name = resource['id']

                if resource['format'] is not None:
                    extension = '.' + resource['format'].lower()
                else:
                    extension = ''

                resource_file = package_folder + os.sep + 'data' + os.sep + name + extension

                package_downloads.append([resource['url'], resource_file])

        if self.randomize:
            random.seed()
            random.shuffle(package_downloads)

        stats = self.download.get(package_downloads)
        self.process_stats(stats)

if __name__ == "__main__":
    # Parse CLI
    examples  = """ Examples:

      ckandumper --limit-concurrent=10 --limit-rate=100k --randomize https://open.canada.ca/data/en/ canada/
      ckandumper --limit-concurrent=10 --limit-rate=100k --randomize https://opendata.swiss/en/ switzerland/
      ckandumper --limit-concurrent=10 --wget="wget --no-check-certificate" --randomize http://dados.gov.br
    """
    parser    = argparse.ArgumentParser(description='Dump CKAN metadata and datasets.', epilog=examples, formatter_class=argparse.RawDescriptionHelpFormatter,)
    parser.add_argument('url',                nargs='+',                                help='CKAN instance URL')
    parser.add_argument('dest',               nargs='+',                                help='Destination folder')
    parser.add_argument('--limit-rate',                                                 help='Limit the download speed to a given number of bytes per second per download; shorthand for --wget="wget --limit-rate=..."')
    parser.add_argument('--limit-concurrent',                                           help='Maximum number of concurrent downloads (defaults to 20)')
    parser.add_argument('--wget',                                                       help='Custom wget invocation')
    parser.add_argument('--debug',            dest='debug',     action='store_true',    help='Enable debug')
    parser.add_argument('--no-debug',         dest='debug',     action='store_false',   help='Disable debug')
    parser.add_argument('--progress',         dest='progress',  action='store_true',    help='Enable progress')
    parser.add_argument('--no-progress',      dest='progress',  action='store_false',   help='Disable progress')
    parser.add_argument('--randomize',        dest='randomize', action='store_true',    help='Randomize the download list to avoid hitting the same remote endpoint repeatedly')
    parser.add_argument('--no-randomize',     dest='randomize', action='store_false',   help='Do not randomize the list of downloads')
    parser.set_defaults(debug=False)
    parser.set_defaults(randomize=False)
    parser.set_defaults(progress=True)
    args = parser.parse_args()

    # Dispatch
    try:
        start_time = time.time()

        # Initialize our dumper
        ckan = CkanDumper(args)

        # Record date and invocation
        logs = ''.join(args.dest) + os.sep + 'logs' + os.sep
        ckan.download.ensuredir(logs)
        invocation = open(logs + 'ckandumper.args', 'a')
        invocation.write(str(datetime.datetime.now()) + '\t' + ' '.join(sys.argv) + '\n')
        invocation.close()

        # Download everything we can
        ckan.dump()

        # Calculate runtime
        end_time = time.time()
        interval = round(end_time - start_time)
        duration = str(datetime.timedelta(seconds=interval))

        # Record duration
        print(f'Done. Elapsed time: {duration}')
        elapsed = open(logs + 'ckandumper.duration', 'a')
        elapsed.write(str(start_time) + '\t' + str(end_time) + '\t' + duration + '\n')
        elapsed.close()
    except (FileNotFoundError, KeyboardInterrupt, subprocess.CalledProcessError) as e:
        print(e)
        sys.exit(1)