Workers related functionality API
Worker
Each MPI worker process starts infinite worker_loop()
,
which accepts and processes incoming operations.
API
Cancel operation from Operations
class breaks the loop
and leaves all internal storages in their current state.
- async unidist.core.backends.mpi.core.worker.loop.worker_loop()
Infinite operations processing loop.
Master or any worker could be the source of an operation.
Notes
The loop exits on special cancelation operation.
unidist.core.backends.mpi.core.common.Operations
defines a set of supported operations.
- unidist.core.backends.mpi.core.worker.request_store.RequestStore.process_get_request(self, source_rank, data_id, is_blocking_op=False)
Satisfy GET operation request from another process.
Save request for later processing if data_id is not available currently.
- Parameters:
source_rank (int) – Rank number to send data to.
data_id (unidist.core.backends.mpi.core.common.MpiDataID) – data_id associated data to request
is_blocking_op (bool, default: False) – Whether the get request should be blocking or not. If
True
, the request should be processed immediatly even for a worker since it can get into controller mode.
Notes
Request is asynchronous, no wait for the data sending.
- unidist.core.backends.mpi.core.worker.request_store.RequestStore.process_wait_request(self, data_id)
Satisfy WAIT operation request from another process.
Save request for later processing if data_id is not available currently.
- Parameters:
data_id (unidist.core.backends.mpi.core.common.MpiDataID) – Chech if data_id is available in object store.
Notes
Only ROOT rank is supported for now, therefore no rank argument needed.
- unidist.core.backends.mpi.core.worker.task_store.TaskStore.process_task_request(self, request)
Parse request data and execute the task if possible.
Data dependencies should be resolved for task execution.
- Parameters:
request (dict) – Task related data (args, function, output).
- Returns:
Same request if the task couldn`t be executed, otherwise
None
.- Return type:
dict or None
- unidist.core.backends.mpi.core.worker.task_store.TaskStore.request_worker_data(self, dest_rank, data_id)
Send GET operation with data request to destination worker.
- Parameters:
dest_rank (int) – Rank number to request data from.
data_id (unidist.core.backends.mpi.core.common.MpiDataID) – data_id associated data to request.
Notes
Request is asynchronous, no wait for the data.
Request Storage
RequestStore
stores unidist.get
and unidist.wait
requests for the current worker,
which couldn’t be satisied right now due to data dependencies. TaskStore
stores task execution
requests that couldn’t be satisied right now due to data dependencies.
API
- class unidist.core.backends.mpi.core.worker.request_store.RequestStore
Class that stores data requests that couldn’t be satisfied now.
- GET
Get request from other worker to be executed.
- Type:
int, default 0
- WAIT
Wait request from other worker to be executed.
- Type:
int, default 1
- DATA
Data request to other worker.
- Type:
int, default 2
Notes
Supports GET and WAIT requests.
- check_pending_get_requests(data_ids)
Check if GET event on this data_ids is waiting to be processed.
Process the request if data ID available in local object store.
- Parameters:
data_id (iterable or unidist.core.backends.mpi.core.common.MpiDataID) – An ID or list of IDs to data.
- check_pending_wait_requests(data_ids)
Check if WAIT event on this data_ids is waiting to be processed.
Process the request if data ID available in local object store. Send signal without any data.
- Parameters:
data_id (iterable or unidist.core.backends.mpi.core.common.MpiDataID) – An ID or list of IDs to data.
- clear_get_requests()
Clear blocking and non-blocking get requests.
- clear_wait_requests()
Clear blocking wait requests requests.
- discard_data_request(data_id)
Discard data request by data_id because the data has become available.
- Parameters:
data_id (unidist.core.backends.mpi.core.common.MpiDataID) – An ID to data.
- classmethod get_instance()
Get instance of
RequestStore
.- Return type:
- is_data_already_requested(data_id)
Check if data by particular data_id was already requested from another MPI process.
- Parameters:
data_id (unidist.core.backends.mpi.core.common.MpiDataID) – An ID to data.
- Returns:
True
if communnication request was happened for this ID.- Return type:
bool
- process_get_request(source_rank, data_id, is_blocking_op=False)
Satisfy GET operation request from another process.
Save request for later processing if data_id is not available currently.
- Parameters:
source_rank (int) – Rank number to send data to.
data_id (unidist.core.backends.mpi.core.common.MpiDataID) – data_id associated data to request
is_blocking_op (bool, default: False) – Whether the get request should be blocking or not. If
True
, the request should be processed immediatly even for a worker since it can get into controller mode.
Notes
Request is asynchronous, no wait for the data sending.
- process_wait_request(data_id)
Satisfy WAIT operation request from another process.
Save request for later processing if data_id is not available currently.
- Parameters:
data_id (unidist.core.backends.mpi.core.common.MpiDataID) – Chech if data_id is available in object store.
Notes
Only ROOT rank is supported for now, therefore no rank argument needed.
- put(data_id, rank, request_type, is_blocking_op=False)
Save request type for this data ID for later processing.
- Parameters:
data_id (unidist.core.backends.mpi.core.common.MpiDataID) – An ID to data.
rank (int) – Source rank requester.
request_type (int) – Type of request.
is_blocking_op (bool) – Whether the get request should be blocking or not. If
True
, the request should be processed immediatly even for a worker since it can get into controller mode.
- class unidist.core.backends.mpi.core.worker.task_store.TaskStore
Class that stores tasks/actor-tasks that couldn’t be executed due to data dependencies.
- check_pending_actor_tasks()
Check a list of pending actor task execution requests and process all ready tasks.
Task is ready if all data dependencies are resolved.
- check_pending_tasks()
Check a list of pending task execution requests and process all ready tasks.
Task is ready if all data dependencies are resolved.
- clear_pending_actor_tasks()
Clear a list of pending actor task execution requests.
- clear_pending_tasks()
Clear a list of pending task execution requests.
- execute_received_task(output_data_ids, task, args, kwargs)
Execute a task/actor-task and handle results.
- Parameters:
output_data_ids (list of unidist.core.backends.mpi.core.common.MpiDataID) – A list of output data IDs to store the results in local object store.
task (callable) – Function to be executed.
args (iterable) – Positional arguments to be passed in the task.
kwargs (dict) – Keyword arguments to be passed in the task.
Notes
Exceptions are stored in output data IDs as value.
- process_task_request(request)
Parse request data and execute the task if possible.
Data dependencies should be resolved for task execution.
- Parameters:
request (dict) – Task related data (args, function, output).
- Returns:
Same request if the task couldn`t be executed, otherwise
None
.- Return type:
dict or None
- put(request)
Save task execution request for later processing.
Some data dependencies are not resolved yet.
- Parameters:
request (dict) – Task execution request with arguments.
- put_actor(request)
Save actor task execution request for later processing.
Some data dependencies are not resolved yet.
- Parameters:
request (dict) – Actor task execution request with arguments.
- request_worker_data(dest_rank, data_id)
Send GET operation with data request to destination worker.
- Parameters:
dest_rank (int) – Rank number to request data from.
data_id (unidist.core.backends.mpi.core.common.MpiDataID) – data_id associated data to request.
Notes
Request is asynchronous, no wait for the data.
- unwrap_local_data_id(arg)
Inspect argument and get the ID associated data from the local object store if available.
If the object store is missing this data ID, request the data from another worker.
- Parameters:
arg (object or unidist.core.backends.mpi.core.common.MpiDataID) – Data ID or object to inspect.
- Returns:
Same value or data ID associated data and special flag.
- Return type:
tuple
Notes
The function returns the success status of data materialization attempt as a flag. If the data ID could not be resolved, the function returns
True
.