Message Passing Programming学习笔记

Published on 22 November 2023

# UoE Notes
# MPP

Message passing concepts

Message Passing Model

  • based on the concept of processes
  • communicate by sending and receiving messages
  • each process can only access its own data
  • parallelism is achieved by many processes cooperating on the same task

Sequential paradigm

  • a processor together with its memory forms a process
  • processes interact with the communication network through a message-passing interface

SPMD

  • most message passing programs use the Single-Program-Multiple-Data model
  • all processes
    • run their own copy of the same program
    • work on their own separate copy of the data
    • have unique identifier
  • usually run one process per processor/core

General message passing

    main(int argc, char **argv) {
        if (controller_process) {
            Controller();
        } else {
            Worker();
        }
    }
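A runnable sketch of the same SPMD pattern, assuming the controller is the process with rank 0; Controller and Worker are placeholders for the real routines:

```c
#include <mpi.h>
#include <stdio.h>

/* Placeholder routines standing in for the real controller/worker code. */
void Controller(void) { printf("controller running\n"); }
void Worker(void)     { printf("worker running\n"); }

int main(int argc, char **argv) {
    int rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {      /* every process runs the same program ... */
        Controller();     /* ... but rank 0 takes the controller role */
    } else {
        Worker();         /* ... and all other ranks act as workers   */
    }

    MPI_Finalize();
    return 0;
}
```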

Messages

  • the only form of communication
    • all communication is therefore explicit, which means every communication operation must be issued by the programmer
  • a message transfers data from the memory of one process to the memory of another process
  • typically contains
    • id of the sending process
    • id of the receiving process
    • type of the data items
    • number of data items
    • data
    • a message type identifier

Communication modes

  • sending a message can be synchronous (the sender waits until the receive has begun)
    • not completed until the message has started to be received
    • receives are usually synchronous (the receiving process must wait until the message arrives)
  • or asynchronous (completion does not depend on whether the message has been received)
    • completed as soon as the message has gone

MPI

MPI_Init

    int main(int argc, char **argv) {
        MPI_Init(&argc, &argv);
    }
    // or
    int main(void) {
        MPI_Init(NULL, NULL);
    }

MPI_Comm_rank & MPI_Comm_size

    int rank;
    int size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    printf("Hello from rank %d of %d\n", rank, size);

rank identifies each process within a communicator such as MPI_COMM_WORLD; size is the number of processes contained within that communicator.

Exiting MPI

It must be the last MPI procedure called.

    int MPI_Finalize();

Point-to-point communication

  • in MPI
    1. Sender calls a SEND routine
    2. Receiver calls a RECEIVE routine
    3. Data goes into the receive buffer
    4. At the same time, its metadata is received into separate storage

Sender & Receiver

| operation | MPI call |
| --- | --- |
| standard send | MPI_Send |
| synchronous send | MPI_Ssend |
| buffered send | MPI_Bsend |
| receive | MPI_Recv |
  • standard send (MPI_Send)
    • MPI_Send may return as soon as the message has been copied into a system buffer, so the sender can continue before the message has actually been received.
    • if the system cannot buffer the message, MPI_Send waits until the matching receive is ready, i.e. it behaves like a synchronous send.
    • typically used for small amounts of data, where strict ordering of communication is not required.
  • synchronous send (MPI_Ssend)
    • the send does not complete until the receiver has started to receive the message, so the sender waits; this guarantees the message is not dropped before the receiver is ready.
    • used when messages must be received in a particular order, or when synchronisation must be enforced.
  • buffered send (MPI_Bsend)
    • the message is copied into a buffer and the sender continues immediately, without waiting for the receiver to confirm receipt.
    • unlike MPI_Send and MPI_Ssend, MPI_Bsend never blocks waiting for the receiver; the MPI library transmits the buffered message asynchronously later.
    • typically used for larger amounts of data to reduce communication overhead, but MPI_Buffer_detach must be called at the appropriate point to make sure the buffered messages have been sent.
  • more details in Modes

sending a message

    int MPI_Ssend(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm);

e.g. sending 10 integers from rank 1 to rank 3:

    int x[10];
    MPI_Ssend(x, 10, MPI_INT, 3, 0, MPI_COMM_WORLD); // 3 is the destination rank, 0 is the tag

receiving a message

    int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status)

e.g. receiving 10 integers from rank 1 on rank 3:

    int y[10];
    MPI_Status status;
    MPI_Recv(y, 10, MPI_INT, 1, 0, MPI_COMM_WORLD, &status); // 1 is the source rank, 0 is the tag
  • for a communication to succeed
    • valid destination rank in sender
    • valid source rank in receiver
    • same communicator
    • same tags
    • same message types
    • receiver's buffer must be large enough
  • wildcarding
    • receiver can wildcard
    • to receive from any source MPI_ANY_SOURCE
    • to receive with any tag MPI_ANY_TAG
    • actual source and tag are returned in the receiver's status parameter

received message count

    int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count)
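A minimal runnable sketch of wildcarding together with MPI_Get_count and the status fields, assuming at least two ranks (the tag value 7 and the buffer sizes are purely illustrative):

```c
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv) {
    int rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {
        int x[10] = {0};
        MPI_Ssend(x, 10, MPI_INT, 1, 7, MPI_COMM_WORLD);   /* tag 7 chosen arbitrarily */
    } else if (rank == 1) {
        int y[10];
        int count;
        MPI_Status status;
        /* accept a message from any source with any tag */
        MPI_Recv(y, 10, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        /* the actual source, tag and length are recovered from the status */
        MPI_Get_count(&status, MPI_INT, &count);
        printf("received %d ints from rank %d with tag %d\n",
               count, status.MPI_SOURCE, status.MPI_TAG);
    }

    MPI_Finalize();
    return 0;
}
```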

Message matching

  • case 1
    • Rank 0: Ssend(msg1, dest=1, tag=1) Ssend(msg2, dest=1, tag=2)
    • Rank 1: Recv(buf1, src=0, tag=1) Recv(buf2, src=0, tag=2)
    • Res:
      • buf1 = msg1; buf2 = msg2
  • case 2
    • Rank 0: Ssend(msg1, dest=1, tag=1) Ssend(msg2, dest=1, tag=2)
    • Rank 1: Recv(buf2, src=0, tag=2) Recv(buf1, src=0, tag=1)
    • Res:
      • deadlock: Ssend is synchronous, so rank 0's first send cannot complete until rank 1 starts to receive that message
      • the sends and receives are incorrectly matched: rank 0 first sends msg1 with tag 1, but rank 1 first posts a receive for tag 2; neither the first send nor the first receive can ever be matched, so both processes wait forever (or, in other scenarios, data would be received incorrectly)
  • case 3
    • Rank 0: Bsend(msg1, dest=1, tag=1) Bsend(msg2, dest=1, tag=1)
    • Rank 1: Recv(buf1, src=0, tag=1) Recv(buf2, src=0, tag=1)
    • Res:
      • buf1 = msg1; buf2 = msg2
      • messages have the same tags but are matched in order
  • case 4
    • Rank 0: Bsend(msg1, dest=1, tag=1) Bsend(msg2, dest=1, tag=2)
    • Rank 1: Recv(buf2, src=0, tag=2) Recv(buf1, src=0, tag=1)
    • Res:
      • buf1 = msg1; buf2 = msg2
      • Do not have to receive messages in order!
  • case 5
    • Rank 0: Bsend(msg1, dest=1, tag=1) Bsend(msg2, dest=1, tag=2)
    • Rank 1: Recv(buf1, src=0, tag=MPI_ANY_TAG) Recv(buf2, src=0, tag=MPI_ANY_TAG)
    • Res:
      • buf1 = msg1; buf2 = msg2
      • messages are guaranteed to match in send order
      • need to examine status to find out the actual tag values
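A runnable sketch of case 4 above, assuming at least two ranks; note that MPI_Bsend requires the user to attach buffer space first:

```c
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv) {
    int rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {
        int msg1 = 1, msg2 = 2;
        int bufsize = 2 * ((int)sizeof(int) + MPI_BSEND_OVERHEAD);
        void *buffer = malloc(bufsize);
        MPI_Buffer_attach(buffer, bufsize);                   /* Bsend needs user-attached buffer space */
        MPI_Bsend(&msg1, 1, MPI_INT, 1, 1, MPI_COMM_WORLD);   /* tag 1 */
        MPI_Bsend(&msg2, 1, MPI_INT, 1, 2, MPI_COMM_WORLD);   /* tag 2 */
        MPI_Buffer_detach(&buffer, &bufsize);                 /* blocks until buffered messages are sent */
        free(buffer);
    } else if (rank == 1) {
        int buf1, buf2;
        /* receive in the opposite order: the tags let the messages match correctly */
        MPI_Recv(&buf2, 1, MPI_INT, 0, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        MPI_Recv(&buf1, 1, MPI_INT, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("buf1 = %d, buf2 = %d\n", buf1, buf2);
    }

    MPI_Finalize();
    return 0;
}
```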

Timers

double MPI_Wtime(void)

  • measured in seconds
  • consulting the timer before and after a task
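A minimal timing sketch; sleep(1) stands in for whatever task is being timed:

```c
#include <mpi.h>
#include <stdio.h>
#include <unistd.h>

int main(int argc, char **argv) {
    int rank;
    double t_start, t_end;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    t_start = MPI_Wtime();   /* consult the timer before the task */
    sleep(1);                /* stand-in for the real work being timed */
    t_end = MPI_Wtime();     /* and again after it */

    printf("rank %d: task took %f seconds\n", rank, t_end - t_start);

    MPI_Finalize();
    return 0;
}
```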

Mode-tag-communicator

Modes

* briefly explained in MPI Sender & Receiver

  • Recv is always synchronous: if the Recv is issued before the matching Bsend, it waits until the Bsend has been issued and the message has arrived
  • if Bsends are issued before the matching Recvs, the system tries to store the messages in the buffer, but Bsend fails if there is not enough buffer space
  • MPI_Send (not recommended)
    • it tries to avoid the shortcomings of Bsend and Ssend:
      • buffer space is provided by the system
      • normally asynchronous like Bsend
      • synchronous like Ssend when the buffer is full
    • it is unlikely to fail
      • but it can cause deadlock if buffering runs out

An interesting case with MPI_Send
  • Process A:
    • Send(x, B): process A sends message x to process B.
    • Recv(x, B): then process A tries to receive a message from process B.
  • Process B:
    • Send(y, A): process B sends message y to process A.
    • Recv(y, A): then process B tries to receive a message from process A.
  • Deadlock:
    1. Process A executes Send(x, B), trying to send message x to process B.
    2. Process B executes Send(y, A), trying to send message y to process A.
    3. If the system cannot buffer the messages, each Send behaves synchronously and cannot complete until the matching receive has been posted.
    4. Neither process ever reaches its Recv, because its own Send has not completed.
    5. Both processes are now waiting for the other to receive their message: process A waits for process B to receive x, while process B waits for process A to receive y. Neither can make progress, so the program deadlocks.
  • Success:
    1. Process A executes Send(x, B), sending message x to process B.
    2. Process B executes Recv(x, A), receiving message x from process A.
    3. Process B executes Send(y, A), sending message y to process A.
    4. Process A executes Recv(y, B), receiving message y from process B.
    5. With this ordering, every receive matches the corresponding send, so the messages are delivered in the right order and the deadlock is avoided.
  • Solutions to avoid deadlock
    • either match sends and receives explicitly
      • A sends then receives
      • B receives then sends
    • use non-blocking communications, see non-blocking
      • do not use MPI_Send in this course, use MPI_Ssend
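A minimal sketch of the explicit-ordering fix, using MPI_Ssend as recommended above; it assumes at least two ranks, and ranks other than 0 and 1 simply skip the exchange:

```c
#include <mpi.h>
#include <stdio.h>

/* Deadlock-free exchange between ranks 0 and 1: one side sends first,
   the other receives first, so every Ssend finds a matching Recv. */
int main(int argc, char **argv) {
    int rank, x = 42, y = 0;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {
        MPI_Ssend(&x, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);                   /* A sends ...       */
        MPI_Recv(&y, 1, MPI_INT, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); /* ... then receives */
        printf("rank 0 received %d\n", y);
    } else if (rank == 1) {
        MPI_Recv(&y, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); /* B receives ...    */
        MPI_Ssend(&x, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);                   /* ... then sends    */
        printf("rank 1 received %d\n", y);
    }

    MPI_Finalize();
    return 0;
}
```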

Communicators

  • all MPI communications take place within a communicator
    • communicator - a group of processes
  • message can only be received within the same communicator from which it was sent
  • can be split into pieces by MPI_Comm_split()
    • each process has a new rank within each sub-communicator
    • guarantee messages from different pieces do not interact
  • a copy can be made with MPI_Comm_dup
    • the copy contains the same processes but in a new communicator
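A minimal sketch of MPI_Comm_split, splitting MPI_COMM_WORLD into even-rank and odd-rank sub-communicators (the colour scheme is just an example):

```c
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv) {
    int world_rank, sub_rank;
    MPI_Comm sub_comm;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    /* colour 0 = even world ranks, colour 1 = odd world ranks;
       processes with the same colour end up in the same sub-communicator */
    int colour = world_rank % 2;
    MPI_Comm_split(MPI_COMM_WORLD, colour, world_rank, &sub_comm);

    /* each process gets a new rank within its sub-communicator */
    MPI_Comm_rank(sub_comm, &sub_rank);
    printf("world rank %d -> colour %d, sub rank %d\n", world_rank, colour, sub_rank);

    MPI_Comm_free(&sub_comm);
    MPI_Finalize();
    return 0;
}
```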

Non-blocking Communications

non-blocking operations

  • all non-blocking operations should have matching wait operations; some systems cannot free resources until the wait has been called
  • a non-blocking operation immediately followed by a matching wait is equivalent to the blocking operation
  • the operation continues after the call has returned: unlike an ordinary sequential subroutine call, a non-blocking call returns immediately, so the program can carry on with other work instead of waiting for the operation to finish
  • can be separate into three phases:
    1. initiate non-blocking communication
    2. do some work
    3. wait for non-blocking communication to complete
  • a request handle is allocated when a communication is initiated

non-blocking synchronous send

    int MPI_Issend(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
    int MPI_Wait(MPI_Request *request, MPI_Status *status)

non-blocking receive

    int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Request *request)
    int MPI_Wait(MPI_Request *request, MPI_Status *status)

blocking & non-blocking

  • send and receive can be blocking or non-blocking
  • a blocking send can be used with a non-blocking receive, and vice-versa
  • non-blocking sends can use any mode - synchronous, buffered or standard

non-blocking modes

| non-blocking operation | MPI call |
| --- | --- |
| standard send | MPI_Isend |
| synchronous send | MPI_Issend |
| buffered send | MPI_Ibsend |
| receive | MPI_Irecv |
example
    MPI_Request request;
    MPI_Status status;
    if (rank == 0) {
        MPI_Issend(sendarray, 10, MPI_INT, 1, tag, MPI_COMM_WORLD, &request);
        Do_something_else_while_Issend_happens();
        // now wait for send to complete
        MPI_Wait(&request, &status);
    } else if (rank == 1) {
        MPI_Irecv(recvarray, 10, MPI_INT, 0, tag, MPI_COMM_WORLD, &request);
        Do_something_else_while_Irecv_happens();
        // now wait for receive to complete
        MPI_Wait(&request, &status);
    }

important notes

  • synchronous mode affects what completion means
    • after a wait on MPI_Issend, you know the message has started to be received by the other process
    • after a wait on MPI_Isend, you only know the send buffer can safely be reused; the message may still be buffered and not yet received
    • the request can be re-used after a wait
  • You must not access send or receive buffers until communications are complete
    • cannot overwrite send buffer until after a wait on Issend / Isend
    • cannot read from a receive buffer until after a wait on Irecv

testing multiple non-blocking comms

see slide L07
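The slide itself is not reproduced here; as a hedged sketch of the idea, MPI provides routines such as MPI_Waitall, MPI_Waitany and MPI_Test for completing several outstanding requests. For example, assuming at least two ranks, rank 0 could post one non-blocking receive per other rank and complete them all at once:

```c
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0) {
        int recvbuf[size];            /* one slot per sender (C99 VLA) */
        MPI_Request requests[size];
        MPI_Status statuses[size];
        for (int i = 1; i < size; i++) {
            MPI_Irecv(&recvbuf[i - 1], 1, MPI_INT, i, 0, MPI_COMM_WORLD, &requests[i - 1]);
        }
        /* ... could do other work here while the messages arrive ... */
        MPI_Waitall(size - 1, requests, statuses);   /* completes all outstanding requests */
        for (int i = 1; i < size; i++) {
            printf("got %d from rank %d\n", recvbuf[i - 1], i);
        }
    } else {
        MPI_Ssend(&rank, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
    }

    MPI_Finalize();
    return 0;
}
```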

Collective communication

characteristics

  • collective action over a communicator
  • all processes must communicate
  • synchronisation may or may not occur
  • standard collective operations are blocking
  • no tags
  • receive buffer must be exactly the right size

comms

  • barrier synchronisation
    • int MPI_Barrier(MPI_Comm comm)
    • creates a synchronisation point: every process waits at the barrier until all processes have reached it, and only then do they all continue
  • broadcast
    • int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm)
    • broadcasts data from one process to all the others: the root sends its data to every process, so that after the call all processes hold the same data
  • scatter
    • int MPI_Scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
    • distributes data from one process to many others: the root splits its data and sends one piece to each target process
    • vector version (MPI_Scatterv())
      • distributes a variable number of elements from one process to many: each receiving process may receive a different number of elements, given in an array of counts
      • the arguments include the send buffer, send counts, send datatype, receive buffer, receive count, receive datatype, root process and communicator
      • this allows different amounts of data to be distributed to different receiving processes, rather than only an equal share to each
  • gather
    • int MPI_Gather(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
    • collects data from many processes onto one: each process sends its data to the receiving (root) process, which combines the pieces into one aggregated data set
    • vector version (MPI_Gatherv())
      • collects a variable number of elements from each process into a single receive buffer: each process may contribute a different number of elements, given in an array of counts
      • the arguments include the send buffer, send count, send datatype, receive buffer, receive counts, receive datatype, root process and communicator
      • this allows pieces of different lengths to be gathered, rather than only equal-sized ones
  • reduction
    • int MPI_Reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm)
    • MPI_Reduce(&x, &result, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD) // integer global sum example
      • the result is only placed there on process 0
      • the sum of all the x values is placed in result
    • combines data from all processes onto one process: the per-process values are combined with an operation (such as sum, product, maximum or minimum) and the result is collected on the root process
  • user-defined reduction
    • steps (see the sketch after this list):
      1. define the reduction operation: provide a user-defined function describing how two values are combined into one
      2. register the reduction operation: bind the function to an operation handle with MPI_Op_create
      3. use the reduction operation: pass the new handle to a standard collective such as MPI_Reduce or MPI_Allreduce to combine the data
    • details in slide L08
  • all reduction (efficient)
    • int MPI_Allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
    • MPI_Allreduce(&x, &result, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD) // integer global sum example
      • the sum of all the x values is placed in result
      • the result is stored on every process
  • scan
    • int MPI_Scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
    • performs a cumulative (prefix) reduction across the processes: process i ends up with the combination of the values contributed by processes 0 to i, so the partial results are computed in parallel across the communicator
    • Scan is inclusive
      • includes the result contributed by the calling process
      • for an exclusive scan use MPI_Exscan()
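A hedged sketch of the user-defined reduction steps above, assuming the data are MPI_INT; the "maximum by absolute value" combine function is purely illustrative:

```c
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

/* Step 1: user-defined combine function (signature fixed by MPI):
   element-wise, keep whichever value has the larger absolute value. */
void absmax(void *invec, void *inoutvec, int *len, MPI_Datatype *datatype) {
    (void)datatype;                    /* unused: this sketch assumes MPI_INT */
    int *in = (int *)invec;
    int *inout = (int *)inoutvec;
    for (int i = 0; i < *len; i++) {
        if (abs(in[i]) > abs(inout[i])) inout[i] = in[i];
    }
}

int main(int argc, char **argv) {
    int rank, x, result;
    MPI_Op myop;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    x = (rank % 2 == 0) ? rank : -rank;   /* some per-rank value */

    MPI_Op_create(absmax, 1, &myop);      /* step 2: register; 1 = commutative */
    MPI_Reduce(&x, &result, 1, MPI_INT, myop, 0, MPI_COMM_WORLD);   /* step 3: use it */
    if (rank == 0) printf("abs-max of x over all ranks = %d\n", result);

    MPI_Op_free(&myop);
    MPI_Finalize();
    return 0;
}
```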

Virtual Topologies

Cartesian

  • each process is “connected” to its neighbours in a virtual grid
    • boundaries can be cyclic, or not.
    • optionally re-order ranks to allow MPI implementation to optimise for underlying network interconnectivity
  • processes are identified by cartesian coordinates

create a cartesian virtual topology

    int MPI_Cart_create(MPI_Comm comm_old, int ndims, int *dims, int *periods, int reorder, MPI_Comm *comm_cart)

balanced processor distribution

    int MPI_Dims_create(int nnodes, int ndims, int *dims)

mapping functions
  • int MPI_Cart_rank( MPI_Comm comm, int *coords, int *rank) // Mapping process grid coordinates to ranks
  • int MPI_Cart_coords(MPI_Comm comm, int rank, int maxdims, int *coords) // Mapping ranks to process grid coordinates
  • int MPI_Cart_shift(MPI_Comm comm, int direction, int disp, int *rank_source, int *rank_dest) // Computing the ranks of my neighbouring processes; follows the conventions of MPI_Sendrecv
    • rank_source is not your own rank! it is an output, not an input
    • for a message passed round a ring
      • rank_source would be rank - 1
      • rank_dest would be rank + 1
    • a different convention from MPI_Cart_coords()
      • you are implicitly asking for your neighbours

Partitioning

    int MPI_Cart_sub(MPI_Comm comm, int *remain_dims, MPI_Comm *newcomm)
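A sketch tying these calls together: build a 2D periodic process grid sized by MPI_Dims_create and find each rank's neighbours along dimension 0 (the choice of two dimensions and cyclic boundaries is just an example):

```c
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv) {
    int size, rank, dims[2] = {0, 0}, periods[2] = {1, 1}, coords[2];
    int rank_source, rank_dest;
    MPI_Comm cart_comm;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    MPI_Dims_create(size, 2, dims);                  /* balanced 2D decomposition        */
    MPI_Cart_create(MPI_COMM_WORLD, 2, dims, periods, 0, &cart_comm);

    MPI_Comm_rank(cart_comm, &rank);
    MPI_Cart_coords(cart_comm, rank, 2, coords);     /* my grid coordinates              */
    MPI_Cart_shift(cart_comm, 0, 1, &rank_source, &rank_dest);  /* neighbours in dim 0   */

    printf("rank %d at (%d,%d): source=%d, dest=%d\n",
           rank, coords[0], coords[1], rank_source, rank_dest);

    MPI_Comm_free(&cart_comm);
    MPI_Finalize();
    return 0;
}
```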
