#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Dumps CKAN instance data: metadata plus entire datasets.
#
# Copyright (C) 2019 Silvio Rhatto
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# Dependencies
import argparse
import asyncio
import json
import os
import shlex
import sys
from urllib.parse import urlencode

try:
    from tqdm import tqdm
except ImportError:
    # tqdm only drives the optional progress bar; downloads work without it.
    tqdm = None


class DownloadMultiple:
    """Downloads multiple files simultaneously with error logging and fancy output"""

    def __init__(self, limit_rate, limit_concurrent=20, progress=True, debug=False, wget='/usr/bin/wget'):
        """Set up a downloader.

        limit_rate       -- extra wget option string (e.g. '--limit-rate=100k'), or ''
        limit_concurrent -- maximum number of simultaneous wget processes (int or numeric string)
        progress         -- show a progress bar for batches with more than one file
        debug            -- print verbose information about each download
        wget             -- path of the wget executable

        Raises FileNotFoundError when the wget path does not exist and
        ValueError when limit_concurrent is not a positive integer.
        """
        if not os.path.exists(wget):
            raise FileNotFoundError('Wget not found in path ' + wget + '; please install it first.')

        limit_concurrent = int(limit_concurrent)

        if limit_concurrent < 1:
            # A semaphore of size zero would block every download forever.
            raise ValueError('limit_concurrent must be at least 1')

        self.limit_rate = limit_rate
        # Kept as a plain number: the semaphore itself is created per run in
        # gather(), inside the event loop that actually uses it.
        self.limit_concurrent = limit_concurrent
        self.progress = progress
        self.debug = debug
        self.wget = wget
        # Exit status -> list of URLs, accumulated over the downloader's lifetime.
        self.globalstats = {'exitstatus': {}}
        # Same structure, reset at every get() call.
        self.stats = {'exitstatus': {}}
        self.bar = None

    def ensuredir(self, dest):
        """Ensures that the destination folder exists

        An empty path (file in the current folder) is accepted and ignored.
        Raises ValueError when the path exists but is not a folder.
        """
        if not dest:
            return

        if os.path.exists(dest) and not os.path.isdir(dest):
            raise ValueError('File exists and is not a folder:' + dest)

        os.makedirs(dest, 0o755, exist_ok=True)

    def _build_command(self, url, local_filename):
        """Returns the wget argument vector used to fetch url into local_filename"""
        # Other opts: -q --show-progress -O
        #
        # An argument list (instead of a shell string) keeps URLs and file
        # names containing '&', ';', quotes or spaces from being interpreted
        # by a shell; '--' stops wget from reading a URL as an option.
        return [self.wget] + shlex.split(self.limit_rate or '') + ['-c', '-O', local_filename, '--', url]

    async def download_file(self, url, local_filename, semaphore):
        """Downloads a file.

        Using wget as it is more reliable

        Straight from https://docs.python.org/3/library/asyncio-subprocess.html

        The output of wget is saved next to the file with the suffixes
        '.stdout' and '.stderr' (only when not empty) and its exit status
        with the suffix '.returncode'. The exit status is also recorded in
        both self.stats and self.globalstats.

        Returns the wget exit status.
        """
        async with semaphore:
            if self.debug:
                print('Downloading ' + url + '...')

            self.ensuredir(os.path.dirname(local_filename))

            cmd = self._build_command(url, local_filename)
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE)

            stdout, stderr = await proc.communicate()

            for stream, payload in (('stdout', stdout), ('stderr', stderr)):
                if not payload:
                    continue

                # Saved as raw bytes: wget output is not guaranteed to be valid UTF-8.
                with open(local_filename + '.' + stream, 'wb') as output:
                    output.write(payload)

                if self.debug:
                    text = payload.decode(errors='replace')
                    print(f'[{stream}] {url} {text}')

            status = str(proc.returncode)

            with open(local_filename + '.returncode', 'w') as output:
                output.write(status + '\n')

            for stats in (self.globalstats, self.stats):
                stats['exitstatus'].setdefault(status, []).append(url)

            if self.debug:
                printable = ' '.join(shlex.quote(arg) for arg in cmd)
                print(f'[{printable!r} exited with {proc.returncode}]')

            if hasattr(self.bar, 'update'):
                self.bar.update(1)

            return proc.returncode

    async def gather(self, filepairs):
        """Gather all files to be downloaded

        filepairs is a sequence of (url, local filename) pairs.
        Returns the list of wget exit statuses, in the same order.

        See https://stackoverflow.com/questions/50308812/is-it-possible-to-limit-the-number-of-coroutines-running-corcurrently-in-asyncio#50328882
        """
        # Created here so it always belongs to the currently running event loop.
        semaphore = asyncio.Semaphore(self.limit_concurrent)
        jobs = [self.download_file(url, filename, semaphore) for url, filename in filepairs]

        return await asyncio.gather(*jobs)

    def get(self, filepairs):
        """Downloads a batch of (url, local filename) pairs

        Blocks until every download has finished and returns the statistics
        of this batch: {'exitstatus': {status: [urls]}}.
        """
        self.stats = {'exitstatus': {}}

        show_bar = self.progress and tqdm is not None and len(filepairs) > 1
        self.bar = tqdm(total=len(filepairs)) if show_bar else None

        try:
            # asyncio.run() creates and closes a fresh event loop per batch;
            # asyncio.get_event_loop() without a running loop is deprecated.
            asyncio.run(self.gather(filepairs), debug=bool(self.debug))
        finally:
            if self.bar is not None:
                self.bar.close()

        if self.debug:
            print(self.stats)

        return self.stats


class CkanDumper:
    """Dumps CKAN data: metadata plus entire datasets"""

    def __init__(self, args):
        """Configure the dumper from parsed command line arguments.

        args is expected to provide: url and dest (lists, first item used),
        debug, progress, limit_rate, limit_concurrent and wget. Optional
        values set to None fall back to their defaults.
        """
        self.url = args.url[0]
        self.dest = args.dest[0]
        self.package_list = '/api/3/action/package_list'
        self.package_show = '/api/3/action/package_show?'
        self.group_list = '/api/3/action/group_list'
        self.group_show = '/api/3/action/group_show?'
        self.tag_list = '/api/3/action/tag_list'
        self.tag_show = '/api/3/action/tag_show?'

        self.debug = args.debug if args.debug is not None else False
        self.progress = args.progress if args.progress is not None else True

        if args.limit_rate is not None:
            self.limit_rate = '--limit-rate=' + args.limit_rate
        else:
            self.limit_rate = ''

        if args.limit_concurrent is not None:
            self.limit_concurrent = args.limit_concurrent
        else:
            self.limit_concurrent = '20'

        wget = args.wget if args.wget is not None else '/usr/bin/wget'

        self.download = DownloadMultiple(self.limit_rate, self.limit_concurrent,
                                         self.progress, self.debug, wget)

    @staticmethod
    def _safe_name(name):
        """Returns name reduced to a single, harmless path component"""
        # Names come from the remote instance, so path separators and the
        # special entries '.' and '..' must not reach the file system as is.
        text = str(name)

        for separator in (os.sep, os.altsep, '/'):
            if separator:
                text = text.replace(separator, '_')

        return '_' if text in ('', '.', '..') else text

    def load_json(self, file):
        """Loads a file with contents serialized as JSON"""
        with open(file, encoding='utf-8') as descriptor:
            return json.load(descriptor)

    def dump_stats(self, stats):
        """Dump download batch statistics"""
        if stats is not None:
            print('Statistics (exit status / total downloads): ', end='')

            for status, urls in stats['exitstatus'].items():
                print(status + ': ' + str(len(urls)), end='; ')

            print('')

    def write_stats(self):
        """Write global statistics to file"""
        self.download.ensuredir(self.dest)

        with open(os.path.join(self.dest, 'ckandumper.stats.json'), 'w') as stats:
            stats.write(json.dumps(self.download.globalstats, indent=2) + '\n')

    def process_stats(self, stats):
        """Process stats at each run"""
        self.dump_stats(stats)
        self.write_stats()

    def _listing_downloads(self, names, plural, filename, show_endpoint):
        """Builds the (url, file) pairs fetching the details of each listed item"""
        return [
            [self.url + show_endpoint + urlencode({'id': name}, False, '', 'utf-8'),
             os.path.join(self.dest, plural, self._safe_name(name), filename)]
            for name in names
        ]

    def _dump_listing(self, kind, plural, list_endpoint, show_endpoint):
        """Downloads one listing (groups, tags or packages) and the details of each item

        Returns the list of item names found in the listing.
        """
        list_file = os.path.join(self.dest, kind + '_list.json')

        print(f'Downloading {self.url}{list_endpoint}...')
        self.download.get([[self.url + list_endpoint, list_file]])

        names = self.load_json(list_file)['result']

        print(f'Downloading each {kind} info...')
        stats = self.download.get(
            self._listing_downloads(names, plural, kind + '.json', show_endpoint))
        self.process_stats(stats)

        return names

    def _resource_downloads(self, contents, package_folder):
        """Builds the (url, file) pairs for every resource of a package description"""
        downloads = []

        for resource in contents['result']['resources']:
            # Files are named after the resource id.
            name = self._safe_name(resource['id'])
            fmt = resource.get('format')
            # A missing or empty format yields no extension at all.
            suffix = '.' + self._safe_name(str(fmt).lower()) if fmt else ''

            downloads.append([resource['url'],
                              os.path.join(package_folder, 'data', name + suffix)])

        return downloads

    def dump(self):
        """Downloads all content listed in a CKAN repository"""
        #
        # Groups
        #
        self._dump_listing('group', 'groups', self.group_list, self.group_show)

        #
        # Tags
        #
        self._dump_listing('tag', 'tags', self.tag_list, self.tag_show)

        #
        # Packages
        #
        packages = self._dump_listing('package', 'packages', self.package_list, self.package_show)

        #
        # Package contents
        #
        for package in packages:
            package_folder = os.path.join(self.dest, 'packages', self._safe_name(package))
            package_file = os.path.join(package_folder, 'package.json')

            try:
                contents = self.load_json(package_file)
                package_downloads = self._resource_downloads(contents, package_folder)
            except (OSError, ValueError, KeyError, TypeError) as error:
                # A package whose description failed to download or is
                # malformed must not abort the remaining packages.
                print(f'Skipping contents of package {package}: {error!r}')
                continue

            print(f'Downloading contents of package {package}...')
            stats = self.download.get(package_downloads)
            self.process_stats(stats)


def main():
    """Parses the command line and runs the dump"""
    # Parse CLI
    parser = argparse.ArgumentParser(description='Dump CKAN metadata and datasets.')
    parser.add_argument('url', nargs='+', help='CKAN instance URL')
    parser.add_argument('dest', nargs='+', help='Destination folder')

    parser.add_argument("--limit-rate", help="Limit the download speed to amount bytes per second, per download")
    parser.add_argument("--limit-concurrent", help="Limit the total concurrent downloads")
    parser.add_argument("--wget", help="Path of custom wget implementation")

    parser.add_argument('--debug', dest='debug', action='store_true', help="Enable debug")
    parser.add_argument('--no-debug', dest='debug', action='store_false', help="Disable debug")
    parser.add_argument('--progress', dest='progress', action='store_true', help="Enable progress")
    parser.add_argument('--no-progress', dest='progress', action='store_false', help="Disable progress")

    parser.set_defaults(debug=False)
    parser.set_defaults(progress=True)
    args = parser.parse_args()

    # Dispatch
    try:
        ckan = CkanDumper(args)
        ckan.dump()
    except FileNotFoundError as e:
        print(e, file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()