Skip to content

Commit 0d51a36

Browse files
Clean up negotiate, remove timeout and add cancellation from stop (#61)
1 parent dfd1f63 commit 0d51a36

File tree

8 files changed

+104
-170
lines changed

8 files changed

+104
-170
lines changed

src/signalrclient/connection_impl.cpp

Lines changed: 62 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -133,24 +133,33 @@ namespace signalr
133133
m_signalr_client_config.set_scheduler(m_scheduler);
134134
}
135135

136-
start_negotiate(m_base_url, 0, callback);
136+
start_negotiate(m_base_url, callback);
137137
}
138138

139-
void connection_impl::start_negotiate(const std::string& url, int redirect_count, std::function<void(std::exception_ptr)> callback)
139+
void connection_impl::start_negotiate(const std::string& url, std::function<void(std::exception_ptr)> callback)
140140
{
141-
if (redirect_count >= MAX_NEGOTIATE_REDIRECTS)
142-
{
143-
change_state(connection_state::disconnected);
144-
m_start_completed_event.cancel();
145-
callback(std::make_exception_ptr(signalr_exception("Negotiate redirection limit exceeded.")));
146-
return;
147-
}
148-
149141
std::weak_ptr<connection_impl> weak_connection = shared_from_this();
150142
const auto token = m_disconnect_cts;
151143

152-
const auto transport_started = [weak_connection, callback, token](std::shared_ptr<transport> transport, std::exception_ptr exception)
144+
std::shared_ptr<bool> connect_request_done = std::make_shared<bool>();
145+
std::shared_ptr<std::mutex> connect_request_lock = std::make_shared<std::mutex>();
146+
147+
const auto transport_started = [weak_connection, connect_request_done, connect_request_lock, callback, token]
148+
(std::shared_ptr<transport> transport, std::exception_ptr exception)
153149
{
150+
{
151+
std::lock_guard<std::mutex> lock(*connect_request_lock);
152+
// no op after connection started/stopped successfully
153+
if (*connect_request_done == false)
154+
{
155+
*connect_request_done = true;
156+
}
157+
else
158+
{
159+
return;
160+
}
161+
}
162+
154163
auto connection = weak_connection.lock();
155164
if (!connection)
156165
{
@@ -171,7 +180,7 @@ namespace signalr
171180
if (token->is_canceled())
172181
{
173182
connection->m_logger.log(trace_level::info,
174-
"starting the connection has been canceled.");
183+
"starting the connection has been canceled by stop().");
175184
}
176185
else
177186
{
@@ -208,21 +217,47 @@ namespace signalr
208217
callback(nullptr);
209218
};
210219

220+
m_disconnect_cts->register_callback([transport_started]()
221+
{
222+
// The callback checks the disconnect_cts token or if callback already called and will handle it appropriately
223+
// Not passing an error since the callback will create a canceled_exception since it knows the token was canceled.
224+
transport_started(nullptr, nullptr);
225+
});
226+
211227
if (m_skip_negotiation)
212228
{
213229
// TODO: check that the websockets transport is explicitly selected
214230

215231
return start_transport(url, transport_started);
216232
}
217233

234+
start_negotiate_internal(url, 0, transport_started);
235+
}
236+
237+
void connection_impl::start_negotiate_internal(const std::string& url, int redirect_count, std::function<void(std::shared_ptr<transport> transport, std::exception_ptr)> transport_started)
238+
{
239+
if (m_disconnect_cts->is_canceled())
240+
{
241+
return;
242+
}
243+
244+
if (redirect_count >= MAX_NEGOTIATE_REDIRECTS)
245+
{
246+
transport_started(nullptr, std::make_exception_ptr(signalr_exception("Negotiate redirection limit exceeded.")));
247+
return;
248+
}
249+
250+
std::weak_ptr<connection_impl> weak_connection = shared_from_this();
251+
const auto token = m_disconnect_cts;
252+
218253
auto http_client = m_http_client_factory(m_signalr_client_config);
219254
negotiate::negotiate(http_client, url, m_signalr_client_config,
220-
[callback, weak_connection, redirect_count, token, url, transport_started](negotiation_response&& response, std::exception_ptr exception)
255+
[transport_started, weak_connection, redirect_count, token, url](negotiation_response&& response, std::exception_ptr exception)
221256
{
222257
auto connection = weak_connection.lock();
223258
if (!connection)
224259
{
225-
callback(std::make_exception_ptr(signalr_exception("connection no longer exists")));
260+
transport_started(nullptr, std::make_exception_ptr(signalr_exception("connection no longer exists")));
226261
return;
227262
}
228263

@@ -241,17 +276,13 @@ namespace signalr
241276
.append(e.what()));
242277
}
243278
}
244-
connection->change_state(connection_state::disconnected);
245-
connection->m_start_completed_event.cancel();
246-
callback(exception);
279+
transport_started(nullptr, exception);
247280
return;
248281
}
249282

250283
if (!response.error.empty())
251284
{
252-
connection->change_state(connection_state::disconnected);
253-
connection->m_start_completed_event.cancel();
254-
callback(std::make_exception_ptr(signalr_exception(response.error)));
285+
transport_started(nullptr, std::make_exception_ptr(signalr_exception(response.error)));
255286
return;
256287
}
257288

@@ -262,7 +293,7 @@ namespace signalr
262293
auto& headers = connection->m_signalr_client_config.get_http_headers();
263294
headers["Authorization"] = "Bearer " + response.accessToken;
264295
}
265-
connection->start_negotiate(response.url, redirect_count + 1, callback);
296+
connection->start_negotiate_internal(response.url, redirect_count + 1, transport_started);
266297
return;
267298
}
268299

@@ -284,32 +315,26 @@ namespace signalr
284315

285316
if (!foundWebsockets)
286317
{
287-
connection->change_state(connection_state::disconnected);
288-
connection->m_start_completed_event.cancel();
289-
callback(std::make_exception_ptr(signalr_exception("The server does not support WebSockets which is currently the only transport supported by this client.")));
318+
transport_started(nullptr, std::make_exception_ptr(signalr_exception("The server does not support WebSockets which is currently the only transport supported by this client.")));
290319
return;
291320
}
292321

293322
// TODO: use transfer format
294323

295324
if (token->is_canceled())
296325
{
297-
connection->change_state(connection_state::disconnected);
298-
callback(std::make_exception_ptr(canceled_exception()));
326+
transport_started(nullptr, std::make_exception_ptr(canceled_exception()));
299327
return;
300328
}
301329

302330
connection->start_transport(url, transport_started);
303331
});
304332
}
305333

306-
void connection_impl::start_transport(const std::string& url, std::function<void(std::shared_ptr<transport>, std::exception_ptr)> callback)
334+
void connection_impl::start_transport(const std::string& url, std::function<void(std::shared_ptr<transport>, std::exception_ptr)> transport_started)
307335
{
308336
auto connection = shared_from_this();
309337

310-
std::shared_ptr<bool> connect_request_done = std::make_shared<bool>();
311-
std::shared_ptr<std::mutex> connect_request_lock = std::make_shared<std::mutex>();
312-
313338
auto weak_connection = std::weak_ptr<connection_impl>(connection);
314339
const auto disconnect_cts = m_disconnect_cts;
315340
const auto& logger = m_logger;
@@ -332,7 +357,7 @@ namespace signalr
332357
connection->stop_connection(exception);
333358
});
334359

335-
transport->on_receive([disconnect_cts, connect_request_done, connect_request_lock, logger, weak_connection, callback](std::string&& message, std::exception_ptr exception)
360+
transport->on_receive([disconnect_cts, logger, weak_connection, transport_started](std::string&& message, std::exception_ptr exception)
336361
{
337362
if (exception == nullptr)
338363
{
@@ -378,95 +403,20 @@ namespace signalr
378403
return;
379404
}
380405

381-
bool run_callback = false;
382-
{
383-
std::lock_guard<std::mutex> lock(*connect_request_lock);
384-
// no op after connection started successfully
385-
if (*connect_request_done == false)
386-
{
387-
*connect_request_done = true;
388-
run_callback = true;
389-
}
390-
}
391-
392-
if (run_callback)
393-
{
394-
callback({}, exception);
395-
}
396-
}
397-
}
398-
});
399-
400-
disconnect_cts->register_callback([connect_request_done, connect_request_lock, callback]()
401-
{
402-
bool run_callback = false;
403-
{
404-
std::lock_guard<std::mutex> lock(*connect_request_lock);
405-
406-
// no op after connection started successfully
407-
if (*connect_request_done == false)
408-
{
409-
*connect_request_done = true;
410-
run_callback = true;
411-
}
412-
} // unlock
413-
414-
if (run_callback)
415-
{
416-
// The callback checks the disconnect_cts token and will handle it appropriately
417-
callback({}, nullptr);
418-
}
419-
});
420-
421-
timer(m_scheduler, [connect_request_done, connect_request_lock, callback](std::chrono::milliseconds duration)
422-
{
423-
bool run_callback = false;
424-
{
425-
std::lock_guard<std::mutex> lock(*connect_request_lock);
426-
427-
// no op after connection started successfully
428-
if (*connect_request_done == false)
429-
{
430-
if (duration < std::chrono::seconds(5))
431-
{
432-
return false;
433-
}
434-
*connect_request_done = true;
435-
run_callback = true;
406+
transport_started(nullptr, exception);
436407
}
437-
} // unlock
438-
439-
if (run_callback)
440-
{
441-
callback({}, std::make_exception_ptr(signalr_exception("transport timed out when trying to connect")));
442408
}
443-
444-
return true;
445409
});
446410

447-
connection->send_connect_request(transport, url, [callback, connect_request_done, connect_request_lock, transport](std::exception_ptr exception)
411+
connection->send_connect_request(transport, url, [transport_started, transport](std::exception_ptr exception)
448412
{
449-
bool run_callback = false;
413+
if (exception == nullptr)
450414
{
451-
std::lock_guard<std::mutex> lock(*connect_request_lock);
452-
// no op after connection started successfully
453-
if (*connect_request_done == false)
454-
{
455-
*connect_request_done = true;
456-
run_callback = true;
457-
}
415+
transport_started(transport, nullptr);
458416
}
459-
460-
if (run_callback)
417+
else
461418
{
462-
if (exception == nullptr)
463-
{
464-
callback(transport, nullptr);
465-
}
466-
else
467-
{
468-
callback({}, exception);
469-
}
419+
transport_started(nullptr, exception);
470420
}
471421
});
472422
}

src/signalrclient/connection_impl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ namespace signalr
7676
void start_transport(const std::string& url, std::function<void(std::shared_ptr<transport>, std::exception_ptr)> callback);
7777
void send_connect_request(const std::shared_ptr<transport>& transport,
7878
const std::string& url, std::function<void(std::exception_ptr)> callback);
79-
void start_negotiate(const std::string& url, int redirect_count, std::function<void(std::exception_ptr)> callback);
79+
void start_negotiate(const std::string& url, std::function<void(std::exception_ptr)> callback);
80+
void start_negotiate_internal(const std::string& url, int redirect_count, std::function<void(std::shared_ptr<transport> transport, std::exception_ptr)> callback);
8081

8182
void process_response(std::string&& response);
8283

src/signalrclient/signalr_default_scheduler.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ namespace signalr
7979
m_internals->m_callback = cb;
8080
m_internals->m_busy = true;
8181
} // unlock
82+
}
83+
84+
void thread::start()
85+
{
8286
m_internals->m_callback_cv.notify_one();
8387
}
8488

@@ -157,7 +161,12 @@ namespace signalr
157161
{
158162
if (thread.is_free())
159163
{
160-
thread.add((*it).first);
164+
{
165+
thread.add((*it).first);
166+
(*it).first = nullptr;
167+
// destruct callback in case the destructor can schedule a job which would throw on recursive lock acquisition
168+
}
169+
thread.start();
161170
found = true;
162171
break;
163172
}

src/signalrclient/signalr_default_scheduler.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ namespace signalr
2020
thread& operator=(const thread&) = delete;
2121

2222
void add(signalr_base_cb);
23+
void start();
2324
bool is_free() const;
2425
void shutdown();
2526
~thread();

0 commit comments

Comments
 (0)