# Source code for opalalgorithms.utils.algorithmrunner

"""Given an algorithm object, run the algorithm."""
from __future__ import division, print_function

import signal
import sys
import multiprocessing as mp
import os
import textwrap
import json

import requests
import six
import codejail
from codejail.safe_exec import not_safe_exec
from codejail.limits import set_limit


__all__ = ["AlgorithmRunner"]


class GracefulExit(Exception):
    """Exception used to unwind the runner after an interrupt signal.

    It is raised by `sigint_handler` and carries no extra state.
    """

def sigint_handler(signum, thread):
    """Turn a delivered interrupt signal into a `GracefulExit` exception.

    Args:
        signum (int): Number of the signal that was delivered (unused).
        thread: Second positional argument supplied by the `signal` module
            when it invokes a handler (unused).

    Raises:
        GracefulExit: Always.

    """
    raise GracefulExit


def check_environ():
    """Verify that the sandbox environment variables are defined.

    Note:
        - The variables looked up are `OPALALGO_SANDBOX_VENV` and
            `OPALALGO_SANDBOX_USER`, in that order.

    Raises:
        RuntimeError: Naming the first of those variables that is absent
            from `os.environ`.

    """
    required = ('OPALALGO_SANDBOX_VENV', 'OPALALGO_SANDBOX_USER')
    # Only the first missing name is reported.
    missing = next(
        (name for name in required if name not in os.environ), None)
    if missing is not None:
        raise RuntimeError(
            'Environment variable {} not set'.format(missing))


def get_jail(python_version=sys.version_info[0]):
    """Return codejail object.

    Args:
        python_version (int): Major Python version; values below 3 select
            the jail registered as `python`, anything else selects the one
            registered as `python3`. Defaults to the major version of the
            running interpreter.

    Returns:
        The object returned by `codejail.get_codejail` for the selected
        name.

    Raises:
        RuntimeError: If `OPALALGO_SANDBOX_VENV` or `OPALALGO_SANDBOX_USER`
            is not set.

    Note:
        - Please set environmental variables `OPALALGO_SANDBOX_VENV`
            and `OPALALGO_SANDBOX_USER` before calling this function.
        - `OPALALGO_SANDBOX_VENV` must be set to the path of the sandbox
            virtual environment.
        - `OPALALGO_SANDBOX_USER` must be set to the user running the
            sandboxed algorithms.

    """
    # Fail with a descriptive error instead of letting a missing variable
    # reach os.path.join() as None.
    check_environ()
    sandbox_env = os.environ['OPALALGO_SANDBOX_VENV']
    sandbox_user = os.environ['OPALALGO_SANDBOX_USER']
    sandbox_python = os.path.join(sandbox_env, 'bin', 'python')

    # NOTE(review): presumably this lifts the wall-clock limit and caps CPU
    # time at 15 seconds per execution -- confirm against codejail.limits.
    set_limit("REALTIME", None)
    set_limit("CPU", 15)

    # Both jail names are bound to the same interpreter of the sandbox venv.
    for jail_name in ('python', 'python3'):
        codejail.configure(jail_name, sandbox_python, user=sandbox_user)

    if python_version < 3:
        return codejail.get_codejail('python')
    return codejail.get_codejail('python3')
def process_user_csv(params, user_csv_file, algorithm, dev_mode, sandboxing,
                     jail):
    """Process a single user csv file.

    The algorithm source is concatenated with a small driver that loads the
    user's records through bandicoot, instantiates the algorithm class and
    stores the output of its `map` method in a global named `result`.

    Args:
        params (dict): Parameters for the request.
        user_csv_file (string): Path to user csv file.
        algorithm (dict): Dictionary with keys `code` and `className`
            specifying algorithm code and className.
        dev_mode (bool): Should the algorithm run in development mode or
            production mode.
        sandboxing (bool): Should sandboxing be used or not.
        jail (codejail.Jail): Jail object.

    Returns:
        Result of the execution.

    Raises:
        SafeExecException: If the execution wasn't successful.

    """
    username = os.path.splitext(os.path.basename(user_csv_file))[0]
    globals_dict = {'params': params}
    # The user name is derived from a file name, so it is embedded with
    # repr(): a quote or backslash in the name can then neither break the
    # generated program nor change its meaning.
    user_specific_code = textwrap.dedent(
        """
        def run_code():
            import bandicoot

            algorithmobj = {class_name}()
            bandicoot_user = bandicoot.read_csv(
                {username!r}, '', describe={dev_mode}, warnings={dev_mode})
            return algorithmobj.map(params, bandicoot_user)
        result = run_code()
        """.format(
            class_name=algorithm['className'],
            username=username,
            dev_mode=str(dev_mode)))
    code = "{}\n{}".format(algorithm['code'], user_specific_code)

    # Both executors take the same arguments and write the names defined by
    # the executed code back into `globals_dict`.
    execute = jail.safe_exec if sandboxing else not_safe_exec
    execute(code, globals_dict, files=[user_csv_file])
    return globals_dict['result']
def mapper(writing_queue, params, file_queue, algorithm,
           dev_mode=False, sandboxing=True, python_version=2):
    """Drain the file queue, run the algorithm per file and queue results.

    Args:
        writing_queue (mp.manager.Queue): Queue receiving
            `(result, scaler)` tuples for every valid result.
        params (dict): Parameters to be used by each map of the algorithm.
        file_queue (mp.manager.Queue): Queue of `(csv path, scaler)` tuples
            still waiting to be processed.
        algorithm (dict): Dictionary with keys `code` and `className`
            specifying algorithm code and className.
        dev_mode (bool): Should the algorithm run in development mode or
            production mode.
        sandboxing (bool): Should sandboxing be used or not.
        python_version (int): Python version being used for sandboxing.

    """
    jail = get_jail(python_version)
    while True:
        if file_queue.empty():
            break
        try:
            # The queue may have been emptied since the check above, so the
            # read is bounded by a timeout and any failure ends this worker.
            filepath, scaler = file_queue.get(timeout=1)
        except Exception as exc:
            print(exc)
            break
        result = process_user_csv(
            params, filepath, algorithm, dev_mode, sandboxing, jail)
        if not result:
            # Empty or missing results are dropped silently.
            continue
        if is_valid_result(result):
            writing_queue.put((result, scaler))
        elif dev_mode:
            print("Error in result {}".format(result))
def scale_result(result, scaler):
    """Return scaled result.

    Args:
        result (dict): Result.
        scaler (number): Factor by which results need to be scaled.

    Returns:
        dict: New dictionary with the same keys as `result` and every value
        multiplied by `scaler`; `result` itself is left untouched.

    """
    return {key: scaler * val for key, val in result.items()}
def collector(writing_queue, params, dev_mode=False):
    """Consume queued results until the `'kill'` marker arrives.

    Args:
        writing_queue (mp.manager.Queue): Queue from which collect results.
            Every item is either a `(result, scaler)` tuple or the string
            `'kill'`, which stops the collection.
        params (dict): Parameters handed to the `ResultProcessor`.
        dev_mode (bool): Whether to run algorithm in development mode.

    Returns:
        Whatever `ResultProcessor.get_result` returns once the queue has
        been drained: the list of processed results when `dev_mode` is
        true, otherwise `True`.

    """
    result_processor = ResultProcessor(params, dev_mode)
    # iter() with a sentinel blocks on get() for each item and stops as soon
    # as an item compares equal to 'kill'.
    for result, scaler in iter(writing_queue.get, 'kill'):
        result_processor(result, scaler=scaler)
    return result_processor.get_result()
def is_valid_result(result):
    """Tell whether an algorithm output has the accepted shape.

    Args:
        result: Output of the algorithm.

    Note:
        A result is accepted when it is a dict whose keys are all strings
        and whose values are all integers or floats. These results are sent
        to reducer which will sum, count, mean, median, mode of the values
        belonging to same key.

    Example:
        - {"alpha1": 1, "ant199": 1, ..}

    Returns:
        bool: Specifying if the result is valid or not.

    Todo:
        * Define what is valid with privacy and other concerns

    """
    if not isinstance(result, dict):
        return False

    # Every value has to be an integer or a float.
    number_types = tuple(six.integer_types) + (float,)
    values_ok = all(
        isinstance(val, number_types) for val in six.itervalues(result))
    if not values_ok:
        return False

    # Every key has to be a string.
    return all(
        isinstance(key, six.string_types) for key in six.iterkeys(result))
class ResultProcessor(object):
    """Scale results and either keep them or forward them.

    Args:
        params (dict): Dictionary of parameters. `aggregationServiceUrl` is
            read from it when a result is forwarded.
        dev_mode (bool): Specify if dev_mode is on.

    """

    def __init__(self, params, dev_mode):
        """Store the configuration and start with no collected results."""
        self.params = params
        self.dev_mode = dev_mode
        self.result_list = []

    def __call__(self, result, scaler=1):
        """Scale one result and dispatch it.

        With dev_mode on the scaled result is appended to the internal
        list; otherwise it is posted to `aggregationServiceUrl`.

        Args:
            result (dict): Result of the processed algorithm.
            scaler (int): Scale results by what value.

        """
        scaled = scale_result(result, scaler)
        if not self.dev_mode:
            self._send_request(scaled)
            return
        self.result_list.append(scaled)

    def _send_request(self, result):
        """Post one result to `aggregationServiceUrl`.

        Args:
            result (dict): Result to be sent as an update.

        Raises:
            RuntimeError: If the response status code is not 200.

        """
        url = self.params['aggregationServiceUrl']
        response = requests.post(url, json={'update': result})
        if response.status_code == 200:
            return
        raise RuntimeError(
            'Aggregation service returned {}'.format(
                response.status_code))

    def get_result(self):
        """Return the outcome of the processing.

        Returns:
            list: The collected results if dev_mode is set to true,
            otherwise `True`.

        """
        return self.result_list if self.dev_mode else True
class AlgorithmRunner(object):
    """Algorithm runner.

    Args:
        algorithm (dict): Dictionary containing `code` and `className`.
        dev_mode (bool): Development mode switch
        multiprocess (bool): Use multiprocessing or single process for
            complete execution.
        sandboxing (bool): Use sandboxing for execution or execute in unsafe
            environment.

    """

    def __init__(self, algorithm, dev_mode=False, multiprocess=True,
                 sandboxing=True):
        """Initialize class."""
        self.algorithm = algorithm
        self.dev_mode = dev_mode
        self.multiprocess = multiprocess
        self.sandboxing = sandboxing

    def __call__(self, params, data_dir, num_threads, weights_file=None):
        """Run algorithm.

        Selects the csv files from the data directory and looks up the
        weight of each one. In multiprocess mode `num_threads` mapper
        processes pull files from a shared queue and a collector process
        handles their results; otherwise the files are processed one after
        the other in the calling process.

        Args:
            params (dict): Dictionary containing all the parameters for the
                algorithm
            data_dir (str): Data directory with csv files.
            num_threads (int): Number of mapper processes (only used in
                multiprocess mode).
            weights_file (str): Path to the json file containing weights.

        Returns:
            list: The scaled per-user results when `dev_mode` is on,
            otherwise `True` once every result has been sent.

        Raises:
            RuntimeError: If a required environment variable is missing or
                an interrupt signal is received in multiprocess mode.

        """
        check_environ()
        data_dir = os.path.abspath(data_dir)
        csv_files = [
            os.path.join(data_dir, fname)
            for fname in os.listdir(data_dir) if fname.endswith('.csv')]
        csv2weights = self._get_weights(csv_files, weights_file)
        if self.multiprocess:
            return self._multiprocess(
                params, num_threads, csv_files, csv2weights)
        return self._singleprocess(params, csv_files, csv2weights)

    def _get_weights(self, csv_files, weights_file):
        """Return weights for each user if available, else return 1.

        Args:
            csv_files (list): Paths of the user csv files.
            weights_file (str): Path to a json file mapping user names (csv
                file name without extension) to weights, or a false value
                when no weights are supplied.

        Returns:
            dict: Mapping of every csv path to its weight.

        """
        weights = None
        if weights_file:
            with open(weights_file) as weights_fp:
                weights = json.load(weights_fp)
        csv2weights = {}
        for file_path in csv_files:
            user = os.path.splitext(os.path.basename(file_path))[0]
            if weights and user in weights:
                csv2weights[file_path] = weights[user]
            else:
                csv2weights[file_path] = 1  # default weight
        return csv2weights

    def _multiprocess(self, params, num_threads, csv_files, csv2weights):
        """Run the mappers and the collector in a process pool.

        Args:
            params (dict): Parameters for the algorithm.
            num_threads (int): Number of mapper processes.
            csv_files (list): Paths of the user csv files.
            csv2weights (dict): Weight of every csv path.

        Returns:
            The value returned by `collector`.

        Raises:
            RuntimeError: If an interrupt signal is received.

        """
        # set up parallel processing
        manager = mp.Manager()
        writing_queue = manager.Queue()
        file_queue = manager.Queue()
        for fpath in csv_files:
            file_queue.put((fpath, csv2weights[fpath]))

        # The pool is created while SIGINT is ignored; the raising handler
        # is installed in this process only afterwards. One additional
        # process is reserved for the collector.
        previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
        pool = mp.Pool(processes=num_threads + 1)
        signal.signal(signal.SIGINT, sigint_handler)
        try:
            collector_job = pool.apply_async(
                collector, (writing_queue, params, self.dev_mode))
            jobs = [
                pool.apply_async(mapper, (
                    writing_queue, params, file_queue, self.algorithm,
                    self.dev_mode, self.sandboxing))
                for _ in range(num_threads)]

            # Clean up parallel processing (close pool, wait for processes
            # to finish, kill writing_queue, wait for queue to be killed)
            pool.close()
            for job in jobs:
                job.get()
            writing_queue.put('kill')  # stop collection
            result = collector_job.get()
            pool.join()
            return result
        except GracefulExit:
            pool.terminate()
            print("Exiting")
            pool.join()
            raise RuntimeError("Received interrupt signal, exiting. Bye.")
        except Exception:
            # A worker failed before 'kill' was queued, so the collector may
            # still be blocked on the queue; stop every process before
            # propagating the error.
            pool.terminate()
            pool.join()
            raise
        finally:
            # Give the caller its own SIGINT handling back (None means the
            # previous handler was not installed from Python).
            if previous_handler is not None:
                signal.signal(signal.SIGINT, previous_handler)
            manager.shutdown()

    def _singleprocess(self, params, csv_files, csv2weights):
        """Process every csv file sequentially in the current process.

        Results go through the same checks as in `mapper`: empty or invalid
        results are not handed to the result processor, and invalid ones
        are reported when `dev_mode` is on.

        Args:
            params (dict): Parameters for the algorithm.
            csv_files (list): Paths of the user csv files.
            csv2weights (dict): Weight of every csv path.

        Returns:
            The value returned by `ResultProcessor.get_result`.

        """
        result_processor = ResultProcessor(params, self.dev_mode)
        jail = get_jail(python_version=2)
        for fpath in csv_files:
            result = process_user_csv(
                params, fpath, self.algorithm, self.dev_mode,
                self.sandboxing, jail)
            if result and is_valid_result(result):
                result_processor(result, scaler=csv2weights[fpath])
            elif result and self.dev_mode:
                print("Error in result {}".format(result))
        return result_processor.get_result()