diff --git a/src/core/mpi/BufferSystem.h b/src/core/mpi/BufferSystem.h index e37c3a76a087a758201ee9a160eb4cda1ef99398..b40fa75f543b71845971d60d82b635b268e8bbfb 100644 --- a/src/core/mpi/BufferSystem.h +++ b/src/core/mpi/BufferSystem.h @@ -233,12 +233,13 @@ protected: internal::KnownSizeCommunication<RecvBuffer_T, SendBuffer_T> knownSizeComm_; internal::UnknownSizeCommunication<RecvBuffer_T, SendBuffer_T> unknownSizeComm_; internal::UnknownSizeCommunicationIProbe<RecvBuffer_T, SendBuffer_T> unknownSizeCommIProbe_; + internal::UnknownSizeCommunicationIProbe<RecvBuffer_T, SendBuffer_T> unknownSizeAndSenderCommIProbe_; internal::NoMPICommunication<RecvBuffer_T, SendBuffer_T> noMPIComm_; internal::AbstractCommunication<RecvBuffer_T, SendBuffer_T> * currentComm_; //< after receiver setup, this points to unknown- or knownSizeComm_ bool sizeChangesEverytime_; //< if set to true, the receiveSizeUnknown_ is set to true before communicating bool communicationRunning_; //< indicates if a communication step is currently running - + bool senderKnown_; //< if false, the sender ranks are unknown before receiving /// Info about the message to be received from a certain rank: /// information holds the buffer and, if known, the message size diff --git a/src/core/mpi/BufferSystem.impl.h b/src/core/mpi/BufferSystem.impl.h index 242df47c80ea7506155e42f4036e6c9d7838b8b2..c7f5656d5297e3d85bcfee8ecb1651b304772465 100644 --- a/src/core/mpi/BufferSystem.impl.h +++ b/src/core/mpi/BufferSystem.impl.h @@ -23,6 +23,7 @@ #include "core/mpi/MPIManager.h" #include "core/debug/CheckFunctions.h" +#include <algorithm> namespace walberla { namespace mpi { @@ -128,10 +129,12 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const MPI_Comm & communicator, : knownSizeComm_ ( communicator, tag ), unknownSizeComm_( communicator, tag ), unknownSizeCommIProbe_( communicator, tag ), + unknownSizeAndSenderCommIProbe_( communicator, tag, false ), noMPIComm_( communicator, tag ), currentComm_ ( nullptr ), sizeChangesEverytime_( true ), - communicationRunning_( false ) + communicationRunning_( false ), + senderKnown_( true ) { } @@ -140,10 +143,12 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &oth : knownSizeComm_ ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ), unknownSizeComm_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ), unknownSizeCommIProbe_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ), + unknownSizeAndSenderCommIProbe_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag(), false ), noMPIComm_ ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ), currentComm_ ( nullptr ), sizeChangesEverytime_( other.sizeChangesEverytime_ ), communicationRunning_( other.communicationRunning_ ), + senderKnown_( true ), recvInfos_( other.recvInfos_ ), sendInfos_( other.sendInfos_ ) { @@ -154,6 +159,8 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &oth currentComm_ = &unknownSizeComm_; else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ ) currentComm_ = &unknownSizeCommIProbe_; + else if ( other.currentComm_ == &other.unknownSizeAndSenderCommIProbe_ ) + currentComm_ = &unknownSizeAndSenderCommIProbe_; else if ( other.currentComm_ == &other.noMPIComm_ ) currentComm_ = &noMPIComm_; else @@ -167,6 +174,7 @@ GenericBufferSystem<Rb, Sb> & GenericBufferSystem<Rb, Sb>::operator=( const Gene sizeChangesEverytime_ = other.sizeChangesEverytime_; communicationRunning_ = other.communicationRunning_; + senderKnown_ = other.senderKnown_; recvInfos_ = other.recvInfos_; sendInfos_ = other.sendInfos_; @@ -176,6 +184,8 @@ GenericBufferSystem<Rb, Sb> & GenericBufferSystem<Rb, Sb>::operator=( const Gene currentComm_ = &unknownSizeComm_; else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ ) currentComm_ = &unknownSizeCommIProbe_; + else if ( other.currentComm_ == &other.unknownSizeAndSenderCommIProbe_ ) + currentComm_ = &unknownSizeAndSenderCommIProbe_; else if ( other.currentComm_ == &other.noMPIComm_ ) currentComm_ = &noMPIComm_; else @@ -228,7 +238,20 @@ void GenericBufferSystem<Rb, Sb>::setReceiverInfo( const std::set<MPIRank> & ran template< typename Rb, typename Sb> void GenericBufferSystem<Rb, Sb>::setReceiverInfo( const int numReceives ) { - WALBERLA_ABORT("NOT IMPLEMENTED!"); + WALBERLA_ASSERT( ! communicationRunning_ ); + + recvInfos_.clear(); + for ( MPIRank sender = 0; sender < numReceives; sender++ ) + { + recvInfos_[ - 1 - sender ].size = INVALID_SIZE; + } + + // "any sender"-communication is only supported via IProbe + useIProbe( true ); + + sizeChangesEverytime_ = true; + senderKnown_ = false; + setCommunicationType( false ); } @@ -526,12 +549,21 @@ void GenericBufferSystem<Rb, Sb>::setCommunicationType( const bool knownSize ) WALBERLA_MPI_SECTION() { - if( knownSize ) - currentComm_ = &knownSizeComm_; - else if ( useIProbe_ ) - currentComm_ = &unknownSizeCommIProbe_; + if ( senderKnown_ ) + { + if (knownSize) + currentComm_ = &knownSizeComm_; + else if (useIProbe_) + currentComm_ = &unknownSizeCommIProbe_; + else + currentComm_ = &unknownSizeComm_; + } else - currentComm_ = &unknownSizeComm_; + { + WALBERLA_CHECK( useIProbe_, "Unknown sender communication is currently only supported with IProbe-based " + "communication." ) + currentComm_ = &unknownSizeAndSenderCommIProbe_; + } } } diff --git a/src/core/mpi/BufferSystemHelper.h b/src/core/mpi/BufferSystemHelper.h index 661e463587263ec62d8aadbc1ba93d15660ca2ed..15405e432d62558c48cf8dcc616cdde64942d921 100644 --- a/src/core/mpi/BufferSystemHelper.h +++ b/src/core/mpi/BufferSystemHelper.h @@ -167,8 +167,9 @@ namespace internal { public: using typename AbstractCommunication<RecvBuffer_T, SendBuffer_T>::ReceiveInfo; - UnknownSizeCommunicationIProbe( const MPI_Comm & communicator, int tag = 0 ) - : AbstractCommunication<RecvBuffer_T, SendBuffer_T>( communicator, tag ), sending_(false), receiving_(false) {} + UnknownSizeCommunicationIProbe( const MPI_Comm & communicator, int tag = 0, bool senderKnown = true ) + : AbstractCommunication<RecvBuffer_T, SendBuffer_T>( communicator, tag ), sending_(false), receiving_(false), + senderKnown_( senderKnown ) {} virtual ~UnknownSizeCommunicationIProbe() {} @@ -184,6 +185,7 @@ namespace internal { bool sending_; bool receiving_; int pendingReceives_; + bool senderKnown_; std::vector<MPI_Request> sendRequests_; }; diff --git a/src/core/mpi/BufferSystemHelper.impl.h b/src/core/mpi/BufferSystemHelper.impl.h index fc2c6a5ae5a2623de6469eb3c6b2c54128927379..93f89606ccd21f4b04f07daf2eee71e30f496d51 100644 --- a/src/core/mpi/BufferSystemHelper.impl.h +++ b/src/core/mpi/BufferSystemHelper.impl.h @@ -352,6 +352,7 @@ MPIRank UnknownSizeCommunication<Rb, Sb>::waitForNextReceive( std::map<MPIRank, //====================================================================================================================== // // Unknown Size Communication (IProbe method) +// Can also be used if there are unknown sender ranks. // //====================================================================================================================== @@ -438,15 +439,29 @@ MPIRank UnknownSizeCommunicationIProbe<Rb, Sb>::waitForNextReceive( std::map<MPI if (recvInfo.size != INVALID_SIZE) continue; + const MPIRank iprobeSender = senderKnown_ ? sender : MPI_ANY_SOURCE; + int probeFlag; MPI_Status probeStatus; - MPI_Iprobe( sender, + MPI_Iprobe( iprobeSender, this->tag_, this->communicator_, &probeFlag, &probeStatus); if (probeFlag) { + const MPIRank actualSender = probeStatus.MPI_SOURCE; + + if ( !senderKnown_ ) + { + recvInfos.erase( sender ); + recvInfo = recvInfos[actualSender]; + } + else + { + WALBERLA_ASSERT_EQUAL( sender, actualSender ); + } + int count = 0; MPI_Get_count( &probeStatus, MPI_BYTE, &count ); //WALBERLA_LOG_DEVEL("received " << count << " from " << sender); @@ -457,14 +472,14 @@ MPIRank UnknownSizeCommunicationIProbe<Rb, Sb>::waitForNextReceive( std::map<MPI MPI_Recv( recvInfo.buffer.ptr(), // where to store received size count, // size of expected message MPI_BYTE, // type - sender, // rank of sender process + actualSender, // rank of sender process this->tag_, // message tag this->communicator_, // communicator &recvStatus // request, needed for wait ); --pendingReceives_; - return sender; + return actualSender; } } } @@ -539,6 +554,7 @@ MPIRank NoMPICommunication<Rb, Sb>::waitForNextReceive( std::map<MPIRank, Receiv + } // namespace internal } // namespace mpi } // namespace walberla