69 for (
auto const& connection : m_connections)
71 endMessageThread(connection.first);
// Fragment of the send-worker setup for connection `id`; the enclosing signature and
// the brace structure are not part of this excerpt.
// Both the last-send result and the worker-active flag are initialised to true.
77 m_sendMessageResults[id].store(
true);
79 m_sendMessageThreadsActive[id].store(
true);
// The worker thread is stored in m_sendMessageThreads[id]; the lambda captures `this` and `id`.
80 m_sendMessageThreads[id] = std::make_unique<std::thread>([
this,
id]() {
// NOTE(review): `thisId` is not declared in the visible lines - presumably a local copy of `id`; confirm.
// The worker locks the per-connection condition-variable mutex for use with wait() below.
82 std::unique_lock<std::mutex> sendMessageSignal(m_sendMessageCVMutexs[thisId]);
// Runs for as long as the active flag is set.
83 while (m_sendMessageThreadsActive[thisId].load())
// Waits for a notification. There is no predicate, so spurious wakeups simply find an empty queue.
// NOTE(review): a notify_all() issued while the worker is not inside wait() appears to be lost
// until the next notification - confirm whether queued messages can stall.
85 m_sendMessageCVs[id].wait(sendMessageSignal);
// The queue is accessed under its own mutex, separate from the condition-variable mutex.
87 std::unique_lock<std::mutex> l(m_sendMessageMutexs[thisId]);
// Drains every message currently queued for this connection.
88 while (!m_sendMessageLists[thisId].empty())
// front() is copied before pop() removes the element from the queue.
90 auto messageData = m_sendMessageLists[thisId].front();
91 m_sendMessageLists[thisId].pop();
// Only a connection that exists and is connected is asked to send; a failing sendMessage()
// records `false` as the send result. Note that m_connections[thisId] uses operator[] on the map.
93 if (m_connections[thisId] && m_connections[thisId]->isConnected() && !m_connections[thisId]->sendMessage(messageData))
94 m_sendMessageResults[thisId].store(
false);
102void InterprocessConnectionServerImpl::endMessageThread(
int id)
105 std::lock_guard<std::mutex> sendMessageSignal(m_sendMessageCVMutexs[
id]);
106 m_sendMessageThreadsActive[id].store(
false);
108 m_sendMessageCVs[id].notify_all();
109 if (m_sendMessageThreads[
id])
111 m_sendMessageThreads[id]->join();
114 m_sendMessageMutexs.erase(
id);
115 m_sendMessageLists.erase(
id);
116 m_sendMessageResults.erase(
id);
118 m_sendMessageThreadsActive.erase(
id);
119 m_sendMessageThreads.erase(
id);
120 m_sendMessageCVs.erase(
id);
121 m_sendMessageCVMutexs.erase(
id);
// Fragment that builds the per-connection queue health report; the enclosing signature,
// braces and branch conditions are not part of this excerpt.
// Result: connection id -> (fill value as double, clipped flag).
126 std::map<int, std::pair<double, bool>> rList;
// Visits every send queue; list.first is the connection id.
127 for (
auto const& list : m_sendMessageLists)
129 auto listSize = size_t(0);
// The queue size is read while holding that queue's mutex.
131 std::lock_guard<std::mutex> l(m_sendMessageMutexs[list.first]);
132 listSize = m_sendMessageLists[list.first].size();
// Reports a fixed fill value of 1.0. The guarding condition is not visible here -
// presumably taken when listSize exceeds s_listSizeThreshold; confirm.
136 rList[list.first] = std::make_pair(1.0, m_sendMessageListClipped[list.first]);
// Reports the fill value as listSize divided by s_listSizeThreshold (presumably the alternative branch).
138 rList[list.first] = std::make_pair(
double(listSize) /
s_listSizeThreshold, m_sendMessageListClipped[list.first]);
// Fragment of the per-id connection check; the enclosing signature and the bodies of the
// two `if` statements are not part of this excerpt.
// Tests whether m_connections does not hold exactly one entry for `id`.
145 if (m_connections.count(
id) != 1)
// at() is used instead of operator[], so no entry is inserted; the pointer is null-checked
// before isConnected() is called.
151 if (m_connections.at(
id) && m_connections.at(
id)->isConnected())
// Returns true only for a non-null connection that reports itself as connected.
155 return m_connections.at(
id) && m_connections.at(
id)->isConnected();
// Loop header only: iterates every (id, connection) entry of m_connections.
// The loop body is not part of this excerpt.
160 for (
auto const& connection : m_connections)
// Returns the map slot for `id`.
// NOTE(review): operator[] default-inserts an empty entry when `id` is unknown - confirm
// callers check the id first, or that a null entry in m_connections is acceptable.
170 return m_connections[id];
175 auto idsToErase = std::vector<int>();
176 for (
auto const& connection : m_connections)
177 if (connection.second && !connection.second->isConnected())
178 idsToErase.push_back(connection.first);
180 for (
auto const&
id : idsToErase)
182 m_connections.erase(
id);
183 endMessageThread(
id);
191 auto ids = std::vector<int>(m_connections.size());
192 for (
auto const& connection : m_connections)
193 ids.push_back(connection.first);
// Fragment of the enqueue loop; the enclosing signature, braces and some conditions are
// not part of this excerpt.
// Visits every registered send worker; th.first is the connection id.
200 for (
auto const& th : m_sendMessageThreads)
// An empty sendIds list addresses all workers, otherwise only the ids contained in sendIds.
202 if (sendIds.empty() || std::find(sendIds.begin(), sendIds.end(), th.first) != sendIds.end())
// The addressed queue is modified while holding its own mutex.
204 std::lock_guard<std::mutex> l(m_sendMessageMutexs[th.first]);
205 m_sendMessageLists[th.first].push(message);
// Removes the oldest queued message and marks the queue as clipped. The guarding condition
// is not visible here - presumably the queue grew beyond s_listSizeThreshold; confirm.
208 m_sendMessageLists[th.first].pop();
209 m_sendMessageListClipped[th.first] =
true;
// Clears the clipped flag (presumably the alternative branch of the check above).
212 m_sendMessageListClipped[th.first] =
false;
// A `false` send result was recorded for this id. The visible code re-arms the flag to
// true; any handling in between is not part of this excerpt.
213 if (!m_sendMessageResults[th.first].load())
216 m_sendMessageResults[th.first].store(
true);
// Wakes everything waiting on this connection's condition variable.
219 m_sendMessageCVs[th.first].notify_all();
// Creates a new connection object under a freshly incremented id and returns a raw pointer
// to it. Braces and the callback body are not part of this excerpt.
225InterprocessConnection* InterprocessConnectionServerImpl::createConnectionObject()
// Each new connection gets the next value of the running id counter.
227 m_connectionIdIter++;
// Ownership stays with m_connections (std::unique_ptr); the id is passed to the connection's constructor.
228 m_connections[m_connectionIdIter] = std::make_unique<InterprocessConnectionImpl>(m_connectionIdIter);
230 DBG(juce::String(__FUNCTION__) << m_connectionIdIter);
// Installs the connection-lost callback; its int parameter is unnamed and its body is not visible.
// NOTE(review): inside a member function `[=]` captures `this`, not a copy of m_connectionIdIter,
// so any use of m_connectionIdIter in the callback would read the counter's value at call time - confirm.
234 m_connections[m_connectionIdIter]->onConnectionLost = [=](
int ) {
// The returned pointer is obtained via get() and is therefore non-owning.
241 return m_connections[m_connectionIdIter].get();
std::function< void(int)> onConnectionMade
virtual ~InterprocessConnectionImpl()
void connectionMade() override
std::function< void(int)> onConnectionLost
void connectionLost() override
InterprocessConnectionImpl(int id)
std::function< void(int, const MemoryBlock &)> onMessageReceived
void messageReceived(const MemoryBlock &message) override
std::map< int, std::pair< double, bool > > getListHealth()
bool hasActiveConnection(int id)
const std::unique_ptr< InterprocessConnectionImpl > & getActiveConnection(int id)
virtual ~InterprocessConnectionServerImpl()
const std::vector< int > getActiveConnectionIds()
std::function< void(int)> onConnectionCreated
static constexpr double s_listSizeThreshold
void createMessageThread(int id)
bool hasActiveConnections()
InterprocessConnectionServerImpl()
bool enqueueMessage(const MemoryBlock &message, std::vector< int > sendIds={})
const std::vector< int > cleanupDeadConnections()