2#include <boost/asio.hpp>
3#include <boost/asio/system_timer.hpp>
22namespace asio = boost::asio;
26 std::string networkName,
27 std::string addressDirectory)
35 _addressDirectory =
".";
58 std::string
const &requesterName,
59 std::string
const &tag,
80 acceptor.open(endpoint.protocol());
81 acceptor.set_option(tcp::acceptor::reuse_address(
_reuseAddress));
82 acceptor.bind(endpoint);
86 address = ipAddress +
":" + std::to_string(
_portNumber);
88 conInfo.
write(address);
93 int requesterCommunicatorSize = -1;
96 auto socket = std::make_shared<Socket>(*
_ioContext);
98 acceptor.accept(*socket);
99 boost::asio::ip::tcp::no_delay option(
true);
100 socket->set_option(option);
105 int requesterRank = -1;
107 asio::read(*socket, asio::buffer(&requesterRank,
sizeof(
int)));
110 "Rank {} has already been connected. Duplicate requests are not allowed.", requesterRank);
112 _sockets[requesterRank] = std::move(socket);
116 auto adjustedRequesterRank = requesterRank + rankOffset;
117 send(acceptorRank, adjustedRequesterRank);
118 receive(requesterCommunicatorSize, adjustedRequesterRank);
121 if (peerCurrent == 0) {
122 peerCount = requesterCommunicatorSize;
126 "Requester communicator size is {} which is invalid.", requesterCommunicatorSize);
128 "Current requester size from rank {} is {} but should be {}", requesterRank, requesterCommunicatorSize, peerCount);
129 }
while (++peerCurrent < requesterCommunicatorSize);
132 }
catch (std::exception &e) {
133 PRECICE_ERROR(
"Accepting a socket connection at {} failed with the system error: {}", address, e.what());
137 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(
_ioContext->get_executor()));
142 std::string
const &requesterName,
143 std::string
const &tag,
145 int requesterCommunicatorSize)
147 PRECICE_TRACE(acceptorName, requesterName, acceptorRank, requesterCommunicatorSize);
148 PRECICE_ASSERT(requesterCommunicatorSize >= 0,
"Requester communicator size has to be positive.");
151 if (requesterCommunicatorSize == 0) {
169 acceptor.open(endpoint.protocol());
170 acceptor.set_option(tcp::acceptor::reuse_address(
_reuseAddress));
171 acceptor.bind(endpoint);
177 address = ipAddress +
":" + std::to_string(
_portNumber);
179 conInfo.
write(address);
183 for (
int connection = 0; connection < requesterCommunicatorSize; ++connection) {
184 auto socket = std::make_shared<Socket>(*
_ioContext);
185 acceptor.accept(*socket);
186 boost::asio::ip::tcp::no_delay option(
true);
187 socket->set_option(option);
192 asio::read(*socket, asio::buffer(&requesterRank,
sizeof(
int)));
193 _sockets[requesterRank] = std::move(socket);
197 }
catch (std::exception &e) {
198 PRECICE_ERROR(
"Accepting a socket connection at {} failed with the system error: {}", address, e.what());
202 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(
_ioContext->get_executor()));
207 std::string
const &requesterName,
208 std::string
const &tag,
210 int requesterCommunicatorSize)
216 std::string
const address = conInfo.
read();
218 auto const sepidx = address.find(
':');
219 std::string
const ipAddress = address.substr(0, sepidx);
220 std::string
const portNumber = address.substr(sepidx + 1);
221 _portNumber =
static_cast<unsigned short>(std::stoul(portNumber));
224 auto socket = std::make_shared<Socket>(*
_ioContext);
230 auto results = resolver.resolve(ipAddress, portNumber, boost::asio::ip::resolver_base::numeric_host);
232 auto endpoint = results.begin()->endpoint();
233 boost::system::error_code error = asio::error::host_not_found;
234 socket->connect(endpoint, error);
241 boost::asio::system_timer timer(*
_ioContext, std::chrono::milliseconds(1));
245 boost::asio::ip::tcp::no_delay option(
true);
246 socket->set_option(option);
250 asio::write(*socket, asio::buffer(&requesterRank,
sizeof(
int)));
252 int acceptorRank = -1;
253 asio::read(*socket, asio::buffer(&acceptorRank,
sizeof(
int)));
256 send(requesterCommunicatorSize, 0);
258 }
catch (std::exception &e) {
259 PRECICE_ERROR(
"Requesting a socket connection at {} failed with the system error: {}", address, e.what());
263 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(
_ioContext->get_executor()));
268 std::string
const &requesterName,
269 std::string
const &tag,
270 std::set<int>
const &acceptorRanks,
274 PRECICE_TRACE(acceptorName, requesterName, acceptorRanks, requesterRank);
277 for (
auto const &acceptorRank : acceptorRanks) {
280 std::string
const address = conInfo.
read();
281 auto const sepidx = address.find(
':');
282 std::string
const ipAddress = address.substr(0, sepidx);
283 std::string
const portNumber = address.substr(sepidx + 1);
284 _portNumber =
static_cast<unsigned short>(std::stoul(portNumber));
287 auto socket = std::make_shared<Socket>(*
_ioContext);
291 PRECICE_DEBUG(
"Requesting connection to {}, port {}", ipAddress, portNumber);
295 auto endpoints = resolver.resolve(ipAddress, portNumber, boost::asio::ip::resolver_base::numeric_host);
297 boost::system::error_code error = asio::error::host_not_found;
298 boost::asio::connect(*socket, endpoints, error);
305 boost::asio::system_timer timer(*
_ioContext, std::chrono::milliseconds(1));
309 boost::asio::ip::tcp::no_delay option(
true);
310 socket->set_option(option);
312 PRECICE_DEBUG(
"Requested connection to {}, rank = {}", address, acceptorRank);
313 _sockets[acceptorRank] = std::move(socket);
314 send(requesterRank, acceptorRank);
316 }
catch (std::exception &e) {
317 PRECICE_ERROR(
"Requesting a socket connection at {} failed with the system error: {}", address, e.what());
321 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(
_ioContext->get_executor()));
342 socket.second->shutdown(Socket::shutdown_send);
343 socket.second->close();
344 }
catch (std::exception &e) {
345 PRECICE_WARN(
"Socket shutdown failed with system error: {}", e.what());
361 size_t size = itemToSend.size() + 1;
363 asio::write(*
_sockets[rankReceiver], asio::buffer(&size,
sizeof(
size_t)));
364 asio::write(*
_sockets[rankReceiver], asio::buffer(itemToSend.c_str(), size));
365 }
catch (std::exception &e) {
366 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
380 asio::write(*
_sockets[rankReceiver], asio::buffer(itemsToSend.
data(), itemsToSend.
size() *
sizeof(
int)));
381 }
catch (std::exception &e) {
382 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
387 std::string
const &requesterName)
389 using namespace std::filesystem;
391 PRECICE_DEBUG(
"Creating connection exchange directory {}", dir.generic_string());
393 create_directories(dir);
394 }
catch (
const std::filesystem::filesystem_error &e) {
395 PRECICE_WARN(
"Creating directory for connection info failed with filesystem error: {}", e.what());
400 std::string
const &requesterName)
402 using namespace std::filesystem;
404 PRECICE_DEBUG(
"Removing connection exchange directory {}", dir.generic_string());
407 }
catch (
const std::filesystem::filesystem_error &e) {
408 PRECICE_WARN(
"Cleaning up connection info failed with filesystem error {}", e.what());
424 asio::buffer(itemsToSend.
data(), itemsToSend.
size() *
sizeof(
int)),
426 std::static_pointer_cast<SocketRequest>(request)->complete();
441 asio::write(*
_sockets[rankReceiver], asio::buffer(itemsToSend.
data(), itemsToSend.
size() *
sizeof(
double)));
442 }
catch (std::exception &e) {
443 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
459 asio::buffer(itemsToSend.
data(), itemsToSend.
size() *
sizeof(
double)),
461 std::static_pointer_cast<SocketRequest>(request)->complete();
476 asio::write(*
_sockets[rankReceiver], asio::buffer(&itemToSend,
sizeof(
double)));
477 }
catch (std::exception &e) {
478 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
497 asio::write(*
_sockets[rankReceiver], asio::buffer(&itemToSend,
sizeof(
int)));
498 }
catch (std::exception &e) {
499 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
518 asio::write(*
_sockets[rankReceiver], asio::buffer(&itemToSend,
sizeof(
bool)));
519 }
catch (std::exception &e) {
520 PRECICE_ERROR(
"Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
536 asio::buffer(&itemToSend,
sizeof(
bool)),
538 std::static_pointer_cast<SocketRequest>(request)->complete();
555 asio::read(*
_sockets[rankSender], asio::buffer(&size,
sizeof(
size_t)));
556 std::vector<char> msg(size);
557 asio::read(*
_sockets[rankSender], asio::buffer(msg.data(), size));
558 itemToReceive = msg.data();
559 }
catch (std::exception &e) {
560 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
574 asio::read(*
_sockets[rankSender], asio::buffer(itemsToReceive.
data(), itemsToReceive.
size() *
sizeof(
int)));
575 }
catch (std::exception &e) {
576 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
590 asio::read(*
_sockets[rankSender], asio::buffer(itemsToReceive.
data(), itemsToReceive.
size() *
sizeof(
double)));
591 }
catch (std::exception &e) {
592 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
609 asio::async_read(*
_sockets[rankSender],
610 asio::buffer(itemsToReceive.
data(), itemsToReceive.
size() *
sizeof(
double)),
611 [request](boost::system::error_code
const &, std::size_t) {
612 std::static_pointer_cast<SocketRequest>(request)->complete();
614 }
catch (std::exception &e) {
615 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
631 asio::read(*
_sockets[rankSender], asio::buffer(&itemToReceive,
sizeof(
double)));
632 }
catch (std::exception &e) {
633 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
652 asio::read(*
_sockets[rankSender], asio::buffer(&itemToReceive,
sizeof(
int)));
653 }
catch (std::exception &e) {
654 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
671 asio::async_read(*
_sockets[rankSender],
672 asio::buffer(&itemToReceive,
sizeof(
int)),
673 [request](boost::system::error_code
const &, std::size_t) {
674 std::static_pointer_cast<SocketRequest>(request)->complete();
676 }
catch (std::exception &e) {
677 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
693 asio::read(*
_sockets[rankSender], asio::buffer(&itemToReceive,
sizeof(
bool)));
694 }
catch (std::exception &e) {
695 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
711 asio::async_read(*
_sockets[rankSender],
712 asio::buffer(&itemToReceive,
sizeof(
bool)),
713 [request](boost::system::error_code
const &, std::size_t) {
714 std::static_pointer_cast<SocketRequest>(request)->complete();
716 }
catch (std::exception &e) {
717 PRECICE_ERROR(
"Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
731std::vector<Interface> detectInterfaces()
733 std::vector<Interface> interfaces;
736 struct if_nameindex *nameInterface = if_nameindex();
737 for (
struct if_nameindex *itNameInterface = nameInterface; itNameInterface->if_index != 0; ++itNameInterface) {
739 interface.index = itNameInterface->if_index;
740 interface.name = itNameInterface->if_name;
741 interfaces.emplace_back(std::move(interface));
743 if_freenameindex(nameInterface);
746 for (
auto &interface : interfaces) {
747 struct ifreq request;
748 strncpy(request.ifr_name,
749 interface.name.c_str(),
752 auto socketfd = socket(AF_INET, SOCK_STREAM, 0);
753 if (socketfd == -1) {
756 auto err = ioctl(socketfd, SIOCGIFADDR, &request);
762 const char *addr = inet_ntoa((
reinterpret_cast<struct sockaddr_in *
>(&request.ifr_addr))->sin_addr);
766 interface.address = addr;
784 auto interfaces = detectInterfaces();
786 auto pos = std::find_if(interfaces.begin(), interfaces.end(),
787 [&](Interface
const &interface) { return interface.name == _networkName; });
788 if (pos == interfaces.end()) {
790 std::ostringstream err;
791 err <<
"Cannot find network interface \"" <<
_networkName <<
"\". Available interfaces are: ";
792 for (
const auto &interface : interfaces) {
793 err << interface.name <<
' ';
795 err <<
" Please check \"network\" attributes in your configuration file.";
800 PRECICE_CHECK(not pos->address.empty(),
"The interface \"{}\" does not have an IP address. Please select another interface.",
_networkName);
#define PRECICE_ERROR(...)
#define PRECICE_WARN(...)
#define PRECICE_DEBUG(...)
#define PRECICE_TRACE(...)
#define PRECICE_CHECK(check,...)
#define PRECICE_ASSERT(...)
void setRankOffset(Rank rankOffset)
Set rank offset.
virtual bool isConnected()
Returns true if a connection to a remote participant has been set up.
virtual int adjustRank(Rank rank) const
Adjusts the given rank based on the _rankOffset.
Reads the connection info for the given participant/rank information.
std::string read() const
Reads the info from the connection info file. Will block if the file is not present.
Writes the connection info for the given participant/rank information.
void write(std::string_view info) const
Writes the string info, e.g. IP:port, to the connection info file.
std::string getIpAddress()
std::unique_ptr< WorkGuard > _workGuard
PtrRequest aReceive(precice::span< double > itemsToReceive, int rankSender) override
Asynchronously receives an array of double values.
void cleanupEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Clean-up environment used to establish the communication.
void send(std::string const &itemToSend, Rank rankReceiver) override
Sends a std::string to process with given rank.
std::string _addressDirectory
Directory where IP address is exchanged by file.
void acceptConnectionAsServer(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int requesterCommunicatorSize) override
Accepts connection from another communicator, which has to call requestConnectionAsClient().
std::shared_ptr< IOContext > _ioContext
void requestConnectionAsClient(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, std::set< int > const &acceptorRanks, int requesterRank) override
Connects to another communicator, which has to call acceptConnectionAsServer().
std::string _networkName
Name of network to communicate over.
size_t getRemoteCommunicatorSize() override
Returns the number of processes in the remote communicator.
std::map< int, std::shared_ptr< Socket > > _sockets
Remote rank -> socket map.
boost::asio::io_context IOContext
void receive(std::string &itemToReceive, Rank rankSender) override
Receives a std::string from process with given rank.
void closeConnection() override
Disconnects from communication space, i.e. participant.
void prepareEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Prepare environment used to establish the communication.
~SocketCommunication() override
unsigned short _portNumber
Port used for socket connection.
SocketCommunication(unsigned short portNumber=0, bool reuseAddress=false, std::string networkName=utils::networking::loopbackInterfaceName(), std::string addressDirectory=".")
PtrRequest aSend(precice::span< const int > itemsToSend, Rank rankReceiver) override
Asynchronously sends an array of integer values.
void requestConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int requesterRank, int requesterCommunicatorSize) override
Connects to another communicator, which has to call acceptConnection().
void acceptConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int rankOffset=0) override
Accepts connection from another communicator, which has to call requestConnection().
A C++11 implementation of the non-owning C++20 std::span type.
constexpr pointer data() const noexcept
constexpr size_type size() const noexcept
std::string localDirectory(std::string_view acceptorName, std::string_view requesterName, std::string_view addressDirectory)
contains the data communication abstraction layer.
std::shared_ptr< Request > PtrRequest
contains precice-related utilities.
auto refToSpan(T &element)
Wraps a single element into a span.