opalalgorithms.utils.algorithmrunner

Algorithm runner class to run algorithms during computation.

Given an algorithm object, run the algorithm.

opalalgorithms.utils.algorithmrunner.mapper(writing_queue, params, file_queue, algorithm, dev_mode=False, sandboxing=True, python_version=2)

Call the map function and insert result into the queue if valid.

Parameters:
  • writing_queue (mp.manager.Queue) – Queue for inserting results.
  • params (dict) – Parameters to be used by each map of the algorithm.
  • file_queue – Queue of paths of the users' csv files.
  • algorithm (dict) – Dictionary with keys code and className specifying algorithm code and className.
  • dev_mode (bool) – Should the algorithm run in development mode or production mode.
  • sandboxing (bool) – Should sandboxing be used or not.
  • python_version (int) – Python version being used for sandboxing.
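Here is a minimal sketch of the mapper loop described above. It assumes `file_queue` is a queue of csv paths, which its name suggests but the parameter list does not state. It uses the standard-library `queue.Queue`, which exposes the same `get`/`put` interface as a managed `mp.manager.Queue`. The sandboxed or unsandboxed execution is replaced by a hypothetical `run_map(params, csv_path)` callable, and the validity check is a simplified stand-in for `is_valid_result`. `dev_mode`, `sandboxing` and `python_version` are left out.

```python
import queue


def mapper_sketch(writing_queue, params, file_queue, run_map):
    """Drain file_queue, run the map step per file, and queue valid results.

    run_map is a hypothetical callable standing in for executing the
    algorithm on a single user csv file; it is not part of opalalgorithms.
    """
    while True:
        try:
            csv_path = file_queue.get_nowait()
        except queue.Empty:
            break  # no files left for this worker
        result = run_map(params, csv_path)
        # Simplified validity check: a dict with string keys only.
        if isinstance(result, dict) and all(isinstance(k, str) for k in result):
            writing_queue.put(result)
```

Invalid results (here `None`) are silently dropped rather than queued, which matches "insert result into the queue if valid".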
opalalgorithms.utils.algorithmrunner.collector(writing_queue, params, dev_mode=False)

Collect the results in writing queue and post to aggregator.

Parameters:
  • writing_queue (mp.manager.Queue) – Queue from which to collect results.
  • params (dict) – Parameters for the request.
  • dev_mode (bool) – Whether to run algorithm in development mode.
Returns:

True on successful exit if dev_mode is set to False.

Return type:

bool

Note

If dev_mode is set to True, the collector instead returns all the results as a list.
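The following sketches how the collector might drain the writing queue. It rests on two assumptions the documentation does not state. First, the runner signals completion by enqueuing a sentinel (`None` here) once all mappers finish. Second, posting to the aggregator is represented by a hypothetical `post(params, results)` callable that sends all results in one batch. In dev mode the sketch returns the collected list, matching the note above.

```python
import queue

SENTINEL = None  # assumption: enqueued by the runner after all mappers finish


def collector_sketch(writing_queue, params, dev_mode=False, post=None):
    """Collect results until SENTINEL, then return them or post them.

    post is a hypothetical stand-in for the request to the aggregator
    service; it is not part of opalalgorithms.
    """
    results = []
    while True:
        item = writing_queue.get()  # blocks until the next result arrives
        if item is SENTINEL:
            break
        results.append(item)
    if dev_mode:
        return results  # dev mode: hand back everything as a list
    post(params, results)
    return True
```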

opalalgorithms.utils.algorithmrunner.is_valid_result(result)

Check if result is valid.

Parameters: result – Output of the algorithm.

Note

Result is valid if it is a dict whose keys are all strings and whose values are all numbers. These results are sent to the reducer, which computes the sum, count, mean, median and mode of the values belonging to the same key.

Example:
  • {"alpha1": 1, "ant199": 1, ...}
Returns: Whether the result is valid.
Return type: bool

Todo

  • Define what is valid with respect to privacy and other concerns.
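Below is a sketch of the validity rule and of the per-key reduction the note describes. Two choices are assumptions rather than documented behavior. "Number" is taken to mean a real number (`numbers.Real`) with `bool` excluded. The reducer, a hypothetical `reduce_results`, uses the standard `statistics` module. `statistics.mode` raises `StatisticsError` on data without a unique mode before Python 3.8, and returns the first mode from 3.8 on.

```python
import numbers
import statistics
from collections import defaultdict


def is_valid_result(result):
    """Valid if result is a dict with str keys and real-number values."""
    if not isinstance(result, dict):
        return False
    for key, value in result.items():
        if not isinstance(key, str):
            return False
        # bool subclasses int; rejecting it is a choice made by this sketch.
        if isinstance(value, bool) or not isinstance(value, numbers.Real):
            return False
    return True


def reduce_results(results):
    """Group values by key and compute sum, count, mean, median and mode."""
    grouped = defaultdict(list)
    for result in results:
        for key, value in result.items():
            grouped[key].append(value)
    summary = {}
    for key, values in grouped.items():
        summary[key] = {
            "sum": sum(values),
            "count": len(values),
            "mean": statistics.mean(values),
            "median": statistics.median(values),
            "mode": statistics.mode(values),
        }
    return summary
```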
opalalgorithms.utils.algorithmrunner.process_user_csv(params, user_csv_file, algorithm, dev_mode, sandboxing, jail)

Process a single user csv file.

Parameters:
  • params (dict) – Parameters for the request.
  • user_csv_file (str) – Path to user csv file.
  • algorithm (dict) – Dictionary with keys code and className specifying algorithm code and className.
  • dev_mode (bool) – Should the algorithm run in development mode or production mode.
  • sandboxing (bool) – Should sandboxing be used or not.
  • jail (codejail.Jail) – Jail object.
Returns:

Result of the execution.

Raises:

SafeExecException – If the execution wasn’t successful.
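When `sandboxing` is False, processing one file amounts to executing the algorithm source and calling its map step. The sketch below assumes the algorithm class has a `map(params, rows)` method that receives the csv rows as a list of dicts. That interface is hypothetical and chosen for illustration; the real runner may load the user data differently. The sketch does no sandboxing, so it should only be used with trusted code.

```python
import csv


def process_user_csv_unsandboxed(params, user_csv_file, algorithm):
    """Run algorithm['className'].map on one user's csv, without a sandbox.

    exec() runs arbitrary code. The map(params, rows) interface, with rows
    as a list of dicts, is a hypothetical assumption of this sketch.
    """
    namespace = {}
    exec(algorithm["code"], namespace)  # defines the algorithm class
    algorithm_class = namespace[algorithm["className"]]
    with open(user_csv_file, newline="") as f:
        rows = list(csv.DictReader(f))
    return algorithm_class().map(params, rows)
```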

opalalgorithms.utils.algorithmrunner.get_jail(python_version=2)

Return a codejail object.

Note

  • Please set the environment variables OPALALGO_SANDBOX_VENV
    and OPALALGO_SANDBOX_USER before calling this function.
  • OPALALGO_SANDBOX_VENV must be set to the path of the sandbox
    virtual environment.
  • OPALALGO_SANDBOX_USER must be set to the user running the
    sandboxed algorithms.
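The note implies that `get_jail` reads both variables from the environment. A hypothetical helper could check this precondition up front and fail with a clear message, rather than later inside the jail setup. The helper name and the error type below are assumptions.

```python
import os

REQUIRED_VARS = ("OPALALGO_SANDBOX_VENV", "OPALALGO_SANDBOX_USER")


def read_sandbox_config(environ=os.environ):
    """Return (venv_path, sandbox_user), raising if either is unset or empty.

    Hypothetical helper; it only illustrates the documented precondition.
    """
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("Set environment variable(s): " + ", ".join(missing))
    return environ["OPALALGO_SANDBOX_VENV"], environ["OPALALGO_SANDBOX_USER"]
```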
class opalalgorithms.utils.algorithmrunner.AlgorithmRunner(algorithm, dev_mode=False, multiprocess=True, sandboxing=True)

Algorithm runner.

Parameters:
  • algorithm (dict) – Dictionary containing code and className.
  • dev_mode (bool) – Whether to run in development mode.
  • multiprocess (bool) – Whether to use multiple processes or a single process for the complete execution.
  • sandboxing (bool) – Whether to execute in a sandbox or in an unsafe, unsandboxed environment.
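The `algorithm` argument is a plain dict holding the algorithm source and the name of the class to instantiate. Here is a hypothetical helper that builds this dict and checks that `className` is actually defined in `code`. It parses the source rather than executing it.

```python
import ast


def make_algorithm(code, class_name):
    """Build the {code, className} dict after checking the class exists.

    Hypothetical helper, not part of opalalgorithms. Only top-level class
    definitions are considered, and the code is never executed.
    """
    tree = ast.parse(code)
    defined = {node.name for node in tree.body if isinstance(node, ast.ClassDef)}
    if class_name not in defined:
        raise ValueError("class %r not found in algorithm code" % class_name)
    return {"code": code, "className": class_name}
```

You could then pass the resulting dict as the first argument, for example `AlgorithmRunner(make_algorithm(src, "CountRecords"), dev_mode=True, sandboxing=False)`.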
__call__(params, data_dir, num_threads, weights_file=None)

Run algorithm.

Selects the csv files from the data directory and divides them into chunks of equal size across num_threads threads. Each thread calls the map function on each of its csv files and processes the result. The collector thread waits for the results before posting them to the aggregator service.

Parameters:
  • params (dict) – Dictionary containing all the parameters for the algorithm.
  • data_dir (str) – Data directory with csv files.
  • num_threads (int) – Number of threads to use.
  • weights_file (str) – Path to the json file containing weights.
Returns:

Amount of time required for computation in microseconds.

Return type:

int
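This sketch covers two bookkeeping steps in `__call__`: splitting the csv files of `data_dir` across `num_threads` workers, and reporting elapsed time in microseconds. The round-robin split (every `num_threads`-th file) produces chunks whose sizes differ by at most one. That is an assumption about what "equal size" means when the file count does not divide evenly. Both helper names are hypothetical.

```python
import glob
import os
import time


def chunk_csv_files(data_dir, num_threads):
    """Split the sorted csv paths in data_dir into num_threads near-equal chunks."""
    files = sorted(glob.glob(os.path.join(data_dir, "*.csv")))
    # Chunk i takes files i, i + num_threads, ..., so sizes differ by at most one.
    return [files[i::num_threads] for i in range(num_threads)]


def timed_run(work):
    """Run work() and return the elapsed wall-clock time in whole microseconds."""
    start = time.perf_counter()
    work()
    return int((time.perf_counter() - start) * 1_000_000)
```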