#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# A grab-site scheduler, inspired by ckandumper.
#
# Copyright (C) 2019 Silvio Rhatto
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Dependencies
import time
import datetime
import random
import asyncio
import argparse
import sys, os, subprocess, json
import yaml

from urllib.parse import urlencode
from urllib.parse import urlparse
from hashlib import sha256
from tqdm import tqdm

class DownloadMultiple:
    """Downloads multiple files simultaneously with error logging and fancy output"""

    def __init__(self, limit_concurrent = 5, progress = True, debug = False,
                 downloader = 'grab-site --no-offsite-links --wpull-args="--monitor-disk 5000m --monitor-memory 500m"'):
        # Check for grab-site
        downloader_bin = downloader.split(' ')[0]

        if '/' in downloader_bin and not os.path.exists(downloader_bin):
            raise FileNotFoundError('executable not found in path ' + downloader_bin + '; please install it first.')
        else:
            subprocess.check_call(downloader_bin + ' --help', stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True)

        self.limit_concurrent = asyncio.Semaphore(int(limit_concurrent))
        self.progress = progress
        self.debug = debug
        self.downloader = downloader
        self.globalstats = { 'returncode': {} }

    def ensuredir(self, dest):
        """Ensures that the destination folder exists"""
        if not os.path.exists(dest) and not os.path.isdir(dest):
            os.makedirs(dest, 0o755)
        elif os.path.exists(dest) and not os.path.isdir(dest):
            raise ValueError('File exists and is not a folder: ' + dest)

    async def download_file(self, url, local_filename, semaphore, progress_bar):
        """Downloads a file using a third-party application.
        Straight from https://docs.python.org/3/library/asyncio-subprocess.html
        """
        async with semaphore:
            if self.debug:
                print('Downloading ' + url + '...')

            domain = urlparse(url).netloc

            # Heuristics to check if dest is a folder
            if local_filename[-1] == '/':
                folder = local_filename
                logdir = folder + '../logs/'
            else:
                folder = os.path.dirname(local_filename)
                logdir = ''

            self.ensuredir(folder)

            with open(logdir + 'grab-queue.log', "a") as log:
                log.write('Downloading started: ' + url + "\n")

            # Set opts; use a per-call copy of the downloader invocation so
            # appending wget's -O option does not leak across downloads
            # Other opts: -q --show-progress -O
            downloader = self.downloader
            downloader_bin = downloader.split(' ')[0]

            if 'wget' in downloader_bin and local_filename[-1] != '/':
                downloader += ' -O ' + local_filename

            # Run command
            cmd = 'cd ' + folder + ' && ' + downloader + ' ' + url
            proc = await asyncio.create_subprocess_shell(cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE)

            stdout, stderr = await proc.communicate()

            if stdout:
                output = open(logdir + domain + '.stdout', 'w')
                output.write(stdout.decode())
                output.close()

                if self.debug:
                    print(f'[stdout] {url} {stdout.decode()}')

            if stderr:
                output = open(logdir + domain + '.stderr', 'w')
                output.write(stderr.decode())
                output.close()

                if self.debug:
                    print(f'[stderr] {url} {stderr.decode()}')

            output = open(logdir + domain + '.returncode', 'w')
            output.write(str(proc.returncode) + '\n')
            output.close()

            output = open(logdir + domain + '.date', 'w')
            output.write(str(datetime.datetime.now()) + '\n')
            output.close()

            with open(logdir + 'grab-queue.log', "a") as log:
                log.write('Downloading finished: ' + url + "\n")

            if os.path.isfile(local_filename):
                # The file might be too big, so the hash is computed by an
                # external sha256sum instead of inside the script
                #content = open(local_filename, 'rb')
                #output = open(local_filename + '.sha256', 'w')
                #output.write(sha256(content.read()).hexdigest() + ' ' + os.path.basename(local_filename) + '\n')
                #output.close()
                hasher  = 'cd ' + os.path.dirname(local_filename) + ' && /usr/bin/sha256sum '
                hasher += os.path.basename(local_filename) + ' > ' + os.path.basename(local_filename) + '.sha256'
                hash = await asyncio.create_subprocess_shell(hasher,
                        stdout=asyncio.subprocess.PIPE,
                        stderr=asyncio.subprocess.PIPE)

                # Wait for the checksum subprocess to finish
                await hash.communicate()

            if str(proc.returncode) not in self.globalstats['returncode']:
                self.globalstats['returncode'][str(proc.returncode)] = []

            self.globalstats['returncode'][str(proc.returncode)].append(url)

            if str(proc.returncode) not in self.stats['returncode']:
                self.stats['returncode'][str(proc.returncode)] = []

            self.stats['returncode'][str(proc.returncode)].append(url)

            if self.debug:
                print(f'[{cmd!r} exited with {proc.returncode}]')

            if hasattr(progress_bar, 'update'):
                progress_bar.update(1)

            return proc.returncode

    async def gather(self, filepairs):
        """Gather all files to be downloaded

        See https://stackoverflow.com/questions/50308812/is-it-possible-to-limit-the-number-of-coroutines-running-corcurrently-in-asyncio#50328882
        """
        jobs = []
        progress_bar = tqdm(total=len(filepairs)) if self.progress and len(filepairs) > 1 else False

        for url, filename in filepairs:
            jobs.append(asyncio.ensure_future(self.download_file(url, filename, self.limit_concurrent, progress_bar)))

        await asyncio.gather(*jobs)

        if hasattr(progress_bar, 'close'):
            progress_bar.close()

    def get(self, filepairs):
        """Run a download batch and return its statistics"""
        self.stats = { 'returncode': {} }
        loop = asyncio.get_event_loop()

        loop.set_debug(self.debug)
        loop.run_until_complete(self.gather(filepairs))

        if self.debug:
            print(self.stats)

        return self.stats

class GrabQueue:
    """Downloads lots of files"""

    def __init__(self, args):
        self.list = args.list[0]
        self.dest = ''.join(args.dest)

        if args.debug is not None:
            self.debug = args.debug

        if args.progress is not None:
            self.progress = args.progress

        if args.randomize is not None:
            self.randomize = args.randomize

        if args.limit_concurrent is not None:
            self.limit_concurrent = args.limit_concurrent
        else:
            self.limit_concurrent = '5'

        if args.downloader is None:
            #args.downloader = '/usr/bin/wget'
            args.downloader = 'grab-site --no-offsite-links --wpull-args="--monitor-disk 5000m --monitor-memory 500m"'

        self.download = DownloadMultiple(self.limit_concurrent, self.progress, self.debug, args.downloader)

    def load_yaml(self, file):
        """Loads a file with contents serialized as YAML"""
        descriptor = open(file)
        data = yaml.load(descriptor, Loader=yaml.SafeLoader)

        descriptor.close()

        return data

    def dump_stats(self, stats):
        """Dump download batch statistics"""
        if stats is not None:
            print('Statistics (exit status / total downloads): ', end='')

            for status in stats['returncode']:
                print(status + ': ' + str(len(stats['returncode'][status])), end='; ')

            print('')

    def write_stats(self):
        """Write global statistics to a file"""
        stats = open(self.dest + os.sep + 'grab-queue.stats.json', 'w')

        stats.write(json.dumps(self.download.globalstats, indent=2) + '\n')
        stats.close()

    def process_stats(self, stats):
        """Process stats at each run"""
        self.dump_stats(stats)
        self.write_stats()

    def dump(self):
        """Downloads all content listed in a file"""
        print('Starting at ' + str(datetime.datetime.now()))

        sites = [ [ site['url'], self.dest + '/data/' ] for site in self.load_yaml(self.list) ]

        if self.randomize:
            random.seed()
            random.shuffle(sites)

        stats = self.download.get(sites)

        self.process_stats(stats)

if __name__ == "__main__":
    # Parse CLI
    examples = """
Example:

  grab-queue --limit-concurrent=10 --downloader="wget --no-check-certificate" --randomize list.yaml dest/
"""

    parser = argparse.ArgumentParser(description='Schedule and run batches of downloads using grab-site.',
                                     epilog=examples,
                                     formatter_class=argparse.RawDescriptionHelpFormatter,)

    parser.add_argument('list', nargs='+', help='YAML file list')
    parser.add_argument('dest', nargs='+', help='Destination folder')

    parser.add_argument('--limit-concurrent', help='Limit the total concurrent downloads')
    parser.add_argument('--downloader', help='Custom downloader invocation, defaults to grab-site')

    parser.add_argument('--debug', dest='debug', action='store_true', help='Enable debug')
    parser.add_argument('--no-debug', dest='debug', action='store_false', help='Disable debug')

    parser.add_argument('--progress', dest='progress', action='store_true', help='Enable progress')
    parser.add_argument('--no-progress', dest='progress', action='store_false', help='Disable progress')

    parser.add_argument('--randomize', dest='randomize', action='store_true',
                        help='Randomize the list of downloads to avoid consuming resources of the same remote endpoint')
    parser.add_argument('--no-randomize', dest='randomize', action='store_false',
                        help='Do not randomize the list of downloads')

    parser.set_defaults(debug=False)
    parser.set_defaults(randomize=False)
    parser.set_defaults(progress=True)

    args = parser.parse_args()

    # Dispatch
    try:
        start_time = time.time()

        # Initialize our dumper
        queue = GrabQueue(args)

        # Record date and invocation
        logs = ''.join(args.dest) + os.sep + 'logs' + os.sep

        queue.download.ensuredir(logs)

        invocation = open(logs + 'grab-queue.args', 'a')

        invocation.write(str(datetime.datetime.now()) + '\t' + ' '.join(sys.argv) + '\n')
        invocation.close()

        # Download everything we can
        queue.dump()

        # Calculate runtime
        end_time = time.time()
        interval = round(end_time - start_time)
        duration = str(datetime.timedelta(seconds=interval))
        # Record duration
        print(f'Done. Elapsed time: {duration}')

        elapsed = open(logs + 'grab-queue.duration', 'a')

        elapsed.write(str(start_time) + '\t' + str(end_time) + '\t' + duration + '\n')
        elapsed.close()

    except (FileNotFoundError, KeyboardInterrupt, subprocess.CalledProcessError) as e:
        print(e)
        exit(1)
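
# A minimal sketch of the expected input list: GrabQueue.dump() reads
# site['url'] from each entry, so the YAML file should be a sequence of
# mappings with at least a 'url' key. The URLs below are hypothetical
# placeholders, not part of the program:
#
#   - url: https://example.org/
#   - url: https://example.net/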