#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Dumps CKAN instance data: metadata plus entire datasets.
#
# Copyright (C) 2019 Silvio Rhatto
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

# Dependencies
import time
import datetime
import random
import asyncio
import argparse
import shlex
import sys, os, subprocess, json

from urllib.parse import urlencode
from tqdm import tqdm
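# Note on dependencies: tqdm is the only third-party Python module used
# here; everything else is in the standard library. wget is additionally
# required as an external binary (see the --wget option below). Assuming
# pip is available, a plain `pip install tqdm` covers the Python side.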
class DownloadMultiple:
    """Downloads multiple files concurrently, with error logging and fancy output"""

    def __init__(self, limit_rate, limit_concurrent=20, progress=True,
                 debug=False, wget='/usr/bin/wget'):
        # Check that the wget binary is available before doing anything else
        wget_bin = wget.split(' ')[0]

        if '/' in wget_bin and not os.path.exists(wget_bin):
            raise FileNotFoundError('Wget not found in path ' + wget_bin + '; please install it first.')
        else:
            subprocess.check_call(wget_bin + ' --help', stdout=subprocess.DEVNULL,
                                  stderr=subprocess.DEVNULL, shell=True)

        self.limit_rate       = limit_rate
        self.limit_concurrent = asyncio.Semaphore(int(limit_concurrent))
        self.progress         = progress
        self.debug            = debug
        self.wget             = wget
        self.globalstats      = {'exitstatus': {}}

    def ensuredir(self, dest):
        """Ensures that the destination folder exists"""
        if not os.path.exists(dest):
            # exist_ok avoids a race when concurrent downloads share a folder
            os.makedirs(dest, 0o755, exist_ok=True)
        elif not os.path.isdir(dest):
            raise ValueError('File exists and is not a folder: ' + dest)

    async def download_file(self, url, local_filename, semaphore, progress_bar):
        """Downloads a single file.

        Uses wget, as it is more reliable than rolling our own HTTP client;
        the subprocess handling comes straight from
        https://docs.python.org/3/library/asyncio-subprocess.html
        """
        async with semaphore:
            if self.debug:
                print('Downloading ' + url + '...')

            self.ensuredir(os.path.dirname(local_filename))

            # Quote the filename and the URL so shell metacharacters
            # (&, ?, spaces) in resource URLs cannot break the command line.
            # Other useful wget options: -q --show-progress
            cmd = self.wget + ' ' + self.limit_rate + ' -c -O ' + \
                  shlex.quote(local_filename) + ' ' + shlex.quote(url)

            proc = await asyncio.create_subprocess_shell(cmd,
                                                         stdout=asyncio.subprocess.PIPE,
                                                         stderr=asyncio.subprocess.PIPE)

            stdout, stderr = await proc.communicate()

            if stdout:
                with open(local_filename + '.stdout', 'w') as output:
                    output.write(stdout.decode())

                if self.debug:
                    print(f'[stdout] {url} {stdout.decode()}')

            if stderr:
                with open(local_filename + '.stderr', 'w') as output:
                    output.write(stderr.decode())

                if self.debug:
                    print(f'[stderr] {url} {stderr.decode()}')

            with open(local_filename + '.returncode', 'w') as output:
                output.write(str(proc.returncode) + '\n')

            with open(local_filename + '.date', 'w') as output:
                output.write(str(datetime.datetime.now()) + '\n')

            if os.path.exists(local_filename):
                # The file might be too big to read into memory, so the
                # checksum is computed by an external sha256sum process
                # instead of hashlib.
                hasher  = 'cd ' + shlex.quote(os.path.dirname(local_filename)) + ' && /usr/bin/sha256sum '
                hasher += shlex.quote(os.path.basename(local_filename)) + ' > '
                hasher += shlex.quote(os.path.basename(local_filename) + '.sha256')

                hash_proc = await asyncio.create_subprocess_shell(hasher,
                                                                  stdout=asyncio.subprocess.PIPE,
                                                                  stderr=asyncio.subprocess.PIPE)

                # Wait for the checksum to be written before moving on
                await hash_proc.communicate()

            if str(proc.returncode) not in self.globalstats['exitstatus']:
                self.globalstats['exitstatus'][str(proc.returncode)] = []

            self.globalstats['exitstatus'][str(proc.returncode)].append(url)

            if str(proc.returncode) not in self.stats['exitstatus']:
                self.stats['exitstatus'][str(proc.returncode)] = []

            self.stats['exitstatus'][str(proc.returncode)].append(url)

            if self.debug:
                print(f'[{cmd!r} exited with {proc.returncode}]')

            if hasattr(progress_bar, 'update'):
                progress_bar.update(1)

            return proc.returncode

    async def gather(self, filepairs):
        """Gathers all download jobs, limiting concurrency with a semaphore.

        See https://stackoverflow.com/questions/50308812/is-it-possible-to-limit-the-number-of-coroutines-running-corcurrently-in-asyncio#50328882
        """
        jobs         = []
        progress_bar = tqdm(total=len(filepairs)) if self.progress and len(filepairs) > 1 else False

        for url, filename in filepairs:
            jobs.append(asyncio.ensure_future(
                self.download_file(url, filename, self.limit_concurrent, progress_bar)))

        await asyncio.gather(*jobs)

        if hasattr(progress_bar, 'close'):
            progress_bar.close()

    def get(self, filepairs):
        """Runs a batch of downloads, returning per-batch statistics"""
        self.stats = {'exitstatus': {}}

        loop = asyncio.get_event_loop()
        loop.set_debug(self.debug)
        loop.run_until_complete(self.gather(filepairs))

        if self.debug:
            print(self.stats)

        return self.stats
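# A minimal standalone sketch of how DownloadMultiple is meant to be
# driven (the URLs and paths below are illustrative, not from the
# original script):
#
#   downloader = DownloadMultiple(limit_rate='--limit-rate=100k')
#   stats = downloader.get([
#       ['https://example.org/a.json', 'dump/a.json'],
#       ['https://example.org/b.json', 'dump/b.json'],
#   ])
#   # stats maps each wget exit status to the URLs that ended with it,
#   # e.g. {'exitstatus': {'0': ['https://example.org/a.json', ...]}}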
class CkanDumper:
    """Dumps CKAN data: metadata plus entire datasets"""

    def __init__(self, args):
        self.url  = args.url
        self.dest = args.dest

        self.package_list = '/api/3/action/package_list'
        self.package_show = '/api/3/action/package_show?'
        self.group_list   = '/api/3/action/group_list'
        self.group_show   = '/api/3/action/group_show?'
        self.tag_list     = '/api/3/action/tag_list'
        self.tag_show     = '/api/3/action/tag_show?'

        # These three always carry a boolean thanks to parser.set_defaults()
        self.debug     = args.debug
        self.progress  = args.progress
        self.randomize = args.randomize

        if args.limit_rate is not None:
            self.limit_rate = '--limit-rate=' + args.limit_rate
        else:
            self.limit_rate = ''

        if args.limit_concurrent is not None:
            self.limit_concurrent = args.limit_concurrent
        else:
            self.limit_concurrent = '20'

        if args.wget is None:
            args.wget = '/usr/bin/wget'

        self.download = DownloadMultiple(self.limit_rate, self.limit_concurrent,
                                         self.progress, self.debug, args.wget)

    def load_json(self, file):
        """Loads a file with contents serialized as JSON"""
        with open(file) as descriptor:
            return json.load(descriptor)

    def dump_stats(self, stats):
        """Dumps download batch statistics"""
        if stats is not None:
            print('Statistics (exit status / total downloads): ', end='')

            for status in stats['exitstatus']:
                print(status + ': ' + str(len(stats['exitstatus'][status])), end='; ')

            print('')

    def write_stats(self):
        """Writes global statistics to a file"""
        with open(self.dest + os.sep + 'ckandumper.stats.json', 'w') as stats:
            stats.write(json.dumps(self.download.globalstats, indent=2) + '\n')

    def process_stats(self, stats):
        """Processes stats after each download batch"""
        self.dump_stats(stats)
        self.write_stats()
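    # CKAN's action API wraps every response in an envelope of the form
    # {"help": ..., "success": ..., "result": ...}; dump() below relies
    # only on the "result" key, which for the *_list endpoints holds a
    # list of names, e.g. (abridged; names are hypothetical):
    #
    #   {"success": true, "result": ["dataset-one", "dataset-two"]}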
    def dump(self):
        """Downloads all content listed in a CKAN repository"""
        package_list = self.dest + os.sep + 'package_list.json'
        group_list   = self.dest + os.sep + 'group_list.json'
        tag_list     = self.dest + os.sep + 'tag_list.json'

        print('Starting at ' + str(datetime.datetime.now()))

        #
        # Groups
        #
        print(f'Downloading {self.url}{self.group_list}...')
        stats = self.download.get([[self.url + self.group_list, group_list]])

        groups          = self.load_json(group_list)
        group_downloads = []

        print('Downloading info for each group...')
        for group in groups['result']:
            group_folder = self.dest + os.sep + 'groups' + os.sep + group
            group_file   = group_folder + os.sep + 'group.json'

            group_downloads.append([self.url + self.group_show + urlencode({'id': group}, encoding='utf-8'), group_file])

        stats = self.download.get(group_downloads)
        self.process_stats(stats)

        #
        # Tags
        #
        print(f'Downloading {self.url}{self.tag_list}...')
        stats = self.download.get([[self.url + self.tag_list, tag_list]])

        tags           = self.load_json(tag_list)
        tags_downloads = []

        print('Downloading info for each tag...')
        for tag in tags['result']:
            tag_folder = self.dest + os.sep + 'tags' + os.sep + tag
            tag_file   = tag_folder + os.sep + 'tag.json'

            tags_downloads.append([self.url + self.tag_show + urlencode({'id': tag}, encoding='utf-8'), tag_file])

        stats = self.download.get(tags_downloads)
        self.process_stats(stats)

        #
        # Packages
        #
        print(f'Downloading {self.url}{self.package_list}...')
        stats = self.download.get([[self.url + self.package_list, package_list]])

        packages           = self.load_json(package_list)
        packages_downloads = []

        print('Downloading info for each package...')
        for package in packages['result']:
            package_folder = self.dest + os.sep + 'packages' + os.sep + package
            package_file   = package_folder + os.sep + 'package.json'

            packages_downloads.append([self.url + self.package_show + urlencode({'id': package}, encoding='utf-8'), package_file])

        stats = self.download.get(packages_downloads)
        self.process_stats(stats)

        print('Downloading contents of all packages...')
        package_downloads = []

        #
        # Package contents
        #
        for package in packages['result']:
            package_folder = self.dest + os.sep + 'packages' + os.sep + package
            package_file   = package_folder + os.sep + 'package.json'
            contents       = self.load_json(package_file)

            for resource in contents['result']['resources']:
                # Use the resource id for the filename: ids are unique,
                # while resource names may be missing or may clash
                name = resource['id']

                if resource.get('format'):
                    extension = '.' + resource['format'].lower()
                else:
                    extension = ''

                resource_file = package_folder + os.sep + 'data' + os.sep + name + extension

                package_downloads.append([resource['url'], resource_file])

        if self.randomize:
            # Shuffle so a single remote endpoint is not hit in long sequences
            random.shuffle(package_downloads)

        stats = self.download.get(package_downloads)
        self.process_stats(stats)

if __name__ == "__main__":
    # Parse CLI
    examples = """Examples:

      ckandumper --limit-concurrent=10 --limit-rate=100k --randomize https://open.canada.ca/data/en/ canada/
      ckandumper --limit-concurrent=10 --limit-rate=100k --randomize https://opendata.swiss/en/ switzerland/
      ckandumper --limit-concurrent=10 --wget="wget --no-check-certificate" --randomize http://dados.gov.br brazil/
    """

    parser = argparse.ArgumentParser(description='Dump CKAN metadata and datasets.',
                                     epilog=examples,
                                     formatter_class=argparse.RawDescriptionHelpFormatter)

    parser.add_argument('url',  help='CKAN instance URL')
    parser.add_argument('dest', help='Destination folder')

    parser.add_argument('--limit-rate',
                        help='Limit the download speed to this many bytes per second, per download; ' +
                             'shorthand for --wget="wget --limit-rate=<rate>"')

    parser.add_argument('--limit-concurrent', help='Limit the number of concurrent downloads')

    parser.add_argument('--wget', help='Custom wget invocation')

    parser.add_argument('--debug',    dest='debug', action='store_true',  help='Enable debug')
    parser.add_argument('--no-debug', dest='debug', action='store_false', help='Disable debug')

    parser.add_argument('--progress',    dest='progress', action='store_true',  help='Enable progress')
    parser.add_argument('--no-progress', dest='progress', action='store_false', help='Disable progress')

    parser.add_argument('--randomize',    dest='randomize', action='store_true',
                        help='Randomize the list of downloads to avoid consuming resources of the same remote endpoint')
    parser.add_argument('--no-randomize', dest='randomize', action='store_false',
                        help='Do not randomize the list of downloads')

    parser.set_defaults(debug=False)
    parser.set_defaults(randomize=False)
    parser.set_defaults(progress=True)

    args = parser.parse_args()

    # Dispatch
    try:
        start_time = time.time()
        ckan       = CkanDumper(args)

        ckan.dump()

        interval = round(time.time() - start_time)
        duration = str(datetime.timedelta(seconds=interval))

        print(f'Done. Elapsed time: {duration}')
    except (FileNotFoundError, KeyboardInterrupt, subprocess.CalledProcessError) as e:
        print(e)
        sys.exit(1)
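# Expected layout under <dest>/, as built by dump() above:
#
#   package_list.json, group_list.json, tag_list.json
#   ckandumper.stats.json
#   groups/<group>/group.json
#   tags/<tag>/tag.json
#   packages/<package>/package.json
#   packages/<package>/data/<resource-id>[.<format>]
#
# Each downloaded file may also gain .stdout, .stderr, .returncode,
# .date and .sha256 companion files describing how the download went.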