Skip to content

Commit 4a10b9b

Browse files
Add scheduler abstraction + default impl (#35)
1 parent 3dff7b9 commit 4a10b9b

32 files changed

+1000
-207
lines changed

include/signalrclient/hub_connection.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ namespace signalr
5555
friend class hub_connection_builder;
5656

5757
explicit hub_connection(const std::string& url, trace_level trace_level = trace_level::info,
58-
std::shared_ptr<log_writer> log_writer = nullptr, std::shared_ptr<http_client> http_client = nullptr,
58+
std::shared_ptr<log_writer> log_writer = nullptr, std::function<std::shared_ptr<http_client>(const signalr_client_config&)> http_client_factory = nullptr,
5959
std::function<std::shared_ptr<websocket_client>(const signalr_client_config&)> websocket_factory = nullptr, bool skip_negotiation = false);
6060

6161
std::shared_ptr<hub_connection_impl> m_pImpl;

include/signalrclient/hub_connection_builder.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ namespace signalr
3131

3232
SIGNALRCLIENT_API hub_connection_builder& with_websocket_factory(std::function<std::shared_ptr<websocket_client>(const signalr_client_config&)> factory);
3333

34-
SIGNALRCLIENT_API hub_connection_builder& with_http_client(std::shared_ptr<http_client> http_client);
34+
SIGNALRCLIENT_API hub_connection_builder& with_http_client_factory(std::function<std::shared_ptr<http_client>(const signalr_client_config&)> http_client_factory);
3535

3636
SIGNALRCLIENT_API hub_connection_builder& skip_negotiation(bool skip = true);
3737

@@ -43,7 +43,7 @@ namespace signalr
4343
std::shared_ptr<log_writer> m_logger;
4444
trace_level m_log_level;
4545
std::function<std::shared_ptr<websocket_client>(const signalr_client_config&)> m_websocket_factory;
46-
std::shared_ptr<http_client> m_http_client;
46+
std::function<std::shared_ptr<http_client>(const signalr_client_config&)> m_http_client_factory;
4747
bool m_skip_negotiation = false;
4848
};
4949
}

include/signalrclient/scheduler.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
#pragma once
6+
7+
#include <exception>
8+
#include <functional>
9+
#include <chrono>
10+
11+
namespace signalr
12+
{
13+
typedef std::function<void()> signalr_base_cb;
14+
15+
struct scheduler
16+
{
17+
virtual void schedule(const signalr_base_cb& cb, std::chrono::milliseconds delay = std::chrono::milliseconds::zero()) = 0;
18+
19+
virtual ~scheduler() {}
20+
};
21+
}

include/signalrclient/signalr_client_config.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include "_exports.h"
1616
#include <map>
1717
#include <string>
18+
#include "scheduler.h"
19+
#include <memory>
1820

1921
namespace signalr
2022
{
@@ -36,15 +38,20 @@ namespace signalr
3638
SIGNALRCLIENT_API void __cdecl set_websocket_client_config(const web::websockets::client::websocket_client_config& websocket_client_config);
3739
#endif
3840

41+
SIGNALRCLIENT_API __cdecl signalr_client_config();
42+
3943
SIGNALRCLIENT_API const std::map<std::string, std::string>& __cdecl get_http_headers() const noexcept;
4044
SIGNALRCLIENT_API std::map<std::string, std::string>& __cdecl get_http_headers() noexcept;
4145
SIGNALRCLIENT_API void __cdecl set_http_headers(const std::map<std::string, std::string>& http_headers);
46+
SIGNALRCLIENT_API void __cdecl set_scheduler(std::shared_ptr<scheduler> scheduler);
47+
SIGNALRCLIENT_API const std::shared_ptr<scheduler>& __cdecl get_scheduler() const noexcept;
4248

4349
private:
4450
#ifdef USE_CPPRESTSDK
4551
web::http::client::http_client_config m_http_client_config;
4652
web::websockets::client::websocket_client_config m_websocket_client_config;
4753
#endif
4854
std::map<std::string, std::string> m_http_headers;
55+
std::shared_ptr<scheduler> m_scheduler;
4956
};
5057
}

run-tests.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ eval $* &
1111
PID=$!
1212

1313
# Timeout duration
14-
TIME=120
14+
TIME=800
1515

1616
while (($TIME > 0)); do
1717
# polling interval
@@ -34,4 +34,4 @@ done
3434
kill -9 $PID
3535

3636
echo "Process took too long"
37-
exit $EXITCODE
37+
exit 1

src/signalrclient/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ set (SOURCES
2020
transport_factory.cpp
2121
url_builder.cpp
2222
websocket_transport.cpp
23+
signalr_default_scheduler.cpp
2324
../../third_party_code/cpprestsdk/uri.cpp
2425
../../third_party_code/cpprestsdk/uri_builder.cpp
2526
)

src/signalrclient/cancellation_token.h

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,37 @@ namespace signalr
2727
cancellation_token(const cancellation_token&) = delete;
2828
cancellation_token& operator=(const cancellation_token&) = delete;
2929

30+
~cancellation_token()
31+
{
32+
if (m_callback)
33+
{
34+
m_callback();
35+
}
36+
}
37+
3038
void cancel()
3139
{
32-
std::lock_guard<std::mutex> lock(m_lock);
33-
m_signaled = true;
34-
m_condition.notify_all();
40+
std::function<void()> callback;
41+
{
42+
std::lock_guard<std::mutex> lock(m_lock);
43+
m_signaled = true;
44+
m_condition.notify_all();
45+
callback = m_callback;
46+
m_callback = nullptr;
47+
} // unlock
48+
49+
if (callback)
50+
{
51+
callback();
52+
}
3553
}
3654

3755
void reset()
3856
{
3957
std::lock_guard<std::mutex> lock(m_lock);
58+
assert(m_callback == nullptr);
4059
m_signaled = false;
60+
m_callback = nullptr;
4161
}
4262

4363
bool is_canceled() const
@@ -75,9 +95,33 @@ namespace signalr
7595
throw canceled_exception();
7696
}
7797
}
98+
99+
void register_callback(std::function<void()> callback)
100+
{
101+
bool run_callback = false;
102+
{
103+
std::lock_guard<std::mutex> lock(m_lock);
104+
assert(m_callback == nullptr);
105+
if (m_signaled)
106+
{
107+
run_callback = m_signaled;
108+
}
109+
else
110+
{
111+
m_callback = callback;
112+
}
113+
} // unlock
114+
115+
if (run_callback)
116+
{
117+
callback();
118+
}
119+
}
120+
78121
private:
79122
std::mutex m_lock;
80123
std::condition_variable m_condition;
81124
bool m_signaled;
125+
std::function<void()> m_callback;
82126
};
83127
}

src/signalrclient/connection_impl.cpp

Lines changed: 54 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
// See the LICENSE file in the project root for more information.
44

55
#include "stdafx.h"
6-
#include <thread>
76
#include <algorithm>
87
#include "constants.h"
98
#include "connection_impl.h"
@@ -17,6 +16,7 @@
1716
#include <assert.h>
1817
#include "signalrclient/websocket_client.h"
1918
#include "default_websocket_client.h"
19+
#include "signalr_default_scheduler.h"
2020

2121
namespace signalr
2222
{
@@ -26,42 +26,25 @@ namespace signalr
2626
}
2727

2828
std::shared_ptr<connection_impl> connection_impl::create(const std::string& url, trace_level trace_level, const std::shared_ptr<log_writer>& log_writer,
29-
std::shared_ptr<http_client> http_client, std::function<std::shared_ptr<websocket_client>(const signalr_client_config&)> websocket_factory, const bool skip_negotiation)
29+
std::function<std::shared_ptr<http_client>(const signalr_client_config&)> http_client_factory, std::function<std::shared_ptr<websocket_client>(const signalr_client_config&)> websocket_factory, const bool skip_negotiation)
3030
{
3131
return std::shared_ptr<connection_impl>(new connection_impl(url, trace_level,
32-
log_writer ? log_writer : std::make_shared<trace_log_writer>(), http_client, websocket_factory, skip_negotiation));
32+
log_writer ? log_writer : std::make_shared<trace_log_writer>(), http_client_factory, websocket_factory, skip_negotiation));
3333
}
3434

3535
connection_impl::connection_impl(const std::string& url, trace_level trace_level, const std::shared_ptr<log_writer>& log_writer,
36-
std::unique_ptr<http_client> http_client, std::unique_ptr<transport_factory> transport_factory, const bool skip_negotiation)
37-
: m_base_url(url), m_connection_state(connection_state::disconnected), m_logger(log_writer, trace_level), m_transport(nullptr),
38-
m_transport_factory(std::move(transport_factory)), m_skip_negotiation(skip_negotiation), m_message_received([](const std::string&) noexcept {}), m_disconnected([]() noexcept {})
39-
{
40-
if (http_client != nullptr)
41-
{
42-
m_http_client = std::move(http_client);
43-
}
44-
else
45-
{
46-
#ifdef USE_CPPRESTSDK
47-
m_http_client = std::unique_ptr<class http_client>(new default_http_client());
48-
#endif
49-
}
50-
}
51-
52-
connection_impl::connection_impl(const std::string& url, trace_level trace_level, const std::shared_ptr<log_writer>& log_writer,
53-
std::shared_ptr<http_client> http_client, std::function<std::shared_ptr<websocket_client>(const signalr_client_config&)> websocket_factory, const bool skip_negotiation)
36+
std::function<std::shared_ptr<http_client>(const signalr_client_config&)> http_client_factory, std::function<std::shared_ptr<websocket_client>(const signalr_client_config&)> websocket_factory, const bool skip_negotiation)
5437
: m_base_url(url), m_connection_state(connection_state::disconnected), m_logger(log_writer, trace_level), m_transport(nullptr), m_skip_negotiation(skip_negotiation),
55-
m_message_received([](const std::string&) noexcept {}), m_disconnected([]() noexcept {})
38+
m_message_received([](const std::string&) noexcept {}), m_disconnected([]() noexcept {}), m_disconnect_cts(std::make_shared<cancellation_token>())
5639
{
57-
if (http_client != nullptr)
40+
if (http_client_factory != nullptr)
5841
{
59-
m_http_client = std::move(http_client);
42+
m_http_client_factory = std::move(http_client_factory);
6043
}
6144
else
6245
{
6346
#ifdef USE_CPPRESTSDK
64-
m_http_client = std::unique_ptr<class http_client>(new default_http_client());
47+
m_http_client_factory = [](const signalr_client_config&) { return std::unique_ptr<class http_client>(new default_http_client()); };
6548
#endif
6649
}
6750

@@ -72,7 +55,7 @@ namespace signalr
7255
#endif
7356
}
7457

75-
m_transport_factory = std::unique_ptr<transport_factory>(new transport_factory(m_http_client, websocket_factory));
58+
m_transport_factory = std::unique_ptr<transport_factory>(new transport_factory(m_http_client_factory, websocket_factory));
7659
}
7760

7861
connection_impl::~connection_impl()
@@ -138,11 +121,18 @@ namespace signalr
138121
// there should not be any active transport at this point
139122
assert(!m_transport);
140123

141-
m_disconnect_cts = std::make_shared<cancellation_token>();
124+
m_disconnect_cts->reset();
142125
m_start_completed_event.reset();
143126
m_connection_id = "";
144127
}
145128

129+
m_scheduler = m_signalr_client_config.get_scheduler();
130+
if (!m_scheduler)
131+
{
132+
m_scheduler = std::make_shared<signalr_default_scheduler>();
133+
m_signalr_client_config.set_scheduler(m_scheduler);
134+
}
135+
146136
start_negotiate(m_base_url, 0, callback);
147137
}
148138

@@ -157,7 +147,7 @@ namespace signalr
157147
}
158148

159149
std::weak_ptr<connection_impl> weak_connection = shared_from_this();
160-
const auto& token = m_disconnect_cts;
150+
const auto token = m_disconnect_cts;
161151

162152
const auto transport_started = [weak_connection, callback, token](std::shared_ptr<transport> transport, std::exception_ptr exception)
163153
{
@@ -225,7 +215,8 @@ namespace signalr
225215
return start_transport(url, transport_started);
226216
}
227217

228-
negotiate::negotiate(*m_http_client, url, m_signalr_client_config,
218+
auto http_client = m_http_client_factory(m_signalr_client_config);
219+
negotiate::negotiate(http_client, url, m_signalr_client_config,
229220
[callback, weak_connection, redirect_count, token, url, transport_started](negotiation_response&& response, std::exception_ptr exception)
230221
{
231222
auto connection = weak_connection.lock();
@@ -320,7 +311,7 @@ namespace signalr
320311
std::shared_ptr<std::mutex> connect_request_lock = std::make_shared<std::mutex>();
321312

322313
auto weak_connection = std::weak_ptr<connection_impl>(connection);
323-
const auto& disconnect_cts = m_disconnect_cts;
314+
const auto disconnect_cts = m_disconnect_cts;
324315
const auto& logger = m_logger;
325316

326317
auto transport = connection->m_transport_factory->create_transport(
@@ -406,39 +397,51 @@ namespace signalr
406397
}
407398
});
408399

409-
std::thread([disconnect_cts, connect_request_done, connect_request_lock, callback, weak_connection]()
410-
{
411-
disconnect_cts->wait(5000);
400+
disconnect_cts->register_callback([connect_request_done, connect_request_lock, callback]()
401+
{
402+
bool run_callback = false;
403+
{
404+
std::lock_guard<std::mutex> lock(*connect_request_lock);
412405

406+
// no op after connection started successfully
407+
if (*connect_request_done == false)
408+
{
409+
*connect_request_done = true;
410+
run_callback = true;
411+
}
412+
} // unlock
413+
414+
if (run_callback)
415+
{
416+
// The callback checks the disconnect_cts token and will handle it appropriately
417+
callback({}, nullptr);
418+
}
419+
});
420+
421+
timer(m_scheduler, [connect_request_done, connect_request_lock, callback](std::chrono::milliseconds duration) {
413422
bool run_callback = false;
414423
{
415424
std::lock_guard<std::mutex> lock(*connect_request_lock);
425+
416426
// no op after connection started successfully
417427
if (*connect_request_done == false)
418428
{
429+
if (duration < std::chrono::seconds(5))
430+
{
431+
return false;
432+
}
419433
*connect_request_done = true;
420434
run_callback = true;
421435
}
422-
}
436+
} // unlock
423437

424-
// if the disconnect_cts is canceled it means that the connection has been stopped or went out of scope in
425-
// which case we should not throw due to timeout.
426-
if (disconnect_cts->is_canceled())
438+
if (run_callback)
427439
{
428-
if (run_callback)
429-
{
430-
// The callback checks the disconnect_cts token and will handle it appropriately
431-
callback({}, nullptr);
432-
}
433-
}
434-
else
435-
{
436-
if (run_callback)
437-
{
438-
callback({}, std::make_exception_ptr(signalr_exception("transport timed out when trying to connect")));
439-
}
440+
callback({}, std::make_exception_ptr(signalr_exception("transport timed out when trying to connect")));
440441
}
441-
}).detach();
442+
443+
return true;
444+
});
442445

443446
connection->send_connect_request(transport, url, [callback, connect_request_done, connect_request_lock, transport](std::exception_ptr exception)
444447
{
@@ -597,6 +600,7 @@ namespace signalr
597600
const auto current_state = get_connection_state();
598601
if (current_state == connection_state::disconnected)
599602
{
603+
m_disconnect_cts->cancel();
600604
callback(nullptr);
601605
return;
602606
}

0 commit comments

Comments
 (0)