author    Silvio Rhatto <rhatto@riseup.net>  2019-06-05 20:16:11 -0300
committer Silvio Rhatto <rhatto@riseup.net>  2019-06-05 20:16:11 -0300
commit    d312f535406ad271ae0e78889710c9048f6ec5e8 (patch)
tree      0a27213e63389201b67a20a3219fce4c3b8e4f01 /grab-queue
Initial import
Diffstat (limited to 'grab-queue')
-rwxr-xr-x  grab-queue  292
1 file changed, 292 insertions(+), 0 deletions(-)
diff --git a/grab-queue b/grab-queue
new file mode 100755
index 0000000..d1bf513
--- /dev/null
+++ b/grab-queue
@@ -0,0 +1,292 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# A grab-site scheduler, inspired by ckandumper.
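+# Reads a YAML list of URLs and schedules concurrent downloads through a
+# configurable downloader command (grab-site by default), logging each
+# job's output, exit status and timestamp alongside the destination.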
+#
+# Copyright (C) 2019 Silvio Rhatto <rhatto@riseup.net>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License,
+# or any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+# Dependencies
+import time
+import datetime
+import random
+import asyncio
+import argparse
+import sys, os, subprocess, json
+import yaml
+from urllib.parse import urlencode
+from hashlib import sha256
+from tqdm import tqdm
+
+class DownloadMultiple:
+ """Downloads multiple files simultaneously with error logging and fancy output"""
+
+ def __init__(self, limit_concurrent = 20, progress = True, debug = False, downloader = 'grab-site --no-offsite-links'):
+        # Check that the downloader is available: when given as a path,
+        # verify it exists; otherwise just try to run its --help
+        downloader_bin = downloader.split(' ')[0]
+        if '/' in downloader_bin and not os.path.exists(downloader_bin):
+            raise FileNotFoundError('downloader executable not found at ' + downloader_bin + '; please install it first.')
+        else:
+            subprocess.check_call(downloader_bin + ' --help', stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True)
+
+ self.limit_concurrent = asyncio.Semaphore(int(limit_concurrent))
+ self.progress = progress
+ self.debug = debug
+ self.downloader = downloader
+ self.globalstats = { 'exitstatus': {} }
+
+ def ensuredir(self, dest):
+ """Ensures that the destination folder exists"""
+        if not os.path.exists(dest):
+            os.makedirs(dest, 0o755)
+        elif not os.path.isdir(dest):
+            raise ValueError('File exists and is not a folder: ' + dest)
+
+ async def download_file(self, url, local_filename, semaphore, progress_bar):
+ """Downloads a file using a third-party application.
+
+ Straight from https://docs.python.org/3/library/asyncio-subprocess.html
+ """
+
+ async with semaphore:
+ if self.debug:
+ print('Downloading ' + url + '...')
+
+ # Heuristics to check if dest is a folder
+ if local_filename[-1] == '/':
+ folder = local_filename
+ else:
+ folder = os.path.dirname(local_filename)
+
+            self.ensuredir(folder)
+
+            # Set downloader options; when using wget with an explicit
+            # destination filename, pass it via -O
+            downloader = self.downloader
+            downloader_bin = downloader.split(' ')[0]
+            if 'wget' in downloader_bin and local_filename[-1] != '/':
+                downloader += ' -O ' + local_filename
+
+            # Run command
+            cmd = 'cd ' + folder + ' && ' + downloader + ' ' + url
+ proc = await asyncio.create_subprocess_shell(cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE)
+
+ stdout, stderr = await proc.communicate()
+
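+            # Log the run next to the destination: stdout, stderr, the
+            # exit code and a timestamp each go to a sidecar file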
+ if stdout:
+ output = open(local_filename + '.stdout', 'w')
+ output.write(stdout.decode())
+ output.close()
+
+ if self.debug:
+ print(f'[stdout] {url} {stdout.decode()}')
+
+ if stderr:
+ output = open(local_filename + '.stderr', 'w')
+ output.write(stderr.decode())
+ output.close()
+
+ if self.debug:
+ print(f'[stderr] {url} {stderr.decode()}')
+
+ output = open(local_filename + '.returncode', 'w')
+ output.write(str(proc.returncode) + '\n')
+ output.close()
+
+ output = open(local_filename + '.date', 'w')
+ output.write(str(datetime.datetime.now()) + '\n')
+ output.close()
+
+ if os.path.isfile(local_filename):
+                # The file might be too big, so its hash is computed by an
+                # external process rather than inside this script
+ #content = open(local_filename, 'rb')
+ #output = open(local_filename + '.sha256', 'w')
+ #output.write(sha256(content.read()).hexdigest() + ' ' + os.path.basename(local_filename) + '\n')
+ #output.close()
+                hasher = 'cd ' + os.path.dirname(local_filename) + ' && /usr/bin/sha256sum '
+                hasher += os.path.basename(local_filename) + ' > ' + os.path.basename(local_filename) + '.sha256'
+                hash_proc = await asyncio.create_subprocess_shell(hasher,
+                    stdout=asyncio.subprocess.PIPE,
+                    stderr=asyncio.subprocess.PIPE)
+
+                # Wait for the hasher to finish so the .sha256 file is
+                # complete before this download is considered done
+                await hash_proc.communicate()
+
+            if str(proc.returncode) not in self.globalstats['exitstatus']:
+                self.globalstats['exitstatus'][str(proc.returncode)] = []
+
+            self.globalstats['exitstatus'][str(proc.returncode)].append(url)
+
+            if str(proc.returncode) not in self.stats['exitstatus']:
+                self.stats['exitstatus'][str(proc.returncode)] = []
+
+            self.stats['exitstatus'][str(proc.returncode)].append(url)
+
+ if self.debug:
+ print(f'[{cmd!r} exited with {proc.returncode}]')
+
+ if hasattr(progress_bar, 'update'):
+ progress_bar.update(1)
+
+ return proc.returncode
+
+ async def gather(self, filepairs):
+ """Gather all files to be downloaded
+
+ See https://stackoverflow.com/questions/50308812/is-it-possible-to-limit-the-number-of-coroutines-running-corcurrently-in-asyncio#50328882
+ """
+
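+        # All jobs are scheduled at once; the semaphore acquired inside
+        # download_file() caps how many subprocesses actually run
+        # concurrently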
+ jobs = []
+ progress_bar = tqdm(total=len(filepairs)) if self.progress and len(filepairs) > 1 else False
+
+ for url, filename in filepairs:
+ jobs.append(asyncio.ensure_future(self.download_file(url, filename, self.limit_concurrent, progress_bar)))
+
+ await asyncio.gather(*jobs)
+
+ if hasattr(progress_bar, 'close'):
+ progress_bar.close()
+
+ def get(self, filepairs):
+ self.stats = { 'exitstatus': {} }
+ loop = asyncio.get_event_loop()
+
+ loop.set_debug(self.debug)
+ loop.run_until_complete(self.gather(filepairs))
+
+ if self.debug:
+ print(self.stats)
+
+ return self.stats
+
+class GrabQueue:
+ """Downloads lots of files"""
+
+ def __init__(self, args):
+ self.list = args.list[0]
+ self.dest = ''.join(args.dest)
+
+        if args.debug is not None:
+            self.debug = args.debug
+
+        if args.progress is not None:
+            self.progress = args.progress
+
+        if args.randomize is not None:
+            self.randomize = args.randomize
+
+        if args.limit_concurrent is not None:
+            self.limit_concurrent = args.limit_concurrent
+        else:
+            self.limit_concurrent = '20'
+
+        if args.downloader is None:
+            #args.downloader = '/usr/bin/wget'
+            args.downloader = 'grab-site'
+
+ self.download = DownloadMultiple(self.limit_concurrent, self.progress, self.debug, args.downloader)
+
+ def load_yaml(self, file):
+ """Loads a file with contents serialized as YAML"""
+ descriptor = open(file)
+ data = yaml.load(descriptor, Loader=yaml.SafeLoader)
+
+ descriptor.close()
+ return data
+
+ def dump_stats(self, stats):
+ """Dump download batch statistics"""
+        if stats is not None:
+ print('Statistics (exit status / total downloads): ', end='')
+
+ for status in stats['exitstatus']:
+ print(status + ': ' + str(len(stats['exitstatus'][status])), end='; ')
+
+ print('')
+
+ def write_stats(self):
+ """Write global statistics to file"""
+ stats = open(self.dest + os.sep + 'grab-queue.stats.json', 'w')
+        stats.write(json.dumps(self.download.globalstats, indent=2) + '\n')
+        stats.close()
+
+ def process_stats(self, stats):
+ """Process stats at each run"""
+ self.dump_stats(stats)
+ self.write_stats()
+
+ def dump(self):
+ """Downloads all content listed in a file"""
+
+ print('Starting at ' + str(datetime.datetime.now()))
+
+ sites = [ [ site['url'], self.dest + '/data/' ] for site in self.load_yaml(self.list) ]
+
+        if self.randomize:
+ random.seed()
+ random.shuffle(sites)
+
+ stats = self.download.get(sites)
+ self.process_stats(stats)
+
+if __name__ == "__main__":
+ # Parse CLI
+ examples = """ Example:
+
+ grab-queue --limit-concurrent=10 --downloader="wget --no-check-certificate" --randomize list.yaml dest/
+ """
+    parser = argparse.ArgumentParser(description='Schedule batch downloads (grab-site by default) from a YAML list of URLs.', epilog=examples, formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument('list', nargs='+', help='YAML file list')
+ parser.add_argument('dest', nargs='+', help='Destination folder')
+ parser.add_argument('--limit-concurrent', help='Limit the total concurrent downloads')
+ parser.add_argument('--downloader', help='Custom downloader invocation, defaults to grab-site')
+ parser.add_argument('--debug', dest='debug', action='store_true', help='Enable debug')
+ parser.add_argument('--no-debug', dest='debug', action='store_false', help='Disable debug')
+ parser.add_argument('--progress', dest='progress', action='store_true', help='Enable progress')
+ parser.add_argument('--no-progress', dest='progress', action='store_false', help='Disable progress')
+ parser.add_argument('--randomize', dest='randomize', action='store_true', help='Randomize the list of downloads to avoid consuming resources of the same remote endpoint')
+ parser.add_argument('--no-randomize', dest='randomize', action='store_false', help='Do not randomize the list of downloads')
+ parser.set_defaults(debug=False)
+ parser.set_defaults(randomize=False)
+ parser.set_defaults(progress=True)
+ args = parser.parse_args()
+
+ # Dispatch
+ try:
+ start_time = time.time()
+
+ # Initialize our dumper
+ queue = GrabQueue(args)
+
+ # Record date and invocation
+ logs = ''.join(args.dest) + os.sep + 'logs' + os.sep
+ queue.download.ensuredir(logs)
+ invocation = open(logs + 'grab-queue.args', 'a')
+ invocation.write(str(datetime.datetime.now()) + '\t' + ' '.join(sys.argv) + '\n')
+ invocation.close()
+
+ # Download everything we can
+ queue.dump()
+
+ # Calculate runtime
+ end_time = time.time()
+ interval = round(end_time - start_time)
+ duration = str(datetime.timedelta(seconds=interval))
+
+ # Record duration
+ print(f'Done. Elapsed time: {duration}')
+ elapsed = open(logs + 'grab-queue.duration', 'a')
+        elapsed.write(str(start_time) + '\t' + str(end_time) + '\t' + duration + '\n')
+ elapsed.close()
+ except (FileNotFoundError, KeyboardInterrupt, subprocess.CalledProcessError) as e:
+ print(e)
+        sys.exit(1)