NanoOcp
Minimal AES70 / OCP.1 TCP client/server library for d&b Soundscape devices
Loading...
Searching...
No Matches
Ocp1Connection.cpp
Go to the documentation of this file.
1/* Copyright (c) 2023, Christian Ahrens
2 *
3 * This file is part of NanoOcp <https://github.com/ChristianAhrens/NanoOcp>
4 *
5 * This library is free software; you can redistribute it and/or modify it under
6 * the terms of the GNU Lesser General Public License version 3.0 as published
7 * by the Free Software Foundation.
8 *
9 * This library is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12 * details.
13 *
14 * You should have received a copy of the GNU Lesser General Public License
15 * along with this library; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19#include "Ocp1Connection.h"
20#include "Ocp1Message.h"
21
22#ifdef JUCE_GLOBAL_MODULE_SETTINGS_INCLUDED
23 #include <juce_core/juce_core.h>
24 #include <juce_events/juce_events.h>
25#else
26 #include <JuceHeader.h>
27#endif
28
29
30namespace NanoOcp1
31{
32
33
// Worker thread type used by Ocp1Connection. It derives from juce::Thread and
// simply forwards its run() entry point to the owning connection's runThread().
34struct Ocp1Connection::ConnectionThread : public juce::Thread
35{
// Names the thread "JUCE IPC" and stores a reference to the owning connection.
36 ConnectionThread(Ocp1Connection& c) : juce::Thread("JUCE IPC"), owner(c) {}
// Thread body: delegates all work to Ocp1Connection::runThread().
37 void run() override { owner.runThread(); }
38
// NOTE(review): the declaration of the `owner` member is not visible in this
// listing (a line appears to have been dropped) - confirm against the real file.
40 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR(ConnectionThread)
41};
42
44{
45public:
47 : ref(p) {}
48
49 template <typename Fn>
50 void ifSafe(Fn&& fn)
51 {
52 const juce::ScopedLock lock(mutex);
53
54 if (safe)
55 fn(ref);
56 }
57
58 void setSafe(bool s)
59 {
60 const juce::ScopedLock lock(mutex);
61 safe = s;
62 }
63
64 bool isSafe()
65 {
66 const juce::ScopedLock lock(mutex);
67 return safe;
68 }
69
70private:
71 juce::CriticalSection mutex;
72 Ocp1Connection& ref;
73 bool safe = false;
74};
75
80
81//==============================================================================
82Ocp1Connection::Ocp1Connection(bool callbacksOnMessageThread , const juce::Thread::Priority threadPriority)
83 : useMessageThread(callbacksOnMessageThread),
84 safeAction(std::make_shared<SafeAction>(*this)), m_threadPriority(threadPriority)
85{
86 thread.reset(new ConnectionThread(*this));
87}
88
90{
91 // You *must* call `disconnect` in the destructor of your derived class to ensure
92 // that any pending messages are not delivered. If the messages were delivered after
93 // destroying the derived class, we'd end up calling the pure virtual implementations
94 // of `messageReceived`, `connectionMade` and `connectionLost` which is definitely
95 // not a good idea!
96 jassert(!safeAction->isSafe());
97
98 callbackConnectionState = false;
100 thread.reset();
101}
102
103//==============================================================================
104bool Ocp1Connection::connectToSocket(const juce::String& hostName,
105 int portNumber, int timeOutMillisecs)
106{
107 disconnect(1000);
108
109 auto s = std::make_unique<juce::StreamingSocket>();
110
111 if (s->connect(hostName, portNumber, timeOutMillisecs))
112 {
113 const juce::ScopedWriteLock sl(socketLock);
114 initialiseWithSocket(std::move(s));
115 return true;
116 }
117
118 return false;
119}
120
121void Ocp1Connection::disconnect(int timeoutMs, Notify notify)
122{
123 //should be called before socket->close to ensure that running processes on the thread
124 //are notified that the thread is about to exit.
125 thread->stopThread(timeoutMs);
126
127 {
128 const juce::ScopedReadLock sl(socketLock);
129 if (socket != nullptr) socket->close();
130 }
131
132 deleteSocket();
133
134 if (notify == Notify::yes)
135 connectionLostInt();
136
137 callbackConnectionState = false;
138 safeAction->setSafe(false);
139}
140
141void Ocp1Connection::deleteSocket()
142{
143 const juce::ScopedWriteLock sl(socketLock);
144 socket.reset();
145}
146
148{
149 const juce::ScopedReadLock sl(socketLock);
150
151 return (socket != nullptr && socket->isConnected())
152 && threadIsRunning;
153}
154
156{
157 {
158 const juce::ScopedReadLock sl(socketLock);
159
160 if (socket == nullptr)
161 return {};
162
163 if (socket != nullptr && !socket->isLocal())
164 return socket->getHostName();
165 }
166
167 return juce::IPAddress::local().toString();
168}
169
170//==============================================================================
172{
173 return writeData(const_cast<std::uint8_t*>(message.data()), static_cast<int>(message.size())) == static_cast<int>(message.size());
174}
175
176int Ocp1Connection::writeData(void* data, int dataSize)
177{
178 const juce::ScopedReadLock sl(socketLock);
179
180 if (socket != nullptr)
181 return socket->write(data, dataSize);
182
183 return 0;
184}
185
186//==============================================================================
187void Ocp1Connection::initialise()
188{
189 safeAction->setSafe(true);
190 threadIsRunning = true;
191 connectionMadeInt();
192 thread->startThread(m_threadPriority);
193}
194
195void Ocp1Connection::initialiseWithSocket(std::unique_ptr<juce::StreamingSocket> newSocket)
196{
197 jassert(socket == nullptr);
198 socket = std::move(newSocket);
199 initialise();
200}
201
202//==============================================================================
// Message object that reports a connection state change to the owning
// Ocp1Connection. It is created and post()ed by connectionMadeInt() /
// connectionLostInt() when callbacks are routed through messages.
203struct ConnectionStateMessage : public juce::MessageManager::MessageBase
204{
// ipc:       shared safe-action wrapper used to reach the connection.
// connected: true to report "connection made", false to report "connection lost".
205 ConnectionStateMessage(std::shared_ptr<SafeActionImpl> ipc, bool connected) noexcept
206 : safeAction(ipc), connectionMade(connected)
207 {}
208
// Invokes connectionMade() or connectionLost() on the owner, but only if the
// safe-action wrapper still reports the owner as safe to call.
209 void messageCallback() override
210 {
211 safeAction->ifSafe([this](Ocp1Connection& owner)
212 {
213 if (connectionMade)
214 owner.connectionMade();
215 else
216 owner.connectionLost();
217 });
218 }
219
220 std::shared_ptr<SafeActionImpl> safeAction;
// NOTE(review): the declaration of the `connectionMade` member is not visible
// in this listing (a line appears to have been dropped) - confirm against the real file.
222
223 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR(ConnectionStateMessage)
224};
225
// Reports "connection made" at most once per connection: it does nothing if
// callbackConnectionState is already true, otherwise it sets the flag and
// issues the notification.
226void Ocp1Connection::connectionMadeInt()
227{
228 if (!callbackConnectionState)
229 {
230 callbackConnectionState = true;
231
// With useMessageThread set, the notification is posted as a ConnectionStateMessage.
232 if (useMessageThread)
233 (new ConnectionStateMessage(safeAction, true))->post();
// NOTE(review): the statement belonging to this else branch is not visible in
// this listing (a line appears to have been dropped) - presumably a direct
// connectionMade() call; confirm against the real file.
234 else
236 }
237}
238
// Reports "connection lost" at most once per connection: it does nothing if
// callbackConnectionState is already false, otherwise it clears the flag and
// issues the notification.
239void Ocp1Connection::connectionLostInt()
240{
241 if (callbackConnectionState)
242 {
243 callbackConnectionState = false;
244
// With useMessageThread set, the notification is posted as a ConnectionStateMessage.
245 if (useMessageThread)
246 (new ConnectionStateMessage(safeAction, false))->post();
// NOTE(review): the statement belonging to this else branch is not visible in
// this listing (a line appears to have been dropped) - presumably a direct
// connectionLost() call; confirm against the real file.
247 else
249 }
250}
251
// Message object that carries one received buffer to the owning Ocp1Connection.
// It is created and post()ed by deliverDataInt() when useMessageThread is set.
252struct DataDeliveryMessage : public juce::Message
253{
// ipc: shared safe-action wrapper used to reach the connection.
// d:   received bytes; used to initialise the `data` member.
254 DataDeliveryMessage(std::shared_ptr<SafeActionImpl> ipc, const ByteVector& d)
255 : safeAction(ipc), data(d)
256 {}
257
// Hands the stored data to the owner's messageReceived(), but only if the
// safe-action wrapper still reports the owner as safe to call.
258 void messageCallback() override
259 {
260 safeAction->ifSafe([this](Ocp1Connection& owner)
261 {
262 owner.messageReceived(data);
263 });
264 }
265
266 std::shared_ptr<SafeActionImpl> safeAction;
// NOTE(review): the declaration of the `data` member is not visible in this
// listing (a line appears to have been dropped) - confirm against the real file.
268};
269
270void Ocp1Connection::deliverDataInt(const ByteVector& data)
271{
272 jassert(callbackConnectionState);
273
274 if (useMessageThread)
275 (new DataDeliveryMessage(safeAction, data))->post();
276 else
277 messageReceived(data);
278}
279
280//==============================================================================
281int Ocp1Connection::readData(void* data, int num)
282{
283 const juce::ScopedReadLock sl(socketLock);
284
285 if (socket != nullptr)
286 return socket->read(data, num, true);
287
288 jassertfalse;
289 return -1;
290}
291
// Reads one message from the socket and delivers it via deliverDataInt().
//
// The header (Ocp1Header::Ocp1HeaderSize bytes) is read first. The buffer is then
// resized to the message size reported by the header plus one, and the
// remaining bytes are read in chunks of at most 65536 bytes.
//
// Returns true after a message has been handed to deliverDataInt(). Returns
// false if the thread was asked to exit while reading the body, or if the
// header read did not return exactly Ocp1HeaderSize bytes. If the header read
// returned a negative value, the socket is deleted and connectionLostInt() is called.
//
// NOTE(review): the declaration of `messageData` is not visible in this listing
// (a line appears to have been dropped before the first readData call).
292bool Ocp1Connection::readNextMessage()
293{
294 // Read enough data to fit an OCA header.
296 auto bytes = readData(messageData.data(), Ocp1Header::Ocp1HeaderSize);
297
298 if (bytes == Ocp1Header::Ocp1HeaderSize)
299 {
300 // Unmarshal the OCA header using a Ocp1Header helper object.
301 Ocp1Header tmpHeader(messageData);
302
303 // Resize the ByteVector to fit the complete OCA message.
304 // NOTE: msgSize does not include the sync byte.
// NOTE(review): no check is visible here that the size reported by the header
// is at least the header size - verify how GetMessageSize() is validated.
305 messageData.resize(static_cast<size_t>(tmpHeader.GetMessageSize()) + 1);
306
// The header is already in the buffer, so the body read starts right behind it.
307 auto readPosition = static_cast<int>(Ocp1Header::Ocp1HeaderSize);
308 auto bytesLeft = static_cast<int>(tmpHeader.GetMessageSize() + 1 - Ocp1Header::Ocp1HeaderSize);
309 while (bytesLeft > 0)
310 {
// Abort without delivering anything if the thread has been asked to stop.
311 if (thread->threadShouldExit())
312 return false;
313
314 auto numThisTime = juce::jmin(bytesLeft, 65536);
315 auto bytesIn = readData(messageData.data() + readPosition, numThisTime);
316
// NOTE(review): a failed or empty read only leaves the loop; the buffer is
// still passed to deliverDataInt() below and true is returned, even though
// bytesLeft > 0. Confirm whether delivering an incomplete message is intended.
317 if (bytesIn <= 0)
318 break;
319
320 readPosition += bytesIn;
321 bytesLeft -= bytesIn;
322 }
323
324 deliverDataInt(messageData);
325
326 return true;
327 }
328
// A negative result from the header read is treated as a lost connection.
329 if (bytes < 0)
330 {
// NOTE(review): `socket` is inspected here without taking socketLock, whereas
// deleteSocket() modifies it under the write lock - confirm this is safe.
331 if (socket != nullptr)
332 deleteSocket();
333
334 connectionLostInt();
335 }
336
337 return false;
338}
339
340void Ocp1Connection::runThread()
341{
342 while (!thread->threadShouldExit())
343 {
344 if (socket != nullptr)
345 {
346 auto ready = socket->waitUntilReady(true, 100);
347
348 if (ready < 0)
349 {
350 deleteSocket();
351 connectionLostInt();
352 break;
353 }
354
355 if (ready == 0)
356 {
357 thread->wait(1);
358 continue;
359 }
360 }
361 else
362 {
363 break;
364 }
365
366 if (thread->threadShouldExit() || !readNextMessage())
367 break;
368 }
369
370 threadIsRunning = false;
371}
372
373}
Low-level TCP socket manager for a single OCP.1 connection.
virtual void connectionLost()=0
Called when the TCP connection is dropped or closed. Override to react.
Notify
Controls whether connectionLost() is called when disconnect() is invoked. Use Notify::no when shuttin...
bool connectToSocket(const juce::String &hostName, int portNumber, int timeOutMillisecs)
Attempts a TCP connection to the given host and port. Spawns the read thread on success.
void disconnect(int timeoutMs=0, Notify notify=Notify::yes)
Closes the TCP socket and stops the read thread.
juce::String getConnectedHostName() const
Returns the hostname of the currently connected remote peer, or an empty string.
Ocp1Connection(bool callbacksOnMessageThread=true, const juce::Thread::Priority threadPriority=juce::Thread::Priority::normal)
Constructs the connection object.
virtual void messageReceived(const ByteVector &message)=0
Called with each complete OCP.1 frame received from the remote device. Pass message to Ocp1Message::U...
bool isConnected() const
Returns true if a socket exists, reports itself as connected, and the read thread is flagged as running.
virtual void connectionMade()=0
Called when the TCP connection is successfully established. Override to react.
bool sendMessage(const ByteVector &message)
Sends a complete OCP.1 frame over the TCP socket. The write is performed while holding the socket read lock; returns true only if the number of bytes written equals the message size.
static constexpr std::uint32_t Ocp1HeaderSize
SafeActionImpl(Ocp1Connection &p)
Minimal AES70 / OCP.1 TCP client/server library built on JUCE.
Definition NanoOcp1.cpp:23
std::vector< std::uint8_t > ByteVector
Binary buffer type used throughout NanoOcp for all serialized OCP.1 data.
ConnectionStateMessage(std::shared_ptr< SafeActionImpl > ipc, bool connected) noexcept
std::shared_ptr< SafeActionImpl > safeAction
DataDeliveryMessage(std::shared_ptr< SafeActionImpl > ipc, const ByteVector &d)
std::shared_ptr< SafeActionImpl > safeAction