#!/usr/bin/env python3
#
# ckandumper -- dumps CKAN metadata and datasets.
#
# NOTE(review): reconstructed from a mangled git format-patch ("Initial asyncio
# version", Silvio Rhatto, 2019-05-15).  The original GPL license header and
# any hunk-context lines missing from the patch are marked TODO(review) below
# and must be checked against the full ckandumper file.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY.  See the GNU General Public License for more details.
# along with this program. If not, see <https://www.gnu.org/licenses/>.

# Dependencies
import asyncio
import argparse
import sys, os, subprocess, json
from urllib.parse import urlencode


class DownloadMultiple:
    """Downloads multiple files simultaneously with error logging and fancy output"""

    # Path of the wget binary; its existence is checked in __init__.
    wget = '/usr/bin/wget'

    def __init__(self, limit_rate, limit_concurrent):
        """Set up rate limiting and the concurrency semaphore.

        :param limit_rate:       wget '--limit-rate=...' option string, or '' for none
        :param limit_concurrent: maximum simultaneous downloads (int or numeric
                                 string), or None for the default of 20
        :raises FileNotFoundError: if wget is not installed at self.wget
        """
        self.limit_rate = limit_rate

        # FIX: the patch tested the *global* ``args.limit_concurrent`` here
        # instead of the ``limit_concurrent`` parameter, and the caller passes
        # the value as a string, which asyncio.Semaphore() rejects.  Test the
        # parameter and coerce it to int.
        if limit_concurrent is not None:
            self.limit_concurrent = asyncio.Semaphore(int(limit_concurrent))
        else:
            self.limit_concurrent = asyncio.Semaphore(20)

        if not os.path.exists(self.wget):
            raise FileNotFoundError('Wget not found in your path; please install it first.')

    def ensuredir(self, dest):
        """Ensures that the destination folder exists"""
        if not os.path.exists(dest) and not os.path.isdir(dest):
            os.makedirs(dest, 0o755)
        elif os.path.exists(dest) and not os.path.isdir(dest):
            raise ValueError('File exists and is not a folder:' + dest)

    async def download_file(self, url, local_filename, semaphore):
        """Downloads a file.

        Using wget as it is more reliable.

        Straight from https://docs.python.org/3/library/asyncio-subprocess.html
        """
        async with semaphore:
            print('Downloading ' + url + '...')
            self.ensuredir(os.path.dirname(local_filename))

            # Other opts: -q --show-progress
            # FIX: use the validated self.wget attribute instead of a second
            # hard-coded '/usr/bin/wget' literal.
            cmd = self.wget + ' ' + self.limit_rate + ' -c -O "' + local_filename + '" ' + url

            # FIX: the patch also ran ``subprocess.call(cmd, shell=True)``
            # right before this, downloading every file twice (the first time
            # synchronously) and defeating the point of the asyncio version.
            proc = await asyncio.create_subprocess_shell(
                cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE)

            stdout, stderr = await proc.communicate()

            print(f'[{cmd!r} exited with {proc.returncode}]')

            if stdout:
                print(f'[stdout]\n{stdout.decode()}')

            if stderr:
                print(f'[stderr]\n{stderr.decode()}')

    async def gather(self, filepairs):
        """Gather all files to be downloaded

        See https://stackoverflow.com/questions/50308812/is-it-possible-to-limit-the-number-of-coroutines-running-corcurrently-in-asyncio#50328882
        """
        jobs = []

        for url, filename in filepairs:
            jobs.append(asyncio.ensure_future(
                self.download_file(url, filename, self.limit_concurrent)))

        await asyncio.gather(*jobs)

    def get(self, filepairs):
        """Run all (url, filename) download jobs to completion on the event loop."""
        loop = asyncio.get_event_loop()
        loop.set_debug(True)
        loop.run_until_complete(self.gather(filepairs))


class CkanDumper:
    """Dumps CKAN data: metadata plus entire datasets"""

    def __init__(self, args):
        """Configure the dumper from parsed CLI arguments."""
        # TODO(review): this prologue is NOT visible in the patch hunks (only
        # the limit-rate / limit-concurrent tail is); the attribute names and
        # CKAN API v3 action paths are inferred from their use in dump() --
        # verify against the full original file.
        self.url = args.url[0]
        self.dest = args.dest[0]
        self.package_list = '/api/3/action/package_list'
        self.package_show = '/api/3/action/package_show?'
        self.group_list = '/api/3/action/group_list'
        self.group_show = '/api/3/action/group_show?'
        self.tag_list = '/api/3/action/tag_list'
        self.tag_show = '/api/3/action/tag_show?'

        if args.limit_rate != None:
            self.limit_rate = '--limit-rate=' + args.limit_rate
        else:
            self.limit_rate = ''

        if args.limit_concurrent != None:
            self.limit_concurrent = args.limit_concurrent
        else:
            self.limit_concurrent = '20'

        self.download = DownloadMultiple(self.limit_rate, self.limit_concurrent)

    def load_json(self, file):
        """Loads a file with contents serialized as JSON"""
        descriptor = open(file)
        data = json.load(descriptor)

        descriptor.close()

        return data

    def dump(self):
        """Downloads all content listed in a CKAN repository"""
        # Switch to dest folder
        #self.ensuredir(self.dest)
        #os.chdir(self.dest)

        package_list = self.dest + os.sep + 'package_list.json'
        group_list = self.dest + os.sep + 'group_list.json'
        tag_list = self.dest + os.sep + 'tag_list.json'

        #
        # Groups
        #
        self.download.get([[self.url + self.group_list, group_list]])

        groups = self.load_json(group_list)
        group_downloads = []

        for group in groups['result']:
            group_folder = self.dest + os.sep + 'groups' + os.sep + group
            group_file = group_folder + os.sep + 'group.json'

            #print("Downloading " + self.url + self.group_show + 'id=' + group + '...')
            #self.ensuredir(group_folder)
            group_downloads.append([self.url + self.group_show + urlencode({ 'id': group }, False, '', 'utf-8'), group_file])

        self.download.get(group_downloads)

        #
        # Tags
        #
        self.download.get([[self.url + self.tag_list, tag_list]])

        tags = self.load_json(tag_list)
        tags_downloads = []

        for tag in tags['result']:
            tag_folder = self.dest + os.sep + 'tags' + os.sep + tag
            tag_file = tag_folder + os.sep + 'tag.json'

            #print("Downloading " + self.url + self.tag_show + 'id=' + tag + '...')
            #self.ensuredir(tag_folder)
            tags_downloads.append([self.url + self.tag_show + urlencode({ 'id': tag }, False, '', 'utf-8'), tag_file])

        self.download.get(tags_downloads)

        #
        # Packages
        #
        self.download.get([[self.url + self.package_list, package_list]])

        packages = self.load_json(package_list)
        packages_downloads = []

        for package in packages['result']:
            package_folder = self.dest + os.sep + 'packages' + os.sep + package
            package_file = package_folder + os.sep + 'package.json'

            #print("Downloading " + self.url + self.package_show + 'id=' + package + '...')
            #self.ensuredir(package_folder + os.sep + 'data')
            packages_downloads.append([self.url + self.package_show + urlencode({ 'id': package }, False, '', 'utf-8'), package_file])

        self.download.get(packages_downloads)

        #
        # Package contents
        #
        for package in packages['result']:
            package_downloads = []
            package_folder = self.dest + os.sep + 'packages' + os.sep + package
            package_file = package_folder + os.sep + 'package.json'

            contents = self.load_json(package_file)

            for resource in contents['result']['resources']:
                #if resource['name'] != None:
                # TODO(review): the lines deriving ``name`` and ``format``
                # fall outside this hunk's context and are reconstructed
                # minimally -- verify against the full original file.
                name = resource['id']
                format = ''
                if 'format' in resource and resource['format']:
                    format = '.' + resource['format'].lower()

                resource_file = package_folder + os.sep + 'data' + os.sep + name + format

                # FIX: the patch appended to ``package_download`` (no "s"),
                # a name that is never defined -> NameError on first resource.
                package_downloads.append([resource['url'], resource_file])

            self.download.get(package_downloads)

            # Run only once during development
            #return


if __name__ == "__main__":
    # Parse CLI
    parser = argparse.ArgumentParser(description='Dump CKAN metadata and datasets.')
    parser.add_argument('url', nargs='+', help='CKAN instance URL')
    parser.add_argument('dest', nargs='+', help='Destination folder')
    parser.add_argument("--limit-rate", help="Limit the download speed to amount bytes per second, per download")
    parser.add_argument("--limit-concurrent", help="Limit the total concurrent downloads")
    args = parser.parse_args()

    # Dispatch
    # TODO(review): the dispatch lines are outside the patch hunks; inferred
    # from the class interface.
    ckan = CkanDumper(args)
    ckan.dump()