#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Dumps CKAN instance data: metadata plus entire datasets.
#
# Copyright (C) 2019 Silvio Rhatto
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

# Dependencies
import asyncio
import argparse
import curses
import sys
import os
import json
from urllib.parse import urlencode


class StatusLine:
    """Handle printing in a given status line of a curses screen."""

    def __init__(self):
        self.stdscr = curses.initscr()
        curses.noecho()
        curses.cbreak()

    def print(self, line, content):
        """Print content in a specific status line, clearing it first."""
        height, width = self.stdscr.getmaxyx()

        if line < height:
            # Clear at most width - 1 columns: writing into the very last
            # column makes addstr() raise on the bottom-right cell.
            self.stdscr.addstr(line, 0, ' ' * (width - 1))
            self.stdscr.addstr(line, 0, content)
            self.stdscr.refresh()

    def clear(self, line):
        """Blank out a status line."""
        self.print(line, ' ')

    def teardown(self):
        """Finish status lines, restoring the terminal state."""
        curses.echo()
        curses.nocbreak()
        curses.endwin()


class DownloadMultiple:
    """Downloads multiple files simultaneously with error logging and fancy output"""

    # Path to the wget binary used for every transfer.
    wget = '/usr/bin/wget'

    def __init__(self, limit_rate, limit_concurrent=20):
        """
        :param limit_rate:       a wget '--limit-rate=...' option, or '' for
                                 unlimited speed
        :param limit_concurrent: maximum number of simultaneous downloads

        :raises FileNotFoundError: if the wget binary is not installed
        """
        if not os.path.exists(self.wget):
            raise FileNotFoundError('Wget not found in your path; please install it first.')

        self.limit_rate = limit_rate
        self.limit_concurrent = asyncio.Semaphore(int(limit_concurrent))
        self.status = StatusLine()

        # One long-lived loop reused by every get() call, so the shared
        # semaphore above stays bound to a single event loop.
        self.loop = asyncio.new_event_loop()

    def ensuredir(self, dest):
        """Ensure that the destination folder exists.

        :raises ValueError: if dest exists but is not a folder
        """
        if not os.path.exists(dest) and not os.path.isdir(dest):
            os.makedirs(dest, 0o755)
        elif os.path.exists(dest) and not os.path.isdir(dest):
            raise ValueError('File exists and is not a folder: ' + dest)

    async def download_file(self, url, local_filename, semaphore):
        """Download a single file, bounded by the given semaphore.

        Using wget as it is more reliable; subprocess handling straight from
        https://docs.python.org/3/library/asyncio-subprocess.html
        """
        async with semaphore:
            self.ensuredir(os.path.dirname(local_filename))

            # Build an argv list and exec directly (no shell), so URLs and
            # filenames containing quotes, spaces or '&' cannot be
            # interpreted by a shell.
            argv = [self.wget, '-q', '--show-progress', '--progress=dot',
                    '-c', '-O', local_filename, url]

            if self.limit_rate:
                argv.insert(1, self.limit_rate)

            proc = await asyncio.create_subprocess_exec(
                    *argv,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE)

            stdout, stderr = await proc.communicate()

            # NOTE(review): semaphore._value is a private attribute; it is
            # (ab)used here only to pick a distinct status line per free slot.
            if stderr:
                self.status.print(semaphore._value, f'[stderr] {stderr.decode()} {url}')

            self.status.clear(semaphore._value)

    async def gather(self, filepairs):
        """Gather all files to be downloaded

        See https://stackoverflow.com/questions/50308812/is-it-possible-to-limit-the-number-of-coroutines-running-corcurrently-in-asyncio#50328882
        """
        jobs = [
            asyncio.ensure_future(
                self.download_file(url, filename, self.limit_concurrent))
            for url, filename in filepairs
        ]

        await asyncio.gather(*jobs)

    def get(self, filepairs):
        """Download all (url, filename) pairs, blocking until the batch is done."""
        self.loop.run_until_complete(self.gather(filepairs))


class CkanDumper:
    """Dumps CKAN data: metadata plus entire datasets"""

    def __init__(self, args):
        """
        :param args: argparse.Namespace carrying url, dest, limit_rate and
                     limit_concurrent (as built by the CLI entry point below)
        """
        self.url = args.url[0]
        self.dest = args.dest[0]

        # CKAN API v3 endpoints, relative to the instance URL.
        self.package_list = '/api/3/action/package_list'
        self.package_show = '/api/3/action/package_show?'
        self.group_list = '/api/3/action/group_list'
        self.group_show = '/api/3/action/group_show?'
        self.tag_list = '/api/3/action/tag_list'
        self.tag_show = '/api/3/action/tag_show?'

        if args.limit_rate is not None:
            self.limit_rate = '--limit-rate=' + args.limit_rate
        else:
            self.limit_rate = ''

        if args.limit_concurrent is not None:
            self.limit_concurrent = args.limit_concurrent
        else:
            self.limit_concurrent = '20'

        self.download = DownloadMultiple(self.limit_rate, self.limit_concurrent)

    def load_json(self, file):
        """Load a file with contents serialized as JSON."""
        with open(file) as descriptor:
            return json.load(descriptor)

    def _dump_index(self, list_path, show_path, list_file, kind):
        """Download a list endpoint plus one metadata file per listed item.

        :param list_path: API path listing item names (e.g. group_list)
        :param show_path: API path showing one item, sans the id= query
        :param list_file: local file where the listing JSON is stored
        :param kind:      singular item kind ('group', 'tag' or 'package'),
                          used to build folder and file names

        :returns: the list of item names, so callers can post-process them
        """
        self.download.get([[self.url + list_path, list_file]])

        items = self.load_json(list_file)['result']
        downloads = []

        for item in items:
            item_file = (self.dest + os.sep + kind + 's' + os.sep + item +
                         os.sep + kind + '.json')
            query = urlencode({'id': item}, False, '', 'utf-8')
            downloads.append([self.url + show_path + query, item_file])

        self.download.get(downloads)

        return items

    def dump(self):
        """Downloads all content listed in a CKAN repository"""
        package_list = self.dest + os.sep + 'package_list.json'
        group_list = self.dest + os.sep + 'group_list.json'
        tag_list = self.dest + os.sep + 'tag_list.json'

        # Groups, tags and per-package metadata.
        self._dump_index(self.group_list, self.group_show, group_list, 'group')
        self._dump_index(self.tag_list, self.tag_show, tag_list, 'tag')
        packages = self._dump_index(self.package_list, self.package_show,
                                    package_list, 'package')

        # Package contents: every resource of every package, one batch of
        # concurrent downloads per package.
        for package in packages:
            package_folder = self.dest + os.sep + 'packages' + os.sep + package
            contents = self.load_json(package_folder + os.sep + 'package.json')
            package_downloads = []

            for resource in contents['result']['resources']:
                # Resources are stored by id; the name field may be missing.
                name = resource['id']

                # A falsy format ('' or None) yields no extension; comparing
                # against None alone produced a bare trailing dot for ''.
                if resource.get('format'):
                    extension = '.' + resource['format'].lower()
                else:
                    extension = ''

                resource_file = (package_folder + os.sep + 'data' + os.sep +
                                 name + extension)
                package_downloads.append([resource['url'], resource_file])

            self.download.get(package_downloads)


if __name__ == "__main__":
    # Parse CLI
    parser = argparse.ArgumentParser(description='Dump CKAN metadata and datasets.')

    parser.add_argument('url', nargs='+', help='CKAN instance URL')
    parser.add_argument('dest', nargs='+', help='Destination folder')
    parser.add_argument("--limit-rate",
                        help="Limit the download speed to amount bytes per second, per download")
    parser.add_argument("--limit-concurrent",
                        help="Limit the total concurrent downloads")

    args = parser.parse_args()

    # Dispatch, always restoring the terminal's curses state on the way out.
    ckan = CkanDumper(args)

    try:
        ckan.dump()
    finally:
        ckan.download.status.teardown()