aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSilvio Rhatto <rhatto@riseup.net>2019-05-15 15:06:03 -0300
committerSilvio Rhatto <rhatto@riseup.net>2019-05-15 15:06:03 -0300
commit600c3dd67fc3be1e6f2cd3cf68189e7e0a276b46 (patch)
treecd1192b09975484afb6cc1f90bfd10ad175d4f2d
parente4c88334af4172b46774fba1a1226cb88e7c71f6 (diff)
downloadckandumper-600c3dd67fc3be1e6f2cd3cf68189e7e0a276b46.tar.gz
ckandumper-600c3dd67fc3be1e6f2cd3cf68189e7e0a276b46.tar.bz2
Initial asyncio version
-rwxr-xr-xckandumper164
1 files changed, 127 insertions, 37 deletions
diff --git a/ckandumper b/ckandumper
index d70f35f..d447899 100755
--- a/ckandumper
+++ b/ckandumper
@@ -19,10 +19,81 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Dependencies
+import asyncio
import argparse
-import sys, os, subprocess, pycurl, json
+import sys, os, subprocess, json
from urllib.parse import urlencode
+class DownloadMultiple:
+ """Downloads multiple files simultaneously with error logging and fancy output"""
+
+ wget = '/usr/bin/wget'
+
+ def __init__(self, limit_rate, limit_concurrent):
+ self.limit_rate = limit_rate
+
+ if args.limit_concurrent != None:
+ self.limit_concurrent = asyncio.Semaphore(limit_concurrent)
+ else:
+ self.limit_concurrent = asyncio.Semaphore(20)
+
+ if not os.path.exists(self.wget):
+ raise FileNotFoundError('Wget not found in your path; please install it first.')
+
+ def ensuredir(self, dest):
+ """Ensures that the destination folder exists"""
+ if not os.path.exists(dest) and not os.path.isdir(dest):
+ os.makedirs(dest, 0o755);
+ elif os.path.exists(dest) and not os.path.isdir(dest):
+ raise ValueError('File exists and is not a folder:' + dest)
+
+ async def download_file(self, url, local_filename, semaphore):
+ """Downloads a file.
+
+ Using wget as it is more reliable
+
+ Straight from https://docs.python.org/3/library/asyncio-subprocess.html
+ """
+
+ async with semaphore:
+ print('Downloading ' + url + '...')
+ self.ensuredir(os.path.dirname(local_filename));
+
+ # Other opts: -q --show-progress
+ cmd = '/usr/bin/wget ' + self.limit_rate + ' -c -O "' + local_filename + '" ' + url
+ proc = subprocess.call(cmd, shell=True)
+ proc = await asyncio.create_subprocess_shell(cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE)
+
+ stdout, stderr = await proc.communicate()
+
+ print(f'[{cmd!r} exited with {proc.returncode}]')
+
+ if stdout:
+ print(f'[stdout]\n{stdout.decode()}')
+
+ if stderr:
+ print(f'[stderr]\n{stderr.decode()}')
+
+ async def gather(self, filepairs):
+ """Gather all files to be downloaded
+
+ See https://stackoverflow.com/questions/50308812/is-it-possible-to-limit-the-number-of-coroutines-running-corcurrently-in-asyncio#50328882
+ """
+
+ jobs = []
+
+ for url, filename in filepairs:
+ jobs.append(asyncio.ensure_future(self.download_file(url, filename, self.limit_concurrent)))
+
+ await asyncio.gather(*jobs)
+
+ def get(self, filepairs):
+ loop = asyncio.get_event_loop()
+ loop.set_debug(True)
+ loop.run_until_complete(self.gather(filepairs))
+
class CkanDumper:
"""Dumps CKAN data: metadata plus entire datasets"""
@@ -38,32 +109,28 @@ class CkanDumper:
if args.limit_rate != None:
self.limit_rate = '--limit-rate=' + args.limit_rate
+ else:
+ self.limit_rate = ''
- def download(self, url, local_filename):
- """Downloads a file.
-
- Using wget as it is more reliable
- """
- subprocess.call('/usr/bin/wget ' + self.limit_rate + ' -c -O "' + local_filename + '" ' + url, shell=True)
+ if args.limit_concurrent != None:
+ self.limit_concurrent = args.limit_concurrent
+ else:
+ self.limit_concurrent = '20'
- def ensuredir(self, dest):
- """Ensures that the destination folder exists"""
- if not os.path.exists(dest) and not os.path.isdir(dest):
- os.makedirs(dest, 0o755);
- elif os.path.exists(dest) and not os.path.isdir(dest):
- raise ValueError('File exists and is not a folder:' + dest)
+ self.download = DownloadMultiple(self.limit_rate, self.limit_concurrent)
def load_json(self, file):
"""Loads a file with contents serialized as JSON"""
descriptor = open(file)
data = json.load(descriptor)
- file.close()
+
+ descriptor.close()
+ return data
def dump(self):
"""Downloads all content listed in a CKAN repository"""
- self.ensuredir(self.dest)
-
- # Move to dest folder
+ # Switch to dest folder
+ #self.ensuredir(self.dest)
#os.chdir(self.dest)
package_list = self.dest + os.sep + 'package_list.json'
@@ -73,45 +140,65 @@ class CkanDumper:
#
# Groups
#
- self.download(self.url + self.group_list, group_list)
- groups = self.load_json(group_list)
+ self.download.get([[self.url + self.group_list, group_list]])
+
+ groups = self.load_json(group_list)
+ group_downloads = []
for group in groups['result']:
group_folder = self.dest + os.sep + 'groups' + os.sep + group
group_file = group_folder + os.sep + 'group.json'
- self.ensuredir(group_folder)
- print("Downloading " + self.url + self.group_show + 'id=' + group + '...')
- self.download(self.url + self.group_show + urlencode({ 'id': group }, False, '', 'utf-8'), group_file)
+ #print("Downloading " + self.url + self.group_show + 'id=' + group + '...')
+ #self.ensuredir(group_folder)
+ group_downloads.append([self.url + self.group_show + urlencode({ 'id': group }, False, '', 'utf-8'), group_file])
+
+ self.download.get(group_downloads)
+
#
# Tags
#
- self.download(self.url + self.tag_list, tag_list)
- tags = self.load_json(tag_list)
+ self.download.get([[self.url + self.tag_list, tag_list]])
+
+ tags = self.load_json(tag_list)
+ tags_downloads = []
for tag in tags['result']:
tag_folder = self.dest + os.sep + 'tags' + os.sep + tag
tag_file = tag_folder + os.sep + 'tag.json'
- self.ensuredir(tag_folder)
- print("Downloading " + self.url + self.tag_show + 'id=' + tag + '...')
- self.download(self.url + self.tag_show + urlencode({ 'id': tag }, False, '', 'utf-8'), tag_file)
+ #print("Downloading " + self.url + self.tag_show + 'id=' + tag + '...')
+ #self.ensuredir(tag_folder)
+ tags_downloads.append([self.url + self.tag_show + urlencode({ 'id': tag }, False, '', 'utf-8'), tag_file])
+
+ self.download.get(tags_downloads)
#
# Packages
#
- self.download(self.url + self.package_list, package_list)
- packages = self.load_json(package_list)
+ self.download.get([[self.url + self.package_list, package_list]])
+
+ packages = self.load_json(package_list)
+ packages_downloads = []
for package in packages['result']:
package_folder = self.dest + os.sep + 'packages' + os.sep + package
package_file = package_folder + os.sep + 'package.json'
- self.ensuredir(package_folder + os.sep + 'data')
- print("Downloading " + self.url + self.package_show + 'id=' + package + '...')
- self.download(self.url + self.package_show + urlencode({ 'id': package }, False, '', 'utf-8'), package_file)
+ #print("Downloading " + self.url + self.package_show + 'id=' + package + '...')
+ #self.ensuredir(package_folder + os.sep + 'data')
+ packages_downloads.append([self.url + self.package_show + urlencode({ 'id': package }, False, '', 'utf-8'), package_file])
+
+ self.download.get(packages_downloads)
- contents = self.load_json(package_file)
+ #
+ # Package contents
+ #
+ for package in packages['result']:
+ package_downloads = []
+ package_folder = self.dest + os.sep + 'packages' + os.sep + package
+ package_file = package_folder + os.sep + 'package.json'
+ contents = self.load_json(package_file)
for resource in contents['result']['resources']:
#if resource['name'] != None:
@@ -128,7 +215,9 @@ class CkanDumper:
resource_file = package_folder + os.sep + 'data' + os.sep + name + format
- self.download(resource['url'], resource_file)
+ package_download.append([resource['url'], resource_file])
+
+ self.download.get(package_download)
# Run only once during development
#return
@@ -136,9 +225,10 @@ class CkanDumper:
if __name__ == "__main__":
# Parse CLI
parser = argparse.ArgumentParser(description='Dump CKAN metadata and datasets.')
- parser.add_argument('url', nargs='+', help='CKAN instance URL')
- parser.add_argument('dest', nargs='+', help='Destination folder')
- parser.add_argument("--limit-rate", help="Limit the download speed to amount bytes per second, per download")
+ parser.add_argument('url', nargs='+', help='CKAN instance URL')
+ parser.add_argument('dest', nargs='+', help='Destination folder')
+ parser.add_argument("--limit-rate", help="Limit the download speed to amount bytes per second, per download")
+ parser.add_argument("--limit-concurrent", help="Limit the total concurrent downloads")
args = parser.parse_args()
# Dispatch