preCICE
Loading...
Searching...
No Matches
SocketCommunication.cpp
Go to the documentation of this file.
1#include <algorithm>
2#include <boost/asio.hpp>
3#include <boost/asio/system_timer.hpp>
4
5#include <chrono>
6#include <filesystem>
7#include <sstream>
8#include <stdexcept>
9#include <utility>
10
13#include "SocketRequest.hpp"
14#include "logging/LogMacros.hpp"
16#include "utils/assertion.hpp"
17#include "utils/networking.hpp"
18#include "utils/span_tools.hpp"
19
20namespace precice::com {
21
22namespace asio = boost::asio;
23
25 bool reuseAddress,
26 std::string networkName,
27 std::string addressDirectory)
28 : _portNumber(portNumber),
29 _reuseAddress(reuseAddress),
30 _networkName(std::move(networkName)),
31 _addressDirectory(std::move(addressDirectory)),
33{
34 if (_addressDirectory.empty()) {
35 _addressDirectory = ".";
36 }
37}
38
39SocketCommunication::SocketCommunication(std::string const &addressDirectory)
40 : SocketCommunication(0, false, utils::networking::loopbackInterfaceName(), addressDirectory)
41{
42}
43
49
56
57void SocketCommunication::acceptConnection(std::string const &acceptorName,
58 std::string const &requesterName,
59 std::string const &tag,
60 int acceptorRank,
61 int rankOffset)
62{
63 PRECICE_TRACE(acceptorName, requesterName);
64
66
67 setRankOffset(rankOffset);
68
69 std::string address;
70
71 try {
72 std::string ipAddress = getIpAddress();
73 PRECICE_CHECK(not ipAddress.empty(), "Network \"{}\" not found for socket connection!", _networkName);
74
75 using asio::ip::tcp;
76
77 tcp::acceptor acceptor(*_ioContext);
78 tcp::endpoint endpoint(tcp::v4(), _portNumber);
79
80 acceptor.open(endpoint.protocol());
81 acceptor.set_option(tcp::acceptor::reuse_address(_reuseAddress));
82 acceptor.bind(endpoint);
83 acceptor.listen();
84
85 _portNumber = acceptor.local_endpoint().port();
86 address = ipAddress + ":" + std::to_string(_portNumber);
87 ConnectionInfoWriter conInfo(acceptorName, requesterName, tag, _addressDirectory);
88 conInfo.write(address);
89 PRECICE_DEBUG("Accept connection at {}", address);
90
91 int peerCurrent = 0; // Current peer to connect to
92 int peerCount = -1; // The total count of peers (initialized in the first iteration)
93 int requesterCommunicatorSize = -1;
94
95 do {
96 auto socket = std::make_shared<Socket>(*_ioContext);
97
98 acceptor.accept(*socket);
99 boost::asio::ip::tcp::no_delay option(true);
100 socket->set_option(option);
101
102 PRECICE_DEBUG("Accepted connection at {}", address);
103 _isConnected = true;
104
105 int requesterRank = -1;
106
107 asio::read(*socket, asio::buffer(&requesterRank, sizeof(int)));
108
109 PRECICE_ASSERT(_sockets.count(requesterRank) == 0,
110 "Rank {} has already been connected. Duplicate requests are not allowed.", requesterRank);
111
112 _sockets[requesterRank] = std::move(socket);
113 // send and receive expect a rank from the acceptor perspective.
114 // Thus we need to apply given rankOffset before passing it to send/receive.
115 // This is essentially the inverse of adjustRank().
116 auto adjustedRequesterRank = requesterRank + rankOffset;
117 send(acceptorRank, adjustedRequesterRank);
118 receive(requesterCommunicatorSize, adjustedRequesterRank);
119
120 // Initialize the count of peers to connect to
121 if (peerCurrent == 0) {
122 peerCount = requesterCommunicatorSize;
123 }
124
125 PRECICE_ASSERT(requesterCommunicatorSize > 0,
126 "Requester communicator size is {} which is invalid.", requesterCommunicatorSize);
127 PRECICE_ASSERT(requesterCommunicatorSize == peerCount,
128 "Current requester size from rank {} is {} but should be {}", requesterRank, requesterCommunicatorSize, peerCount);
129 } while (++peerCurrent < requesterCommunicatorSize);
130
131 acceptor.close();
132 } catch (std::exception &e) {
133 PRECICE_ERROR("Accepting a socket connection at {} failed with the system error: {}", address, e.what());
134 }
135
136 // NOTE: Keep IO context running so that it fires asynchronous handlers from another thread.
137 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(_ioContext->get_executor()));
138 _thread = std::thread([this] { _ioContext->run(); });
139}
140
141void SocketCommunication::acceptConnectionAsServer(std::string const &acceptorName,
142 std::string const &requesterName,
143 std::string const &tag,
144 int acceptorRank,
145 int requesterCommunicatorSize)
146{
147 PRECICE_TRACE(acceptorName, requesterName, acceptorRank, requesterCommunicatorSize);
148 PRECICE_ASSERT(requesterCommunicatorSize >= 0, "Requester communicator size has to be positive.");
150
151 if (requesterCommunicatorSize == 0) {
152 PRECICE_DEBUG("Accepting no connections.");
153 _isConnected = true;
154 return;
155 }
156
157 std::string address;
158
159 try {
160 std::string ipAddress = getIpAddress();
161 PRECICE_ASSERT(not ipAddress.empty(), "Network \"{}\" not found for socket connection!", _networkName);
162
163 using asio::ip::tcp;
164
165 tcp::acceptor acceptor(*_ioContext);
166 {
167 tcp::endpoint endpoint(tcp::v4(), _portNumber);
168
169 acceptor.open(endpoint.protocol());
170 acceptor.set_option(tcp::acceptor::reuse_address(_reuseAddress));
171 acceptor.bind(endpoint);
172 acceptor.listen();
173
174 _portNumber = acceptor.local_endpoint().port();
175 }
176
177 address = ipAddress + ":" + std::to_string(_portNumber);
178 ConnectionInfoWriter conInfo(acceptorName, requesterName, tag, acceptorRank, _addressDirectory);
179 conInfo.write(address);
180
181 PRECICE_DEBUG("Accepting connection at {}", address);
182
183 for (int connection = 0; connection < requesterCommunicatorSize; ++connection) {
184 auto socket = std::make_shared<Socket>(*_ioContext);
185 acceptor.accept(*socket);
186 boost::asio::ip::tcp::no_delay option(true);
187 socket->set_option(option);
188 PRECICE_DEBUG("Accepted connection at {}", address);
189 _isConnected = true;
190
191 int requesterRank;
192 asio::read(*socket, asio::buffer(&requesterRank, sizeof(int)));
193 _sockets[requesterRank] = std::move(socket);
194 }
195
196 acceptor.close();
197 } catch (std::exception &e) {
198 PRECICE_ERROR("Accepting a socket connection at {} failed with the system error: {}", address, e.what());
199 }
200
201 // NOTE: Keep IO context running so that it fires asynchronous handlers from another thread.
202 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(_ioContext->get_executor()));
203 _thread = std::thread([this] { _ioContext->run(); });
204}
205
206void SocketCommunication::requestConnection(std::string const &acceptorName,
207 std::string const &requesterName,
208 std::string const &tag,
209 int requesterRank,
210 int requesterCommunicatorSize)
211{
212 PRECICE_TRACE(acceptorName, requesterName);
214
215 ConnectionInfoReader conInfo(acceptorName, requesterName, tag, _addressDirectory);
216 std::string const address = conInfo.read();
217 PRECICE_DEBUG("Request connection to {}", address);
218 auto const sepidx = address.find(':');
219 std::string const ipAddress = address.substr(0, sepidx);
220 std::string const portNumber = address.substr(sepidx + 1);
221 _portNumber = static_cast<unsigned short>(std::stoul(portNumber));
222
223 try {
224 auto socket = std::make_shared<Socket>(*_ioContext);
225
226 using asio::ip::tcp;
227
228 while (not isConnected()) {
229 tcp::resolver resolver(*_ioContext);
230 auto results = resolver.resolve(ipAddress, portNumber, boost::asio::ip::resolver_base::numeric_host);
231
232 auto endpoint = results.begin()->endpoint();
233 boost::system::error_code error = asio::error::host_not_found;
234 socket->connect(endpoint, error);
235
236 _isConnected = not error;
237
238 if (not isConnected()) {
239 // Wait a little, since after a couple of ten-thousand trials the system
240 // seems to get confused and the requester connects wrongly to itself.
241 boost::asio::system_timer timer(*_ioContext, std::chrono::milliseconds(1));
242 timer.wait();
243 }
244 }
245 boost::asio::ip::tcp::no_delay option(true);
246 socket->set_option(option);
247
248 PRECICE_DEBUG("Requested connection to {}", address);
249
250 asio::write(*socket, asio::buffer(&requesterRank, sizeof(int)));
251
252 int acceptorRank = -1;
253 asio::read(*socket, asio::buffer(&acceptorRank, sizeof(int)));
254 _sockets[0] = std::move(socket); // should be acceptorRank instead of 0, likewise all communication below
255
256 send(requesterCommunicatorSize, 0);
257
258 } catch (std::exception &e) {
259 PRECICE_ERROR("Requesting a socket connection at {} failed with the system error: {}", address, e.what());
260 }
261
262 // NOTE: Keep IO context running so that it fires asynchronous handlers from another thread.
263 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(_ioContext->get_executor()));
264 _thread = std::thread([this] { _ioContext->run(); });
265}
266
267void SocketCommunication::requestConnectionAsClient(std::string const &acceptorName,
268 std::string const &requesterName,
269 std::string const &tag,
270 std::set<int> const &acceptorRanks,
271 int requesterRank)
272
273{
274 PRECICE_TRACE(acceptorName, requesterName, acceptorRanks, requesterRank);
276
277 for (auto const &acceptorRank : acceptorRanks) {
278 _isConnected = false;
279 ConnectionInfoReader conInfo(acceptorName, requesterName, tag, acceptorRank, _addressDirectory);
280 std::string const address = conInfo.read();
281 auto const sepidx = address.find(':');
282 std::string const ipAddress = address.substr(0, sepidx);
283 std::string const portNumber = address.substr(sepidx + 1);
284 _portNumber = static_cast<unsigned short>(std::stoul(portNumber));
285
286 try {
287 auto socket = std::make_shared<Socket>(*_ioContext);
288
289 using asio::ip::tcp;
290
291 PRECICE_DEBUG("Requesting connection to {}, port {}", ipAddress, portNumber);
292
293 while (not isConnected()) {
294 tcp::resolver resolver(*_ioContext);
295 auto endpoints = resolver.resolve(ipAddress, portNumber, boost::asio::ip::resolver_base::numeric_host);
296
297 boost::system::error_code error = asio::error::host_not_found;
298 boost::asio::connect(*socket, endpoints, error);
299
300 _isConnected = not error;
301
302 if (not isConnected()) {
303 // Wait a little, since after a couple of ten-thousand trials the system
304 // seems to get confused and the requester connects wrongly to itself.
305 boost::asio::system_timer timer(*_ioContext, std::chrono::milliseconds(1));
306 timer.wait();
307 }
308 }
309 boost::asio::ip::tcp::no_delay option(true);
310 socket->set_option(option);
311
312 PRECICE_DEBUG("Requested connection to {}, rank = {}", address, acceptorRank);
313 _sockets[acceptorRank] = std::move(socket);
314 send(requesterRank, acceptorRank); // send my rank
315
316 } catch (std::exception &e) {
317 PRECICE_ERROR("Requesting a socket connection at {} failed with the system error: {}", address, e.what());
318 }
319 }
320 // NOTE: Keep IO context running so that it fires asynchronous handlers from another thread.
321 _workGuard = std::make_unique<WorkGuard>(boost::asio::make_work_guard(_ioContext->get_executor()));
322 _thread = std::thread([this] { _ioContext->run(); });
323}
324
326{
328
329 if (not isConnected())
330 return;
331
332 if (_thread.joinable()) {
333 _workGuard.reset();
334 _ioContext->stop();
335 _thread.join();
336 }
337
338 for (auto &socket : _sockets) {
339 PRECICE_ASSERT(socket.second->is_open());
340
341 try {
342 socket.second->shutdown(Socket::shutdown_send);
343 socket.second->close();
344 } catch (std::exception &e) {
345 PRECICE_WARN("Socket shutdown failed with system error: {}", e.what());
346 }
347 }
348
349 _isConnected = false;
350}
351
352void SocketCommunication::send(std::string const &itemToSend, Rank rankReceiver)
353{
354 PRECICE_TRACE(itemToSend, rankReceiver);
355
356 rankReceiver = adjustRank(rankReceiver);
357
358 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
360
361 size_t size = itemToSend.size() + 1;
362 try {
363 asio::write(*_sockets[rankReceiver], asio::buffer(&size, sizeof(size_t)));
364 asio::write(*_sockets[rankReceiver], asio::buffer(itemToSend.c_str(), size));
365 } catch (std::exception &e) {
366 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
367 }
368}
369
371{
372 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
373
374 rankReceiver = adjustRank(rankReceiver);
375
376 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
378
379 try {
380 asio::write(*_sockets[rankReceiver], asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(int)));
381 } catch (std::exception &e) {
382 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
383 }
384}
385
386void SocketCommunication::prepareEstablishment(std::string const &acceptorName,
387 std::string const &requesterName)
388{
389 using namespace std::filesystem;
390 path dir = com::impl::localDirectory(acceptorName, requesterName, _addressDirectory);
391 PRECICE_DEBUG("Creating connection exchange directory {}", dir.generic_string());
392 try {
393 create_directories(dir);
394 } catch (const std::filesystem::filesystem_error &e) {
395 PRECICE_WARN("Creating directory for connection info failed with filesystem error: {}", e.what());
396 }
397}
398
399void SocketCommunication::cleanupEstablishment(std::string const &acceptorName,
400 std::string const &requesterName)
401{
402 using namespace std::filesystem;
403 path dir = com::impl::localDirectory(acceptorName, requesterName, _addressDirectory);
404 PRECICE_DEBUG("Removing connection exchange directory {}", dir.generic_string());
405 try {
406 remove_all(dir);
407 } catch (const std::filesystem::filesystem_error &e) {
408 PRECICE_WARN("Cleaning up connection info failed with filesystem error {}", e.what());
409 }
410}
411
413{
414 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
415
416 rankReceiver = adjustRank(rankReceiver);
417
418 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
420
421 PtrRequest request(new SocketRequest);
422
423 _queue.dispatch(_sockets[rankReceiver],
424 asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(int)),
425 [request] {
426 std::static_pointer_cast<SocketRequest>(request)->complete();
427 });
428 return request;
429}
430
432{
433 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
434
435 rankReceiver = adjustRank(rankReceiver);
436
437 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
439
440 try {
441 asio::write(*_sockets[rankReceiver], asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(double)));
442 } catch (std::exception &e) {
443 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
444 }
445}
446
448{
449 PRECICE_TRACE(itemsToSend.size(), rankReceiver);
450
451 rankReceiver = adjustRank(rankReceiver);
452
453 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
455
456 PtrRequest request(new SocketRequest);
457
458 _queue.dispatch(_sockets[rankReceiver],
459 asio::buffer(itemsToSend.data(), itemsToSend.size() * sizeof(double)),
460 [request] {
461 std::static_pointer_cast<SocketRequest>(request)->complete();
462 });
463 return request;
464}
465
466void SocketCommunication::send(double itemToSend, Rank rankReceiver)
467{
468 PRECICE_TRACE(itemToSend, rankReceiver);
469
470 rankReceiver = adjustRank(rankReceiver);
471
472 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
474
475 try {
476 asio::write(*_sockets[rankReceiver], asio::buffer(&itemToSend, sizeof(double)));
477 } catch (std::exception &e) {
478 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
479 }
480}
481
482PtrRequest SocketCommunication::aSend(const double &itemToSend, Rank rankReceiver)
483{
484 return aSend(precice::refToSpan<const double>(itemToSend), rankReceiver);
485}
486
487void SocketCommunication::send(int itemToSend, Rank rankReceiver)
488{
489 PRECICE_TRACE(itemToSend, rankReceiver);
490
491 rankReceiver = adjustRank(rankReceiver);
492
493 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
495
496 try {
497 asio::write(*_sockets[rankReceiver], asio::buffer(&itemToSend, sizeof(int)));
498 } catch (std::exception &e) {
499 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
500 }
501}
502
503PtrRequest SocketCommunication::aSend(const int &itemToSend, Rank rankReceiver)
504{
505 return aSend(precice::refToSpan<const int>(itemToSend), rankReceiver);
506}
507
508void SocketCommunication::send(bool itemToSend, Rank rankReceiver)
509{
510 PRECICE_TRACE(itemToSend, rankReceiver);
511
512 rankReceiver = adjustRank(rankReceiver);
513
514 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
516
517 try {
518 asio::write(*_sockets[rankReceiver], asio::buffer(&itemToSend, sizeof(bool)));
519 } catch (std::exception &e) {
520 PRECICE_ERROR("Sending data to another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
521 }
522}
523
524PtrRequest SocketCommunication::aSend(const bool &itemToSend, Rank rankReceiver)
525{
526 PRECICE_TRACE(rankReceiver);
527
528 rankReceiver = adjustRank(rankReceiver);
529
530 PRECICE_ASSERT(rankReceiver >= 0, rankReceiver);
532
533 PtrRequest request(new SocketRequest);
534
535 _queue.dispatch(_sockets[rankReceiver],
536 asio::buffer(&itemToSend, sizeof(bool)),
537 [request] {
538 std::static_pointer_cast<SocketRequest>(request)->complete();
539 });
540 return request;
541}
542
543void SocketCommunication::receive(std::string &itemToReceive, Rank rankSender)
544{
545 PRECICE_TRACE(rankSender);
546
547 rankSender = adjustRank(rankSender);
548
549 PRECICE_ASSERT(rankSender >= 0, rankSender);
551
552 size_t size = 0;
553
554 try {
555 asio::read(*_sockets[rankSender], asio::buffer(&size, sizeof(size_t)));
556 std::vector<char> msg(size);
557 asio::read(*_sockets[rankSender], asio::buffer(msg.data(), size));
558 itemToReceive = msg.data();
559 } catch (std::exception &e) {
560 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
561 }
562}
563
565{
566 PRECICE_TRACE(itemsToReceive.size(), rankSender);
567
568 rankSender = adjustRank(rankSender);
569
570 PRECICE_ASSERT(rankSender >= 0, rankSender);
572
573 try {
574 asio::read(*_sockets[rankSender], asio::buffer(itemsToReceive.data(), itemsToReceive.size() * sizeof(int)));
575 } catch (std::exception &e) {
576 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
577 }
578}
579
581{
582 PRECICE_TRACE(itemsToReceive.size(), rankSender);
583
584 rankSender = adjustRank(rankSender);
585
586 PRECICE_ASSERT(rankSender >= 0, rankSender);
588
589 try {
590 asio::read(*_sockets[rankSender], asio::buffer(itemsToReceive.data(), itemsToReceive.size() * sizeof(double)));
591 } catch (std::exception &e) {
592 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
593 }
594}
595
597 int rankSender)
598{
599 PRECICE_TRACE(itemsToReceive.size(), rankSender);
600
601 rankSender = adjustRank(rankSender);
602
603 PRECICE_ASSERT(rankSender >= 0, rankSender);
605
606 PtrRequest request(new SocketRequest);
607
608 try {
609 asio::async_read(*_sockets[rankSender],
610 asio::buffer(itemsToReceive.data(), itemsToReceive.size() * sizeof(double)),
611 [request](boost::system::error_code const &, std::size_t) {
612 std::static_pointer_cast<SocketRequest>(request)->complete();
613 });
614 } catch (std::exception &e) {
615 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
616 }
617
618 return request;
619}
620
621void SocketCommunication::receive(double &itemToReceive, Rank rankSender)
622{
623 PRECICE_TRACE(rankSender);
624
625 rankSender = adjustRank(rankSender);
626
627 PRECICE_ASSERT(rankSender >= 0, rankSender);
629
630 try {
631 asio::read(*_sockets[rankSender], asio::buffer(&itemToReceive, sizeof(double)));
632 } catch (std::exception &e) {
633 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
634 }
635}
636
637PtrRequest SocketCommunication::aReceive(double &itemToReceive, Rank rankSender)
638{
639 return aReceive(precice::refToSpan<double>(itemToReceive), rankSender);
640}
641
642void SocketCommunication::receive(int &itemToReceive, Rank rankSender)
643{
644 PRECICE_TRACE(rankSender);
645
646 rankSender = adjustRank(rankSender);
647
648 PRECICE_ASSERT(rankSender >= 0, rankSender);
650
651 try {
652 asio::read(*_sockets[rankSender], asio::buffer(&itemToReceive, sizeof(int)));
653 } catch (std::exception &e) {
654 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
655 }
656}
657
658PtrRequest SocketCommunication::aReceive(int &itemToReceive, Rank rankSender)
659{
660 PRECICE_TRACE(rankSender);
661
662 rankSender = adjustRank(rankSender);
663
664 PRECICE_ASSERT((rankSender >= 0) && (rankSender < (int) _sockets.size()),
665 rankSender, _sockets.size());
667
668 PtrRequest request(new SocketRequest);
669
670 try {
671 asio::async_read(*_sockets[rankSender],
672 asio::buffer(&itemToReceive, sizeof(int)),
673 [request](boost::system::error_code const &, std::size_t) {
674 std::static_pointer_cast<SocketRequest>(request)->complete();
675 });
676 } catch (std::exception &e) {
677 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
678 }
679
680 return request;
681}
682
683void SocketCommunication::receive(bool &itemToReceive, Rank rankSender)
684{
685 PRECICE_TRACE(rankSender);
686
687 rankSender = adjustRank(rankSender);
688
689 PRECICE_ASSERT(rankSender >= 0, rankSender);
691
692 try {
693 asio::read(*_sockets[rankSender], asio::buffer(&itemToReceive, sizeof(bool)));
694 } catch (std::exception &e) {
695 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
696 }
697}
698
699PtrRequest SocketCommunication::aReceive(bool &itemToReceive, Rank rankSender)
700{
701 PRECICE_TRACE(rankSender);
702
703 rankSender = adjustRank(rankSender);
704
705 PRECICE_ASSERT(rankSender >= 0, rankSender);
707
708 PtrRequest request(new SocketRequest);
709
710 try {
711 asio::async_read(*_sockets[rankSender],
712 asio::buffer(&itemToReceive, sizeof(bool)),
713 [request](boost::system::error_code const &, std::size_t) {
714 std::static_pointer_cast<SocketRequest>(request)->complete();
715 });
716 } catch (std::exception &e) {
717 PRECICE_ERROR("Receiving data from another participant (using sockets) failed with a system error: {}. This often means that the other participant exited with an error (look there).", e.what());
718 }
719
720 return request;
721}
722
723#ifndef _WIN32
724namespace {
725struct Interface {
726 unsigned int index;
727 std::string name;
728 std::string address;
729};
730
731std::vector<Interface> detectInterfaces()
732{
733 std::vector<Interface> interfaces;
734
735 // Collect interface indices and names
736 struct if_nameindex *nameInterface = if_nameindex();
737 for (struct if_nameindex *itNameInterface = nameInterface; itNameInterface->if_index != 0; ++itNameInterface) {
738 Interface interface;
739 interface.index = itNameInterface->if_index;
740 interface.name = itNameInterface->if_name;
741 interfaces.emplace_back(std::move(interface));
742 }
743 if_freenameindex(nameInterface);
744
745 // Resolve addresses for interfaces
746 for (auto &interface : interfaces) {
747 struct ifreq request;
748 strncpy(request.ifr_name,
749 interface.name.c_str(),
750 IFNAMSIZ - 1); // Copy interface name
751
752 auto socketfd = socket(AF_INET, SOCK_STREAM, 0);
753 if (socketfd == -1) {
754 continue;
755 }
756 auto err = ioctl(socketfd, SIOCGIFADDR, &request);
757 close(socketfd);
758 if (err) {
759 continue;
760 }
761
762 const char *addr = inet_ntoa((reinterpret_cast<struct sockaddr_in *>(&request.ifr_addr))->sin_addr);
763 if (!addr) {
764 continue;
765 }
766 interface.address = addr;
767 }
768
769 return interfaces;
770}
771} // namespace
772#endif
773
775{
777
778#ifdef _WIN32
779 return "127.0.0.1";
780#else
781
782 PRECICE_DEBUG("Looking for IP address of network \"{}\"", _networkName);
783
784 auto interfaces = detectInterfaces();
785
786 auto pos = std::find_if(interfaces.begin(), interfaces.end(),
787 [&](Interface const &interface) { return interface.name == _networkName; });
788 if (pos == interfaces.end()) {
789 PRECICE_DEBUG("There's NOTHING");
790 std::ostringstream err;
791 err << "Cannot find network interface \"" << _networkName << "\". Available interfaces are: ";
792 for (const auto &interface : interfaces) {
793 err << interface.name << ' ';
794 }
795 err << " Please check \"network\" attributes in your configuration file.";
796 PRECICE_ERROR(err.str());
797 }
798
799 // Unconnected interfaces don't have an IP address.
800 PRECICE_CHECK(not pos->address.empty(), "The interface \"{}\" does not have an IP address. Please select another interface.", _networkName);
801
802 PRECICE_DEBUG("Detected network IP address of interface \"{}\": {}.", _networkName, pos->address);
803 return pos->address;
804#endif
805}
806
807} // namespace precice::com
#define PRECICE_ERROR(...)
Definition LogMacros.hpp:16
#define PRECICE_WARN(...)
Definition LogMacros.hpp:12
#define PRECICE_DEBUG(...)
Definition LogMacros.hpp:61
#define PRECICE_TRACE(...)
Definition LogMacros.hpp:92
#define PRECICE_CHECK(check,...)
Definition LogMacros.hpp:32
#define PRECICE_ASSERT(...)
Definition assertion.hpp:85
void setRankOffset(Rank rankOffset)
Set rank offset.
virtual bool isConnected()
Returns true, if a connection to a remote participant has been setup.
virtual int adjustRank(Rank rank) const
Adjusts the given rank bases on the _rankOffset.
Reads the connection info for the given participant/rank information.
std::string read() const
Reads the info from the connection info file. Will block, if the the file is not present.
Writes the connection info for the given participant/rank information.
void write(std::string_view info) const
Write the string info, e.g. IP:port to the connection info file.
std::unique_ptr< WorkGuard > _workGuard
PtrRequest aReceive(precice::span< double > itemsToReceive, int rankSender) override
Asynchronously receives an array of double values.
void cleanupEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Clean-up environment used to establish the communication.
void send(std::string const &itemToSend, Rank rankReceiver) override
Sends a std::string to process with given rank.
std::string _addressDirectory
Directory where IP address is exchanged by file.
void acceptConnectionAsServer(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int requesterCommunicatorSize) override
Accepts connection from another communicator, which has to call requestConnectionAsClient().
std::shared_ptr< IOContext > _ioContext
void requestConnectionAsClient(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, std::set< int > const &acceptorRanks, int requesterRank) override
Connects to another communicator, which has to call acceptConnectionAsServer().
std::string _networkName
Name of network to communicate over.
size_t getRemoteCommunicatorSize() override
Returns the number of processes in the remote communicator.
std::map< int, std::shared_ptr< Socket > > _sockets
Remote rank -> socket map.
void receive(std::string &itemToReceive, Rank rankSender) override
Receives a std::string from process with given rank.
void closeConnection() override
Disconnects from communication space, i.e. participant.
void prepareEstablishment(std::string const &acceptorName, std::string const &requesterName) override
Prepare environment used to establish the communication.
unsigned short _portNumber
Port used for socket connection.
SocketCommunication(unsigned short portNumber=0, bool reuseAddress=false, std::string networkName=utils::networking::loopbackInterfaceName(), std::string addressDirectory=".")
PtrRequest aSend(precice::span< const int > itemsToSend, Rank rankReceiver) override
Asynchronously sends an array of integer values.
void requestConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int requesterRank, int requesterCommunicatorSize) override
Connects to another communicator, which has to call acceptConnection().
void acceptConnection(std::string const &acceptorName, std::string const &requesterName, std::string const &tag, int acceptorRank, int rankOffset=0) override
Accepts connection from another communicator, which has to call requestConnection().
A C++ 11 implementation of the non-owning C++20 std::span type.
Definition span.hpp:284
constexpr pointer data() const noexcept
Definition span.hpp:500
constexpr size_type size() const noexcept
Definition span.hpp:469
std::string localDirectory(std::string_view acceptorName, std::string_view requesterName, std::string_view addressDirectory)
contains the data communication abstraction layer.
std::shared_ptr< Request > PtrRequest
contains precice-related utilities.
int Rank
Definition Types.hpp:37
auto refToSpan(T &element)
Wraps a single element into a span.
Definition span_tools.hpp:9
STL namespace.