From d312f535406ad271ae0e78889710c9048f6ec5e8 Mon Sep 17 00:00:00 2001
From: Silvio Rhatto
Date: Wed, 5 Jun 2019 20:16:11 -0300
Subject: Initial import

---
 grab-queue | 292 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 292 insertions(+)
 create mode 100755 grab-queue

diff --git a/grab-queue b/grab-queue
new file mode 100755
index 0000000..d1bf513
--- /dev/null
+++ b/grab-queue
@@ -0,0 +1,292 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# A grab-site scheduler, inspired by ckandumper.
+#
+# Copyright (C) 2019 Silvio Rhatto
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License,
+# or any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+# Dependencies
+import time
+import datetime
+import random
+import asyncio
+import argparse
+import sys, os, subprocess, json
+import yaml
+from urllib.parse import urlencode
+from hashlib import sha256
+from tqdm import tqdm
+
+class DownloadMultiple:
+    """Downloads multiple files simultaneously with error logging and fancy output"""
+
+    def __init__(self, limit_concurrent = 20, progress = True, debug = False, downloader = 'grab-site --no-offsite-links'):
+        # Check for the downloader: an explicit path must exist on disk,
+        # while a bare command name is probed by invoking it with --help
+        downloader_bin = downloader.split(' ')[0]
+        if '/' in downloader_bin and not os.path.exists(downloader_bin):
+            raise FileNotFoundError('executable not found in path ' + downloader_bin + '; please install it first.')
+        else:
+            subprocess.check_call(downloader_bin + ' --help', stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True)
+
+        self.limit_concurrent = asyncio.Semaphore(int(limit_concurrent))
+        self.progress = progress
+        self.debug = debug
+        self.downloader = downloader
+        self.globalstats = { 'exitstatus': {} }
+
+    def ensuredir(self, dest):
+        """Ensures that the destination folder exists"""
+        if not os.path.exists(dest) and not os.path.isdir(dest):
+            os.makedirs(dest, 0o755)
+        elif os.path.exists(dest) and not os.path.isdir(dest):
+            raise ValueError('File exists and is not a folder: ' + dest)
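+
+    # Concurrency note: download_file below acquires self.limit_concurrent
+    # (an asyncio.Semaphore) before spawning its subprocess, so at most that
+    # many downloader processes run at once. A minimal sketch of the pattern,
+    # with hypothetical values, as it would run inside a coroutine:
+    #
+    #   sem = asyncio.Semaphore(20)
+    #   async with sem:
+    #       proc = await asyncio.create_subprocess_shell('grab-site https://example.org/')
+    #       await proc.communicate()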
+
+    async def download_file(self, url, local_filename, semaphore, progress_bar):
+        """Downloads a file using a third-party application.
+
+        Straight from https://docs.python.org/3/library/asyncio-subprocess.html
+        """
+
+        async with semaphore:
+            if self.debug:
+                print('Downloading ' + url + '...')
+
+            # Heuristics to check if dest is a folder
+            if local_filename[-1] == '/':
+                folder = local_filename
+            else:
+                folder = os.path.dirname(local_filename)
+
+            self.ensuredir(folder)
+
+            # Set opts: when wget writes to a regular file, pass the
+            # destination explicitly (other opts: -q --show-progress)
+            downloader = self.downloader
+            downloader_bin = downloader.split(' ')[0]
+            if 'wget' in downloader_bin and local_filename[-1] != '/':
+                downloader += ' -O ' + local_filename
+
+            # Run command
+            cmd = 'cd ' + folder + ' && ' + downloader + ' ' + url
+            proc = await asyncio.create_subprocess_shell(cmd,
+                    stdout=asyncio.subprocess.PIPE,
+                    stderr=asyncio.subprocess.PIPE)
+
+            stdout, stderr = await proc.communicate()
+
+            if stdout:
+                output = open(local_filename + '.stdout', 'w')
+                output.write(stdout.decode())
+                output.close()
+
+                if self.debug:
+                    print(f'[stdout] {url} {stdout.decode()}')
+
+            if stderr:
+                output = open(local_filename + '.stderr', 'w')
+                output.write(stderr.decode())
+                output.close()
+
+                if self.debug:
+                    print(f'[stderr] {url} {stderr.decode()}')
+
+            output = open(local_filename + '.returncode', 'w')
+            output.write(str(proc.returncode) + '\n')
+            output.close()
+
+            output = open(local_filename + '.date', 'w')
+            output.write(str(datetime.datetime.now()) + '\n')
+            output.close()
+
+            if os.path.isfile(local_filename):
+                # The file might be too big, so we're not computing its hash
+                # inside the script:
+                #content = open(local_filename, 'rb')
+                #output = open(local_filename + '.sha256', 'w')
+                #output.write(sha256(content.read()).hexdigest() + ' ' + os.path.basename(local_filename) + '\n')
+                #output.close()
+                hasher = 'cd ' + os.path.dirname(local_filename) + ' && /usr/bin/sha256sum '
+                hasher += os.path.basename(local_filename) + ' > ' + os.path.basename(local_filename) + '.sha256'
+                hash = await asyncio.create_subprocess_shell(hasher,
+                        stdout=asyncio.subprocess.PIPE,
+                        stderr=asyncio.subprocess.PIPE)
+                await hash.communicate()
+
+            if not str(proc.returncode) in self.globalstats['exitstatus']:
+                self.globalstats['exitstatus'][str(proc.returncode)] = []
+
+            self.globalstats['exitstatus'][str(proc.returncode)].append(url)
+
+            if not str(proc.returncode) in self.stats['exitstatus']:
+                self.stats['exitstatus'][str(proc.returncode)] = []
+
+            self.stats['exitstatus'][str(proc.returncode)].append(url)
+
+            if self.debug:
+                print(f'[{cmd!r} exited with {proc.returncode}]')
+
+            if hasattr(progress_bar, 'update'):
+                progress_bar.update(1)
+
+            return proc.returncode
+
+    async def gather(self, filepairs):
+        """Gather all files to be downloaded
+
+        See https://stackoverflow.com/questions/50308812/is-it-possible-to-limit-the-number-of-coroutines-running-corcurrently-in-asyncio#50328882
+        """
+
+        jobs = []
+        progress_bar = tqdm(total=len(filepairs)) if self.progress and len(filepairs) > 1 else False
+
+        for url, filename in filepairs:
+            jobs.append(asyncio.ensure_future(self.download_file(url, filename, self.limit_concurrent, progress_bar)))
+
+        await asyncio.gather(*jobs)
+
+        if hasattr(progress_bar, 'close'):
+            progress_bar.close()
+
+    def get(self, filepairs):
+        """Runs a download batch, returning per-batch statistics"""
+        self.stats = { 'exitstatus': {} }
+        loop = asyncio.get_event_loop()
+
+        loop.set_debug(self.debug)
+        loop.run_until_complete(self.gather(filepairs))
+
+        if self.debug:
+            print(self.stats)
+
+        return self.stats
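+
+# A minimal standalone usage sketch for DownloadMultiple (hypothetical URLs
+# and destination; normally the CLI below drives this class instead):
+#
+#   dm = DownloadMultiple(limit_concurrent=2, progress=False, debug=True)
+#   stats = dm.get([['https://example.org/', 'dest/data/'],
+#                   ['https://example.net/', 'dest/data/']])
+#   # stats maps exit statuses to the URLs that produced them, e.g.
+#   # {'exitstatus': {'0': ['https://example.org/', 'https://example.net/']}}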
+
+class GrabQueue:
+    """Downloads lots of files"""
+
+    def __init__(self, args):
+        self.list = args.list[0]
+        self.dest = ''.join(args.dest)
+
+        if args.debug is not None:
+            self.debug = args.debug
+
+        if args.progress is not None:
+            self.progress = args.progress
+
+        if args.randomize is not None:
+            self.randomize = args.randomize
+
+        if args.limit_concurrent is not None:
+            self.limit_concurrent = args.limit_concurrent
+        else:
+            self.limit_concurrent = '20'
+
+        if args.downloader is None:
+            #args.downloader = '/usr/bin/wget'
+            args.downloader = 'grab-site'
+
+        self.download = DownloadMultiple(self.limit_concurrent, self.progress, self.debug, args.downloader)
+
+    def load_yaml(self, file):
+        """Loads a file with contents serialized as YAML"""
+        descriptor = open(file)
+        data = yaml.load(descriptor, Loader=yaml.SafeLoader)
+
+        descriptor.close()
+        return data
+
+    def dump_stats(self, stats):
+        """Dump download batch statistics"""
+        if stats is not None:
+            print('Statistics (exit status / total downloads): ', end='')
+
+            for status in stats['exitstatus']:
+                print(status + ': ' + str(len(stats['exitstatus'][status])), end='; ')
+
+            print('')
+
+    def write_stats(self):
+        """Write global statistics to file"""
+        stats = open(self.dest + os.sep + 'grab-queue.stats.json', 'w')
+        stats.write(json.dumps(self.download.globalstats, indent=2) + '\n')
+        stats.close()
+
+    def process_stats(self, stats):
+        """Process stats at each run"""
+        self.dump_stats(stats)
+        self.write_stats()
+
+    def dump(self):
+        """Downloads all content listed in a file"""
+
+        print('Starting at ' + str(datetime.datetime.now()))
+
+        sites = [ [ site['url'], self.dest + '/data/' ] for site in self.load_yaml(self.list) ]
+
+        if self.randomize:
+            random.seed()
+            random.shuffle(sites)
+
+        stats = self.download.get(sites)
+        self.process_stats(stats)
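+
+# Expected shape of the YAML list file, assumed from the site['url'] lookup
+# in GrabQueue.dump (a sequence of mappings, each with at least a url key;
+# any extra keys are ignored):
+#
+#   - url: https://example.org/
+#   - url: https://example.net/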
+
+if __name__ == "__main__":
+    # Parse CLI
+    examples = """ Example:
+
+  grab-queue --limit-concurrent=10 --downloader="wget --no-check-certificate" --randomize list.yaml dest/
+"""
+    parser = argparse.ArgumentParser(description='A grab-site scheduler: batch-downloads every URL listed in a YAML file.',
+                                     epilog=examples,
+                                     formatter_class=argparse.RawDescriptionHelpFormatter,)
+    parser.add_argument('list', nargs='+', help='YAML file list')
+    parser.add_argument('dest', nargs='+', help='Destination folder')
+    parser.add_argument('--limit-concurrent', help='Limit the total concurrent downloads')
+    parser.add_argument('--downloader', help='Custom downloader invocation, defaults to grab-site')
+    parser.add_argument('--debug', dest='debug', action='store_true', help='Enable debug')
+    parser.add_argument('--no-debug', dest='debug', action='store_false', help='Disable debug')
+    parser.add_argument('--progress', dest='progress', action='store_true', help='Enable progress')
+    parser.add_argument('--no-progress', dest='progress', action='store_false', help='Disable progress')
+    parser.add_argument('--randomize', dest='randomize', action='store_true', help='Randomize the list of downloads to avoid consuming resources of the same remote endpoint')
+    parser.add_argument('--no-randomize', dest='randomize', action='store_false', help='Do not randomize the list of downloads')
+    parser.set_defaults(debug=False)
+    parser.set_defaults(randomize=False)
+    parser.set_defaults(progress=True)
+    args = parser.parse_args()
+
+    # Dispatch
+    try:
+        start_time = time.time()
+
+        # Initialize our dumper
+        queue = GrabQueue(args)
+
+        # Record date and invocation
+        logs = ''.join(args.dest) + os.sep + 'logs' + os.sep
+        queue.download.ensuredir(logs)
+        invocation = open(logs + 'grab-queue.args', 'a')
+        invocation.write(str(datetime.datetime.now()) + '\t' + ' '.join(sys.argv) + '\n')
+        invocation.close()
+
+        # Download everything we can
+        queue.dump()
+
+        # Calculate runtime
+        end_time = time.time()
+        interval = round(end_time - start_time)
+        duration = str(datetime.timedelta(seconds=interval))
+
+        # Record duration
+        print(f'Done. Elapsed time: {duration}')
+        elapsed = open(logs + 'grab-queue.duration', 'a')
+        elapsed.write(str(start_time) + '\t' + str(end_time) + '\t' + duration + '\n')
+        elapsed.close()
+    except (FileNotFoundError, KeyboardInterrupt, subprocess.CalledProcessError) as e:
+        print(e)
+        sys.exit(1)
--
cgit v1.2.3