Mema
Memory Matrix — multi-channel audio matrix monitor and router
Loading...
Searching...
No Matches
InterprocessConnection.cpp
Go to the documentation of this file.
1/* Copyright (c) 2024, Christian Ahrens
2 *
3 * This file is part of Mema <https://github.com/ChristianAhrens/Mema>
4 *
5 * This tool is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU Lesser General Public License version 3.0 as published
7 * by the Free Software Foundation.
8 *
9 * This tool is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12 * details.
13 *
14 * You should have received a copy of the GNU Lesser General Public License
15 * along with this tool; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
20
21
22namespace Mema
23{
24
25
26//==============================================================================
27InterprocessConnectionImpl::InterprocessConnectionImpl(int id) : juce::InterprocessConnection()
28{
29 m_id = id;
30}
31
36
42
48
49void InterprocessConnectionImpl::messageReceived(const MemoryBlock& message)
50{
52 onMessageReceived(m_id, message);
53}
54
56{
57 return m_id;
58}
59
60
61//==============================================================================
63{
64
65}
66
68{
69 for (auto const& connection : m_connections)
70 {
71 endMessageThread(connection.first);
72 }
73}
74
76{
77 m_sendMessageResults[id].store(true);
78
79 m_sendMessageThreadsActive[id].store(true);
80 m_sendMessageThreads[id] = std::make_unique<std::thread>([this, id]() {
81 auto thisId = id;
82 std::unique_lock<std::mutex> sendMessageSignal(m_sendMessageCVMutexs[thisId]);
83 while (m_sendMessageThreadsActive[thisId].load())
84 {
85 m_sendMessageCVs[id].wait(sendMessageSignal);
86
87 std::unique_lock<std::mutex> l(m_sendMessageMutexs[thisId]);
88 while (!m_sendMessageLists[thisId].empty())
89 {
90 auto messageData = m_sendMessageLists[thisId].front();
91 m_sendMessageLists[thisId].pop();
92 l.unlock();
93 if (m_connections[thisId] && m_connections[thisId]->isConnected() && !m_connections[thisId]->sendMessage(messageData))
94 m_sendMessageResults[thisId].store(false);
95 l.lock();
96 }
97 }
98
99 });
100}
101
102void InterprocessConnectionServerImpl::endMessageThread(int id)
103{
104 {
105 std::lock_guard<std::mutex> sendMessageSignal(m_sendMessageCVMutexs[id]);
106 m_sendMessageThreadsActive[id].store(false);
107 }
108 m_sendMessageCVs[id].notify_all();
109 if (m_sendMessageThreads[id])
110 {
111 m_sendMessageThreads[id]->join();
112 }
113
114 m_sendMessageMutexs.erase(id);
115 m_sendMessageLists.erase(id);
116 m_sendMessageResults.erase(id);
117
118 m_sendMessageThreadsActive.erase(id);
119 m_sendMessageThreads.erase(id);
120 m_sendMessageCVs.erase(id);
121 m_sendMessageCVMutexs.erase(id);
122}
123
124std::map<int, std::pair<double, bool>> InterprocessConnectionServerImpl::getListHealth()
125{
126 std::map<int, std::pair<double, bool>> rList;
127 for (auto const& list : m_sendMessageLists)
128 {
129 auto listSize = size_t(0);
130 {
131 std::lock_guard<std::mutex> l(m_sendMessageMutexs[list.first]);
132 listSize = m_sendMessageLists[list.first].size();
133 }
134
135 if (listSize >= s_listSizeThreshold)
136 rList[list.first] = std::make_pair(1.0, m_sendMessageListClipped[list.first]);
137 else
138 rList[list.first] = std::make_pair(double(listSize) / s_listSizeThreshold, m_sendMessageListClipped[list.first]);
139 }
140 return rList;
141}
142
144{
145 if (m_connections.count(id) != 1)
146 {
147 return false;
148 }
149 else
150 {
151 if (m_connections.at(id) && m_connections.at(id)->isConnected())
152 return true;
153 }
154
155 return m_connections.at(id) && m_connections.at(id)->isConnected();
156}
157
159{
160 for (auto const& connection : m_connections)
161 {
162 if (hasActiveConnection(connection.first))
163 return true;
164 }
165 return false;
166}
167
168const std::unique_ptr<InterprocessConnectionImpl>& InterprocessConnectionServerImpl::getActiveConnection(int id)
169{
170 return m_connections[id];
171}
172
174{
175 auto idsToErase = std::vector<int>();
176 for (auto const& connection : m_connections)
177 if (connection.second && !connection.second->isConnected())
178 idsToErase.push_back(connection.first);
179
180 for (auto const& id : idsToErase)
181 {
182 m_connections.erase(id);
183 endMessageThread(id);
184 }
185
186 return idsToErase;
187}
188
190{
191 auto ids = std::vector<int>(m_connections.size());
192 for (auto const& connection : m_connections)
193 ids.push_back(connection.first);
194 return ids;
195}
196
197bool InterprocessConnectionServerImpl::enqueueMessage(const MemoryBlock& message, std::vector<int> sendIds)
198{
199 auto rVal = true;
200 for (auto const& th : m_sendMessageThreads)
201 {
202 if (sendIds.empty() || std::find(sendIds.begin(), sendIds.end(), th.first) != sendIds.end())
203 {
204 std::lock_guard<std::mutex> l(m_sendMessageMutexs[th.first]);
205 m_sendMessageLists[th.first].push(message);
206 if (m_sendMessageLists[th.first].size() > s_listSizeThreshold)
207 {
208 m_sendMessageLists[th.first].pop();
209 m_sendMessageListClipped[th.first] = true;
210 }
211 else
212 m_sendMessageListClipped[th.first] = false;
213 if (!m_sendMessageResults[th.first].load())
214 {
215 rVal = false;
216 m_sendMessageResults[th.first].store(true);
217 }
218 }
219 m_sendMessageCVs[th.first].notify_all();
220 }
221
222 return rVal;
223}
224
225InterprocessConnection* InterprocessConnectionServerImpl::createConnectionObject()
226{
227 m_connectionIdIter++;
228 m_connections[m_connectionIdIter] = std::make_unique<InterprocessConnectionImpl>(m_connectionIdIter);
229
230 DBG(juce::String(__FUNCTION__) << m_connectionIdIter);
231
232 createMessageThread(m_connectionIdIter);
233
234 m_connections[m_connectionIdIter]->onConnectionLost = [=](int /*connectionId*/) {
236 };
237
239 onConnectionCreated(m_connectionIdIter);
240
241 return m_connections[m_connectionIdIter].get();
242}
243
244
245} // namespace Mema
std::function< void(int)> onConnectionMade
std::function< void(int)> onConnectionLost
std::function< void(int, const MemoryBlock &)> onMessageReceived
void messageReceived(const MemoryBlock &message) override
std::map< int, std::pair< double, bool > > getListHealth()
const std::unique_ptr< InterprocessConnectionImpl > & getActiveConnection(int id)
bool enqueueMessage(const MemoryBlock &message, std::vector< int > sendIds={})
Definition Mema.cpp:27