MPI communication API

MPI communication routines for controller/worker data interchange.

API

Simple operations send and receive objects of standard Python data types. By default, the mpi4py library serializes them with the pickle library.
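
As a rough illustration of what pickle-based serialization means for simple operations, the sketch below round-trips a value built from standard Python types. It uses only the standard library and does not call mpi4py or unidist; the variable names are illustrative.

```python
import pickle

# A message made only of standard Python data types.
message = {"task_id": 7, "args": [1, 2.5, "x"], "done": False}

# Sender side: turn the object into bytes.
wire_bytes = pickle.dumps(message)

# Receiver side: rebuild an equal (but distinct) object from the bytes.
received = pickle.loads(wire_bytes)
```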

unidist.core.backends.mpi.core.communication.mpi_send_object(comm, data, dest_rank, tag=112)

Send a Python object to another MPI rank in a blocking way.

Parameters:
  • comm (object) – MPI communicator object.

  • data (object) – Data to send.

  • dest_rank (int) – Target MPI process to transfer data.

  • tag (common.MPITag, default: common.MPITag.OBJECT) – Message tag.

Notes

  • This blocking send is used when we have to wait for completion of the communication, which is necessary for the pipeline to continue, or when the receiver is waiting for a result. Otherwise, use non-blocking mpi_isend_object.

  • The special tag is used for this communication, namely, common.MPITag.OBJECT.

unidist.core.backends.mpi.core.communication.mpi_isend_object(comm, data, dest_rank)

Send a Python object to another MPI rank in a non-blocking way.

Parameters:
  • comm (object) – MPI communicator object.

  • data (object) – Data to send.

  • dest_rank (int) – Target MPI process to transfer data.

Returns:

A handler to the MPI_Isend communication result.

Return type:

object

Notes

The special tag is used for this communication, namely, common.MPITag.OBJECT.
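
The difference between the blocking and non-blocking object sends can be sketched without a real MPI runtime. FakeComm and FakeRequest below are hypothetical in-process stand-ins, not mpi4py or unidist classes, and OBJECT_TAG is an assumed value. Deferring delivery until wait() is a simplification chosen to make the point visible: the caller of a non-blocking send has to keep the returned handler and complete it.

```python
import pickle
from collections import deque

OBJECT_TAG = 1  # hypothetical value; the real one is common.MPITag.OBJECT


class FakeRequest:
    """Stand-in for a send handler: completes when wait() is called."""

    def __init__(self, deliver):
        self._deliver = deliver
        self.completed = False

    def wait(self):
        if not self.completed:
            self._deliver()
            self.completed = True


class FakeComm:
    """In-process stand-in for a communicator: one mailbox per (rank, tag)."""

    def __init__(self):
        self.mailboxes = {}

    def _put(self, payload, dest, tag):
        self.mailboxes.setdefault((dest, tag), deque()).append(payload)

    def send(self, obj, dest, tag=0):
        # Blocking: the message is delivered before the call returns.
        self._put(pickle.dumps(obj), dest, tag)

    def isend(self, obj, dest, tag=0):
        # Non-blocking: returns at once; delivery happens on wait().
        payload = pickle.dumps(obj)
        return FakeRequest(lambda: self._put(payload, dest, tag))

    def recv_at(self, rank, tag=0):
        # Read the next message addressed to `rank` with this tag.
        return pickle.loads(self.mailboxes[(rank, tag)].popleft())


comm = FakeComm()
comm.send({"a": 1}, dest=1, tag=OBJECT_TAG)
request = comm.isend({"b": 2}, dest=1, tag=OBJECT_TAG)
pending_before_wait = len(comm.mailboxes[(1, OBJECT_TAG)])
request.wait()
first = comm.recv_at(1, OBJECT_TAG)
second = comm.recv_at(1, OBJECT_TAG)
```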

unidist.core.backends.mpi.core.communication.mpi_send_buffer(comm, buffer, dest_rank, data_type=<mpi4py.MPI.Datatype object>, buffer_size=None)

Send a buffer object to another MPI rank in a blocking way.

Parameters:
  • comm (object) – MPI communicator object.

  • buffer (object) – Buffer object to send.

  • dest_rank (int) – Target MPI process to transfer buffer.

  • data_type (MPI.Datatype, default: MPI.CHAR) – MPI data type for sending data.

  • buffer_size (int, default: None) – Buffer size in bytes. If buffer_size is not None, an additional message with the buffer size is sent first so that the receiving process can prepare for the transfer.

Notes

  • This blocking send is used when we have to wait for completion of the communication, which is necessary for the pipeline to continue, or when the receiver is waiting for a result. Otherwise, use non-blocking mpi_isend_buffer.

  • The special tags are used for this communication, namely, common.MPITag.OBJECT and common.MPITag.BUFFER.

unidist.core.backends.mpi.core.communication.mpi_isend_buffer(comm, buffer_size, buffer, dest_rank)

Send a buffer object to another MPI rank in a non-blocking way.

Parameters:
  • comm (object) – MPI communicator object.

  • buffer_size (int) – Buffer size in bytes.

  • buffer (object) – Buffer object to send.

  • dest_rank (int) – Target MPI process to transfer data.

Returns:

A handler to the MPI_Isend communication result.

Return type:

object

Notes

The special tags are used for this communication, namely, common.MPITag.OBJECT and common.MPITag.BUFFER.

unidist.core.backends.mpi.core.communication.mpi_recv_buffer(comm, source_rank, result_buffer=None)

Receive data buffer.

Parameters:
  • comm (object) – MPI communicator object.

  • source_rank (int) – Communication event source rank.

  • result_buffer (object, default: None) – The array to be filled. If result_buffer is None, the buffer size will be requested and the necessary buffer created.

Returns:

Array buffer or serialized object.

Return type:

object

Notes

The special tags are used for this communication, namely, common.MPITag.OBJECT and common.MPITag.BUFFER.
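
The size-then-payload exchange described for the buffer functions can be sketched as follows. This is a standalone illustration: the deque-based channel stands in for MPI, the tag values are hypothetical, and sending the size on the OBJECT tag and the payload on the BUFFER tag is an assumption rather than something the reference states.

```python
from collections import deque

# Hypothetical tag values; the real ones are common.MPITag.OBJECT and
# common.MPITag.BUFFER.
OBJECT_TAG, BUFFER_TAG = 1, 2

channel = {OBJECT_TAG: deque(), BUFFER_TAG: deque()}  # stand-in for MPI


def send_buffer(buffer, buffer_size=None):
    """Optionally announce the size, then send the raw bytes."""
    if buffer_size is not None:
        channel[OBJECT_TAG].append(buffer_size)
    channel[BUFFER_TAG].append(bytes(buffer))


def recv_buffer(result_buffer=None):
    """Fill result_buffer, allocating it from the announced size if needed."""
    if result_buffer is None:
        size = channel[OBJECT_TAG].popleft()
        result_buffer = bytearray(size)
    payload = channel[BUFFER_TAG].popleft()
    result_buffer[: len(payload)] = payload
    return result_buffer


data = bytearray(b"native buffer contents")

# Receiver does not know the size: a size message is sent first.
send_buffer(data, buffer_size=len(data))
received = recv_buffer()

# Receiver already has a buffer of the right size: no size message needed.
preallocated = bytearray(len(data))
send_buffer(data)
recv_buffer(preallocated)
```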

unidist.core.backends.mpi.core.communication.send_simple_operation(comm, operation_type, operation_data, dest_rank)

Send an operation type and standard Python data types in a blocking way.

Parameters:
  • comm (object) – MPI communicator object.

  • operation_type (unidist.core.backends.mpi.core.common.Operation) – Operation message type.

  • operation_data (object) – Data object to send.

  • dest_rank (int) – Target MPI process to transfer data.

Notes

  • This blocking send is used when we have to wait for completion of the communication, which is necessary for the pipeline to continue, or when the receiver is waiting for a result. Otherwise, use non-blocking isend_simple_operation.

  • In this case, the data to be sent is serialized with plain pickle.dump.

  • The special tags are used for this communication, namely, common.MPITag.OPERATION and common.MPITag.OBJECT.
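
A minimal sketch of the two-message pattern implied by the notes above: the operation type travels under one tag and the data under another. The Operation members, the tag values, and the deque-based mailbox are all hypothetical stand-ins; only the idea of separate OPERATION and OBJECT tags comes from the reference.

```python
import pickle
from collections import deque
from enum import IntEnum


class Operation(IntEnum):  # hypothetical members, for illustration only
    EXECUTE = 1
    PUT_DATA = 2


class Tag(IntEnum):  # hypothetical values standing in for common.MPITag
    OPERATION = 1
    OBJECT = 2


mailbox = {Tag.OPERATION: deque(), Tag.OBJECT: deque()}


def send_simple(operation_type, operation_data):
    """Send the operation type first, then the pickled data."""
    mailbox[Tag.OPERATION].append(pickle.dumps(int(operation_type)))
    mailbox[Tag.OBJECT].append(pickle.dumps(operation_data))


def recv_simple():
    """Read the operation type, then the data that belongs to it."""
    operation_type = Operation(pickle.loads(mailbox[Tag.OPERATION].popleft()))
    operation_data = pickle.loads(mailbox[Tag.OBJECT].popleft())
    return operation_type, operation_data


send_simple(Operation.PUT_DATA, {"id": 3, "value": [1, 2, 3]})
op, payload = recv_simple()
```

Keeping the tags separate means a receiver waiting for the next operation type does not consume a data message by mistake.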

unidist.core.backends.mpi.core.communication.isend_simple_operation(comm, operation_type, operation_data, dest_rank)

Send an operation type and standard Python data types in a non-blocking way.

Parameters:
  • comm (object) – MPI communicator object.

  • operation_type (unidist.core.backends.mpi.core.common.Operation) – Operation message type.

  • operation_data (object) – Data object to send.

  • dest_rank (int) – Target MPI process to transfer data.

Returns:

A list of pairs, MPI_Isend handler and associated data to send.

Return type:

list

Notes

  • In this case, the data to be sent is serialized with plain pickle.dump.

  • The special tags are used for this communication, namely, common.MPITag.OPERATION and common.MPITag.OBJECT.
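
One way a caller might manage the returned list of (handler, data) pairs is sketched below. The reference does not say why the data is returned next to each handler; keeping the sent object referenced until the send completes is an assumption. FakeRequest is a hypothetical stand-in whose test() returns a plain bool, which is simpler than a real MPI request object.

```python
class FakeRequest:
    """Stand-in for an MPI_Isend handler; test() reports completion."""

    def __init__(self, polls_until_done):
        self._remaining = polls_until_done

    def test(self):
        self._remaining -= 1
        return self._remaining <= 0


def drop_completed(pending):
    """Return only the (handler, data) pairs whose send is still in flight."""
    return [(handler, data) for handler, data in pending if not handler.test()]


pending = [
    (FakeRequest(1), b"operation type"),  # completes on the first poll
    (FakeRequest(3), b"operation data"),  # completes on the third poll
]

pending = drop_completed(pending)
after_first_poll = len(pending)

pending = drop_completed(pending)
pending = drop_completed(pending)
after_third_poll = len(pending)
```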

unidist.core.backends.mpi.core.communication.mpi_recv_object(comm, source_rank)

Receive an object of a standard Python data type.

Parameters:
  • comm (object) – MPI communicator object.

  • source_rank (int) – Source MPI process to receive data from.

Returns:

Received data object from another MPI process.

Return type:

object

Notes

  • De-serialization is a simple pickle.load in this case.

  • The special tag is used for this communication, namely, common.MPITag.OBJECT.

Complex operations send and receive objects of custom data types, functions and classes, with support for native buffers. Several levels of serialization handle this case, using the msgpack, cloudpickle and pickle libraries. For performance reasons, pickle uses protocol 5 to serialize buffers out-of-band. isend_complex_operation() is an asynchronous interface for sending data.
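
To make the out-of-band idea concrete, here is a small standard-library sketch of pickle protocol 5. It shows only the pickle layer, not the msgpack or cloudpickle layers, and it is not unidist code; the object layout and names are illustrative.

```python
import pickle

payload = bytearray(b"\x01" * 1_000_000)  # stands in for a large native buffer
obj = {"name": "block-0", "values": pickle.PickleBuffer(payload)}

# Sender: buffers are handed to the callback instead of being copied
# into the pickle stream.
out_of_band = []
stream = pickle.dumps(obj, protocol=5, buffer_callback=out_of_band.append)

# Receiver: the same buffers, in the same order, are supplied on load.
restored = pickle.loads(stream, buffers=out_of_band)
```

The pickle stream stays small because it holds only a placeholder for the buffer, so the large payload can be transferred separately as raw bytes.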

unidist.core.backends.mpi.core.communication.send_complex_data(comm, data, dest_rank, is_serialized=False)

Send the data that consists of different user provided complex types, lambdas and buffers in a blocking way.

Parameters:
  • comm (object) – MPI communicator object.

  • data (object) – Data object to send.

  • dest_rank (int) – Target MPI process to transfer data.

  • is_serialized (bool, default: False) – Whether data is already serialized.

Returns:

Serialized data for caching purpose.

Return type:

dict

Notes

This blocking send is used when we have to wait for completion of the communication, which is necessary for the pipeline to continue, or when the receiver is waiting for a result. Otherwise, use non-blocking isend_complex_data.

unidist.core.backends.mpi.core.communication.isend_complex_data(comm, data, dest_rank, is_serialized=False)

Send the data that consists of different user provided complex types, lambdas and buffers in a non-blocking way.

Non-blocking asynchronous interface.

Parameters:
  • comm (object) – MPI communicator object.

  • data (object) – Data object to send.

  • dest_rank (int) – Target MPI process to transfer data.

  • is_serialized (bool, default: False) – Whether data is already serialized.

Returns:

  • list – A list of pairs, MPI_Isend handler and associated data to send.

  • object – A serialized msgpack data.

  • list – A list of pickle buffers.

  • list – A list of buffers amount for each object.

Notes

The special tags are used for this communication, namely, common.MPITag.OBJECT and common.MPITag.BUFFER.
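
The role of a per-object buffer count can be sketched with pickle alone. When several objects are serialized and their out-of-band buffers are collected into one flat list, the counts tell the receiver how to split that list again. The helper names below are hypothetical, and the real serializer layers msgpack and cloudpickle on top, which this sketch omits.

```python
import pickle


def serialize_many(objects):
    """Return (streams, flat_buffers, buffer_counts) for a list of objects."""
    streams, flat_buffers, buffer_counts = [], [], []
    for obj in objects:
        own = []
        streams.append(pickle.dumps(obj, protocol=5, buffer_callback=own.append))
        flat_buffers.extend(own)
        buffer_counts.append(len(own))
    return streams, flat_buffers, buffer_counts


def deserialize_many(streams, flat_buffers, buffer_counts):
    """Give each stream back exactly the buffers that belong to it."""
    objects, start = [], 0
    for stream, count in zip(streams, buffer_counts):
        own = flat_buffers[start : start + count]
        objects.append(pickle.loads(stream, buffers=own))
        start += count
    return objects


a = bytearray(b"aaaa")
b = bytearray(b"bb")
objects = [
    {"x": pickle.PickleBuffer(a), "y": pickle.PickleBuffer(b)},  # two buffers
    "no buffers here",  # zero buffers
    [pickle.PickleBuffer(b)],  # one buffer
]

streams, flat_buffers, buffer_counts = serialize_many(objects)
restored = deserialize_many(streams, flat_buffers, buffer_counts)
```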

unidist.core.backends.mpi.core.communication.recv_complex_data(comm, source_rank, info_package)

Receive the data that may consist of different user provided complex types, lambdas and buffers.

The data is de-serialized from received buffer.

Parameters:
  • comm (object) – MPI communicator object.

  • source_rank (int) – Source MPI process to receive data from.

  • info_package (unidist.core.backends.mpi.core.common.MetadataPackage) – Required information to deserialize data.

Returns:

Received data object from another MPI process.

Return type:

object

Notes

The special tags are used for this communication, namely, common.MPITag.OBJECT and common.MPITag.BUFFER.

unidist.core.backends.mpi.core.communication.isend_complex_operation(comm, operation_type, operation_data, dest_rank, is_serialized=False)

Send operation and data that consists of different user provided complex types, lambdas and buffers.

Non-blocking asynchronous interface. The data is serialized with unidist.core.backends.mpi.core.ComplexDataSerializer. The function also accepts data that is already serialized (see is_serialized).

Parameters:
  • comm (object) – MPI communicator object.

  • operation_type (unidist.core.backends.mpi.core.common.Operation) – Operation message type.

  • operation_data (object) – Data object to send.

  • dest_rank (int) – Target MPI process to transfer data.

  • is_serialized (bool, default: False) – Whether operation_data is already serialized.

      - operation_data is always serialized for data that has already been saved into the object store.

      - operation_data is never serialized when sending a task or an actor (actor method).

Returns:

Async handlers list and serialization data dict for caching purpose.

Return type:

list and dict

Notes

The special tags are used for this communication, namely, common.MPITag.OPERATION, common.MPITag.OBJECT and common.MPITag.BUFFER.
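
The caching purpose of the returned serialization data can be illustrated with a toy sender: serialize once, keep the result, and pass it back with is_serialized=True for later sends. Everything here is hypothetical (the serialize and send helpers, the "s_data" key, the outbox list) and only mirrors the shape of the interface described above.

```python
import pickle

serialize_calls = 0
outbox = []  # stand-in for messages handed to MPI


def serialize(data):
    """Hypothetical serializer; counts how often it actually runs."""
    global serialize_calls
    serialize_calls += 1
    return {"s_data": pickle.dumps(data)}


def send(data, dest_rank, is_serialized=False):
    """Hypothetical send: serialize unless the caller passes serialized data."""
    serialized = data if is_serialized else serialize(data)
    outbox.append((dest_rank, serialized["s_data"]))
    return serialized  # returned so the caller can cache it


cache = {}
value = list(range(10))

cache["value"] = send(value, dest_rank=1)  # serializes
send(cache["value"], dest_rank=2, is_serialized=True)  # reuses cached bytes
send(cache["value"], dest_rank=3, is_serialized=True)  # reuses cached bytes
```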

The functions below mirror the complex operations above but operate on a bytearray of already serialized data.

unidist.core.backends.mpi.core.communication.isend_serialized_operation(comm, operation_type, operation_data, dest_rank)

Send operation and serialized simple data.

Parameters:
  • comm (object) – MPI communicator object.

  • operation_type (unidist.core.backends.mpi.core.common.Operation) – Operation message type.

  • operation_data (object) – Data object to send.

  • dest_rank (int) – Target MPI process to transfer data.

Returns:

A list of pairs, MPI_Isend handler and associated data to send.

Return type:

list

Notes

The special tags are used for this communication, namely, common.MPITag.OPERATION, common.MPITag.OBJECT and common.MPITag.BUFFER.

unidist.core.backends.mpi.core.communication.recv_serialized_data(comm, source_rank)

Receive serialized data buffer.

The data is de-serialized with unidist.core.backends.mpi.core.SimpleDataSerializer.

Parameters:
  • comm (object) – MPI communicator object.

  • source_rank (int) – Source MPI process to receive data from.

Returns:

Received de-serialized data object from another MPI process.

Return type:

object

Notes

The special tags are used for this communication, namely, common.MPITag.OBJECT and common.MPITag.BUFFER.

To reduce possible contention, the MPI communication module provides custom receive functions built on a busy-wait loop.

unidist.core.backends.mpi.core.communication.mpi_busy_wait_recv(comm, source_rank)

Wait for receive operation result in a custom busy wait loop.

Parameters:
  • comm (object) – MPI communicator object.

  • source_rank (int) – Source MPI process to receive data from.

Returns:

Received data.

Return type:

object

Notes

The special tag is used for this communication, namely, common.MPITag.OBJECT.
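
A busy-wait receive typically polls for a message and sleeps briefly between polls instead of blocking inside the MPI library. The sketch below shows that general shape with a hypothetical FakeComm whose message becomes available after a few probes. The backoff value and the probe-then-receive structure are assumptions, not a description of the actual implementation.

```python
import time


class FakeComm:
    """Stand-in communicator whose message 'arrives' after a few probes."""

    def __init__(self, message, probes_until_ready):
        self._message = message
        self._remaining = probes_until_ready

    def iprobe(self, source, tag):
        self._remaining -= 1
        return self._remaining <= 0

    def recv(self, source, tag):
        return self._message


def busy_wait_recv(comm, source_rank, tag, backoff=0.001):
    """Poll for a message, yielding the CPU briefly between probes."""
    polls = 0
    while not comm.iprobe(source=source_rank, tag=tag):
        polls += 1
        time.sleep(backoff)
    return comm.recv(source=source_rank, tag=tag), polls


data, polls = busy_wait_recv(FakeComm("result", probes_until_ready=4), 0, tag=1)
```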

unidist.core.backends.mpi.core.communication.mpi_recv_operation(comm)

Interface for a worker to receive an operation type.

Busy waits to avoid contention. Receives data from any source.

Parameters:
  • comm (object) – MPI communicator object.

Returns:

  • unidist.core.backends.mpi.core.common.Operation – Operation type.

  • int – Source rank.

Notes

The special tag is used for this communication, namely, common.MPITag.OPERATION.
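
Returning the source rank together with the operation type lets a worker know which process to read follow-up data from or reply to. A toy loop showing that pairing, with a deque standing in for messages arriving from any source and hypothetical Operation members:

```python
from collections import deque
from enum import IntEnum


class Operation(IntEnum):  # hypothetical members, for illustration only
    EXECUTE = 1
    CANCEL = 2


# Stand-in for messages on the OPERATION tag: (source_rank, operation).
incoming = deque([(0, Operation.EXECUTE), (3, Operation.CANCEL)])


def recv_operation():
    """Take the next operation from any source and report who sent it."""
    source_rank, operation = incoming.popleft()
    return operation, source_rank


handled = []
while incoming:
    operation, source_rank = recv_operation()
    # The source rank identifies the process that issued the operation.
    handled.append((operation.name, source_rank))
```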