Add SignalTrampoline to ReadEvent and WriteEvent

These two signals were sigslot::multi_thraded_local, so a new copy
of the Trampoline template was created for them.

Sideswipe: Namespace an unit test file.

Bug: webrtc:42222066
Change-Id: I21c88eaba9acc691fa01f68048cd270b01535a01
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/420581
Commit-Queue: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#46054}
This commit is contained in:
Harald Alvestrand 2025-10-27 22:08:03 +00:00 committed by WebRTC LUCI CQ
parent fb21c68959
commit bd1d02f84b
21 changed files with 170 additions and 131 deletions

View File

@ -63,9 +63,10 @@ void PeerConnectionClient::InitSocketSignals() {
[this](webrtc::Socket* socket) { OnConnect(socket); });
hanging_get_->SubscribeConnectEvent(
[this](webrtc::Socket* socket) { OnHangingGetConnect(socket); });
control_socket_->SignalReadEvent.connect(this, &PeerConnectionClient::OnRead);
hanging_get_->SignalReadEvent.connect(
this, &PeerConnectionClient::OnHangingGetRead);
control_socket_->SubscribeReadEvent(
this, [this](webrtc::Socket* socket) { OnRead(socket); });
hanging_get_->SubscribeReadEvent(
this, [this](webrtc::Socket* socket) { OnHangingGetRead(socket); });
}
int PeerConnectionClient::id() const {

View File

@ -127,7 +127,7 @@ class NATProxyServerSocket : public AsyncProxyServerSocket {
BufferInput(false);
NotifyConnectRequest(this, dest_addr);
if (remainder) {
SignalReadEvent(this);
NotifyReadEvent(this);
}
}
};

View File

@ -263,12 +263,12 @@ class NATSocket : public Socket, public sigslot::has_slots<> {
if (type_ == SOCK_STREAM && !server_addr_.IsNil() && !connected_) {
HandleConnectReply();
} else {
SignalReadEvent(this);
NotifyReadEvent(this);
}
}
void OnWriteEvent(Socket* socket) {
RTC_DCHECK(socket == socket_);
SignalWriteEvent(this);
NotifyWriteEvent(this);
}
void OnCloseEvent(Socket* socket, int error) {
RTC_DCHECK(socket == socket_);
@ -285,8 +285,10 @@ class NATSocket : public Socket, public sigslot::has_slots<> {
if (result >= 0) {
socket_->SubscribeConnectEvent(
[this](Socket* socket) { OnConnectEvent(socket); });
socket_->SignalReadEvent.connect(this, &NATSocket::OnReadEvent);
socket_->SignalWriteEvent.connect(this, &NATSocket::OnWriteEvent);
socket_->SubscribeReadEvent(
this, [this](Socket* socket) { OnReadEvent(socket); });
socket_->SubscribeWriteEvent(
this, [this](Socket* socket) { OnWriteEvent(socket); });
socket_->SubscribeCloseEvent(this, [this](Socket* socket, int error) {
OnCloseEvent(socket, error);
});

View File

@ -367,7 +367,8 @@ class NatTcpTest : public ::testing::Test, public sigslot::has_slots<> {
void OnCloseEvent(Socket* socket, int error) {}
void ConnectEvents() {
server_->SignalReadEvent.connect(this, &NatTcpTest::OnAcceptEvent);
server_->SubscribeReadEvent(
this, [this](Socket* socket) { OnAcceptEvent(socket); });
client_->SubscribeConnectEvent(
[this](Socket* socket) { OnConnectEvent(socket); });
}

View File

@ -122,8 +122,8 @@ void TurnServer::AddInternalServerSocket(
ServerSocketInfo{.proto = protocol,
.ssl_adapter_factory = std::move(ssl_adapter_factory)});
RTC_DCHECK(inserted);
iter->first->SignalReadEvent.connect(this,
&TurnServer::OnNewInternalConnection);
iter->first->SubscribeReadEvent(
this, [this](Socket* socket) { OnNewInternalConnection(socket); });
}
void TurnServer::SetExternalSocketFactory(PacketSocketFactory* factory,

View File

@ -25,8 +25,10 @@ AsyncSocketAdapter::AsyncSocketAdapter(Socket* socket)
RTC_DCHECK(socket_);
socket_->SubscribeConnectEvent(
[this](Socket* socket) { OnConnectEvent(socket); });
socket_->SignalReadEvent.connect(this, &AsyncSocketAdapter::OnReadEvent);
socket_->SignalWriteEvent.connect(this, &AsyncSocketAdapter::OnWriteEvent);
socket_->SubscribeReadEvent(this,
[this](Socket* socket) { OnReadEvent(socket); });
socket_->SubscribeWriteEvent(
this, [this](Socket* socket) { OnWriteEvent(socket); });
socket_->SubscribeCloseEvent(
[this](Socket* socket, int err) { OnCloseEvent(socket, err); });
}
@ -105,11 +107,11 @@ void AsyncSocketAdapter::OnConnectEvent(Socket* socket) {
}
void AsyncSocketAdapter::OnReadEvent(Socket* socket) {
SignalReadEvent(this);
NotifyReadEvent(this);
}
void AsyncSocketAdapter::OnWriteEvent(Socket* socket) {
SignalWriteEvent(this);
NotifyWriteEvent(this);
}
void AsyncSocketAdapter::OnCloseEvent(Socket* socket, int err) {

View File

@ -60,8 +60,10 @@ AsyncTCPSocketBase::AsyncTCPSocketBase(
socket_->SubscribeConnectEvent(
[this](Socket* socket) { OnConnectEvent(socket); });
socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent);
socket_->SignalWriteEvent.connect(this, &AsyncTCPSocketBase::OnWriteEvent);
socket_->SubscribeReadEvent(this,
[this](Socket* socket) { OnReadEvent(socket); });
socket_->SubscribeWriteEvent(
this, [this](Socket* socket) { OnWriteEvent(socket); });
socket_->SubscribeCloseEvent(
[this](Socket* socket, int error) { OnCloseEvent(socket, error); });
}
@ -294,7 +296,8 @@ AsyncTcpListenSocket::AsyncTcpListenSocket(const Environment& env,
std::unique_ptr<Socket> socket)
: env_(env), socket_(std::move(socket)) {
RTC_DCHECK(socket_.get() != nullptr);
socket_->SignalReadEvent.connect(this, &AsyncTcpListenSocket::OnReadEvent);
socket_->SubscribeReadEvent(this,
[this](Socket* socket) { OnReadEvent(socket); });
if (socket_->Listen(kListenBacklog) < 0) {
RTC_LOG(LS_ERROR) << "Listen() failed with error " << socket_->GetError();
}
@ -331,7 +334,7 @@ void AsyncTcpListenSocket::OnReadEvent(Socket* socket) {
HandleIncomingConnection(absl::WrapUnique(new_socket));
// Prime a read event in case data is waiting.
new_socket->SignalReadEvent(new_socket);
new_socket->NotifyReadEvent(new_socket);
}
void AsyncTcpListenSocket::HandleIncomingConnection(

View File

@ -44,7 +44,7 @@ TEST(AsyncTCPSocketTest, OnWriteEvent) {
&AsyncTCPSocketObserver::OnReadyToSend);
EXPECT_FALSE(observer.ready_to_send);
socket_ref.SignalWriteEvent(&socket_ref);
socket_ref.NotifyWriteEvent(&socket_ref);
EXPECT_TRUE(observer.ready_to_send);
}

View File

@ -53,8 +53,10 @@ AsyncUDPSocket::AsyncUDPSocket(const Environment& env,
sequence_checker_(SequenceChecker::kDetached),
socket_(std::move(socket)) {
// The socket should start out readable but not writable.
socket_->SignalReadEvent.connect(this, &AsyncUDPSocket::OnReadEvent);
socket_->SignalWriteEvent.connect(this, &AsyncUDPSocket::OnWriteEvent);
socket_->SubscribeReadEvent(this,
[this](Socket* socket) { OnReadEvent(socket); });
socket_->SubscribeWriteEvent(
this, [this](Socket* socket) { OnWriteEvent(socket); });
}
SocketAddress AsyncUDPSocket::GetLocalAddress() const {

View File

@ -86,7 +86,7 @@ TEST(AsyncUDPSocketTest, ArrivalTimeStampCanBeBeforeCurrentTime) {
.WillRepeatedly([&](AsyncPacketSocket*, const ReceivedIpPacket& packet) {
EXPECT_EQ(packet.arrival_time(), webrtc_clock.CurrentTime());
});
socket_ptr->SignalReadEvent(socket_ptr);
socket_ptr->NotifyReadEvent(socket_ptr);
// Let 10ms pass until next read event.
webrtc_clock.AdvanceTime(TimeDelta::Millis(10));
@ -103,7 +103,7 @@ TEST(AsyncUDPSocketTest, ArrivalTimeStampCanBeBeforeCurrentTime) {
EXPECT_EQ(packet.arrival_time(),
webrtc_clock.CurrentTime() - TimeDelta::Millis(5));
});
socket_ptr->SignalReadEvent(socket_ptr);
socket_ptr->NotifyReadEvent(socket_ptr);
}
TEST(AsyncUDPSocketTest, InitiallyBufferedPacketsGetSameArrivalTime) {
@ -135,15 +135,15 @@ TEST(AsyncUDPSocketTest, InitiallyBufferedPacketsGetSameArrivalTime) {
EXPECT_CALL(received_packet_callback, Call)
.Times(3)
.WillRepeatedly([&](AsyncPacketSocket*, const ReceivedIpPacket& packet) {
// Despite the packets being received at different times, They all have
// Despite the packets being received at different times, they all have
// the same timestamp.
EXPECT_EQ(packet.arrival_time(), webrtc_clock.CurrentTime());
});
// But assume, CPU is blocked and can not read the packet at the pace they
// arrive. Instead they are read one after each other a bit later.
socket_ptr->SignalReadEvent(socket_ptr);
socket_ptr->SignalReadEvent(socket_ptr);
socket_ptr->SignalReadEvent(socket_ptr);
socket_ptr->NotifyReadEvent(socket_ptr);
socket_ptr->NotifyReadEvent(socket_ptr);
socket_ptr->NotifyReadEvent(socket_ptr);
}
TEST(AsyncUDPSocketTest, ArrivalTimeStampCanNotBeAfterCurrentTime) {
@ -168,7 +168,7 @@ TEST(AsyncUDPSocketTest, ArrivalTimeStampCanNotBeAfterCurrentTime) {
.WillRepeatedly([&](AsyncPacketSocket*, const ReceivedIpPacket& packet) {
EXPECT_EQ(packet.arrival_time(), webrtc_clock.CurrentTime());
});
socket_ptr->SignalReadEvent(socket_ptr);
socket_ptr->NotifyReadEvent(socket_ptr);
// Let 10ms pass until next read event.
webrtc_clock.AdvanceTime(TimeDelta::Millis(10));
@ -186,7 +186,7 @@ TEST(AsyncUDPSocketTest, ArrivalTimeStampCanNotBeAfterCurrentTime) {
// time.
EXPECT_EQ(packet.arrival_time(), webrtc_clock.CurrentTime());
});
socket_ptr->SignalReadEvent(socket_ptr);
socket_ptr->NotifyReadEvent(socket_ptr);
}
} // namespace webrtc

View File

@ -1035,15 +1035,15 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) {
}
if ((ff & DE_ACCEPT) != 0) {
DisableEvents(DE_ACCEPT);
SignalReadEvent(this);
NotifyReadEvent(this);
}
if ((ff & DE_READ) != 0) {
DisableEvents(DE_READ);
SignalReadEvent(this);
NotifyReadEvent(this);
}
if ((ff & DE_WRITE) != 0) {
DisableEvents(DE_WRITE);
SignalWriteEvent(this);
NotifyWriteEvent(this);
}
if ((ff & DE_CLOSE) != 0) {
// The socket is now dead to us, so stop checking it.

View File

@ -38,7 +38,8 @@ ProxyServer::ProxyServer(SocketFactory* int_factory,
RTC_DCHECK(int_addr.family() == AF_INET || int_addr.family() == AF_INET6);
server_socket_->Bind(int_addr);
server_socket_->Listen(5);
server_socket_->SignalReadEvent.connect(this, &ProxyServer::OnAcceptEvent);
server_socket_->SubscribeReadEvent(
this, [this](Socket* socket) { OnAcceptEvent(socket); });
}
ProxyServer::~ProxyServer() = default;
@ -77,14 +78,18 @@ ProxyBinding::ProxyBinding(AsyncProxyServerSocket* int_socket,
[this](AsyncProxyServerSocket* socket, const SocketAddress& addr) {
OnConnectRequest(socket, addr);
});
int_socket_->SignalReadEvent.connect(this, &ProxyBinding::OnInternalRead);
int_socket_->SignalWriteEvent.connect(this, &ProxyBinding::OnInternalWrite);
int_socket_->SubscribeReadEvent(
this, [this](Socket* socket) { OnInternalRead(socket); });
int_socket_->SubscribeWriteEvent(
this, [this](Socket* socket) { OnInternalWrite(socket); });
int_socket_->SubscribeCloseEvent(
[this](Socket* socket, int error) { OnInternalClose(socket, error); });
ext_socket_->SubscribeConnectEvent(
[this](Socket* socket) { OnExternalConnect(socket); });
ext_socket_->SignalReadEvent.connect(this, &ProxyBinding::OnExternalRead);
ext_socket_->SignalWriteEvent.connect(this, &ProxyBinding::OnExternalWrite);
ext_socket_->SubscribeReadEvent(
this, [this](Socket* socket) { OnExternalRead(socket); });
ext_socket_->SubscribeWriteEvent(
this, [this](Socket* socket) { OnExternalWrite(socket); });
ext_socket_->SubscribeCloseEvent(
[this](Socket* socket, int error) { OnExternalClose(socket, error); });
}

View File

@ -162,8 +162,24 @@ class RTC_EXPORT Socket {
// but at the same time the SocketDispatcher may be signaling the read event.
// ready to read
sigslot::signal1<Socket*, sigslot::multi_threaded_local> SignalReadEvent;
void SubscribeReadEvent(void* tag,
absl::AnyInvocable<void(Socket*)> callback) {
read_event_trampoline_.Subscribe(tag, std::move(callback));
}
void UnsubscribeReadEvent(void* tag) {
read_event_trampoline_.Unsubscribe(tag);
}
void NotifyReadEvent(Socket* socket) { SignalReadEvent(socket); }
// ready to write
sigslot::signal1<Socket*, sigslot::multi_threaded_local> SignalWriteEvent;
void SubscribeWriteEvent(void* tag,
absl::AnyInvocable<void(Socket*)> callback) {
write_event_trampoline_.Subscribe(tag, std::move(callback));
}
void UnsubscribeWriteEvent(void* tag) {
write_event_trampoline_.Unsubscribe(tag);
}
void NotifyWriteEvent(Socket* socket) { SignalWriteEvent(socket); }
sigslot::signal1<Socket*> SignalConnectEvent; // connected
void SubscribeConnectEvent(void* tag,
absl::AnyInvocable<void(Socket*)> callback) {
@ -193,9 +209,17 @@ class RTC_EXPORT Socket {
}
protected:
Socket() : connect_event_trampoline_(this), close_event_trampoline_(this) {}
Socket()
: read_event_trampoline_(this),
write_event_trampoline_(this),
connect_event_trampoline_(this),
close_event_trampoline_(this) {}
private:
MultiThreadSignalTrampoline<Socket, &Socket::SignalReadEvent>
read_event_trampoline_;
MultiThreadSignalTrampoline<Socket, &Socket::SignalWriteEvent>
write_event_trampoline_;
SignalTrampoline<Socket, &Socket::SignalConnectEvent>
connect_event_trampoline_;
SignalTrampoline<Socket, &Socket::SignalCloseEvent> close_event_trampoline_;

View File

@ -210,7 +210,7 @@ void AsyncSSLSocket::ProcessInput(char* data, size_t* len) {
// FIX: if SignalConnect causes the socket to be destroyed, we are in trouble
if (remainder)
SignalReadEvent(this);
NotifyReadEvent(this);
}
} // namespace webrtc

View File

@ -757,7 +757,8 @@ void SocketTest::DeleteInReadCallbackInternal(const IPAddress& loopback) {
// Configure the helper class to delete socket 2 when socket 1 has a read
// event.
SocketDeleter deleter(std::move(socket2));
socket1->SignalReadEvent.connect(&deleter, &SocketDeleter::Delete);
socket1->SubscribeReadEvent(
&deleter, [&deleter](Socket* socket) { deleter.Delete(socket); });
EXPECT_THAT(WaitUntil([&] { return deleter.deleted(); }, ::testing::IsTrue()),
IsRtcOk());
}

View File

@ -33,27 +33,29 @@
#include "test/gtest.h"
#include "test/wait_until.h"
namespace webrtc {
namespace {
using ::testing::_;
using ::testing::Return;
static const webrtc::TimeDelta kTimeout = webrtc::TimeDelta::Millis(5000);
static const TimeDelta kTimeout = TimeDelta::Millis(5000);
static webrtc::Socket* CreateSocket() {
webrtc::SocketAddress address(webrtc::IPAddress(INADDR_ANY), 0);
static Socket* CreateSocket() {
SocketAddress address(IPAddress(INADDR_ANY), 0);
webrtc::Socket* socket =
webrtc::Thread::Current()->socketserver()->CreateSocket(address.family(),
SOCK_STREAM);
Socket* socket = Thread::Current()->socketserver()->CreateSocket(
address.family(), SOCK_STREAM);
socket->Bind(address);
return socket;
}
// Simple mock for the certificate verifier.
class MockCertVerifier : public webrtc::SSLCertificateVerifier {
class MockCertVerifier : public SSLCertificateVerifier {
public:
~MockCertVerifier() override = default;
MOCK_METHOD(bool, Verify, (const webrtc::SSLCertificate&), (override));
MOCK_METHOD(bool, Verify, (const SSLCertificate&), (override));
};
// TODO(benwright) - Move to using INSTANTIATE_TEST_SUITE_P instead of using
@ -63,20 +65,19 @@ class SSLAdapterTestDummy : public sigslot::has_slots<> {
explicit SSLAdapterTestDummy() : socket_(CreateSocket()) {}
~SSLAdapterTestDummy() override = default;
void CreateSSLAdapter(webrtc::Socket* socket, webrtc::SSLRole role) {
ssl_adapter_.reset(webrtc::SSLAdapter::Create(socket));
void CreateSSLAdapter(Socket* socket, SSLRole role) {
ssl_adapter_.reset(SSLAdapter::Create(socket));
// Ignore any certificate errors for the purpose of testing.
// Note: We do this only because we don't have a real certificate.
// NEVER USE THIS IN PRODUCTION CODE!
ssl_adapter_->SetIgnoreBadCert(true);
ssl_adapter_->SignalReadEvent.connect(
this, &SSLAdapterTestDummy::OnSSLAdapterReadEvent);
ssl_adapter_->SubscribeCloseEvent(
[this](webrtc::Socket* socket, int error) {
OnSSLAdapterCloseEvent(socket, error);
});
ssl_adapter_->SubscribeReadEvent(
this, [this](Socket* socket) { OnSSLAdapterReadEvent(socket); });
ssl_adapter_->SubscribeCloseEvent([this](Socket* socket, int error) {
OnSSLAdapterCloseEvent(socket, error);
});
ssl_adapter_->SetRole(role);
}
@ -84,7 +85,7 @@ class SSLAdapterTestDummy : public sigslot::has_slots<> {
ssl_adapter_->SetIgnoreBadCert(ignore_bad_cert);
}
void SetCertVerifier(webrtc::SSLCertificateVerifier* ssl_cert_verifier) {
void SetCertVerifier(SSLCertificateVerifier* ssl_cert_verifier) {
ssl_adapter_->SetCertVerifier(ssl_cert_verifier);
}
@ -96,13 +97,9 @@ class SSLAdapterTestDummy : public sigslot::has_slots<> {
ssl_adapter_->SetEllipticCurves(curves);
}
webrtc::SocketAddress GetAddress() const {
return ssl_adapter_->GetLocalAddress();
}
SocketAddress GetAddress() const { return ssl_adapter_->GetLocalAddress(); }
webrtc::Socket::ConnState GetState() const {
return ssl_adapter_->GetState();
}
Socket::ConnState GetState() const { return ssl_adapter_->GetState(); }
const std::string& GetReceivedData() const { return data_; }
@ -114,7 +111,7 @@ class SSLAdapterTestDummy : public sigslot::has_slots<> {
return ssl_adapter_->Send(message.data(), message.length());
}
void OnSSLAdapterReadEvent(webrtc::Socket* socket) {
void OnSSLAdapterReadEvent(Socket* socket) {
char buffer[4096] = "";
// Read data received from the server and store it in our internal buffer.
@ -128,18 +125,18 @@ class SSLAdapterTestDummy : public sigslot::has_slots<> {
}
}
void OnSSLAdapterCloseEvent(webrtc::Socket* socket, int error) {
void OnSSLAdapterCloseEvent(Socket* socket, int error) {
// OpenSSLAdapter signals handshake failure with a close event, but without
// closing the socket! Let's close the socket here. This way GetState() can
// return CS_CLOSED after failure.
if (socket->GetState() != webrtc::Socket::CS_CLOSED) {
if (socket->GetState() != Socket::CS_CLOSED) {
socket->Close();
}
}
protected:
std::unique_ptr<webrtc::SSLAdapter> ssl_adapter_;
std::unique_ptr<webrtc::Socket> socket_;
std::unique_ptr<SSLAdapter> ssl_adapter_;
std::unique_ptr<Socket> socket_;
private:
std::string data_;
@ -148,11 +145,10 @@ class SSLAdapterTestDummy : public sigslot::has_slots<> {
class SSLAdapterTestDummyClient : public SSLAdapterTestDummy {
public:
explicit SSLAdapterTestDummyClient() : SSLAdapterTestDummy() {
CreateSSLAdapter(socket_.release(), webrtc::SSL_CLIENT);
CreateSSLAdapter(socket_.release(), SSL_CLIENT);
}
int Connect(absl::string_view hostname,
const webrtc::SocketAddress& address) {
int Connect(absl::string_view hostname, const SocketAddress& address) {
RTC_LOG(LS_INFO) << "Initiating connection with " << address.ToString();
int rv = ssl_adapter_->Connect(address);
@ -170,20 +166,18 @@ class SSLAdapterTestDummyClient : public SSLAdapterTestDummy {
class SSLAdapterTestDummyServer : public SSLAdapterTestDummy {
public:
explicit SSLAdapterTestDummyServer(const webrtc::KeyParams& key_params)
explicit SSLAdapterTestDummyServer(const KeyParams& key_params)
: SSLAdapterTestDummy(),
ssl_identity_(webrtc::SSLIdentity::Create(GetHostname(), key_params)) {
ssl_identity_(SSLIdentity::Create(GetHostname(), key_params)) {
socket_->Listen(1);
socket_->SignalReadEvent.connect(this,
&SSLAdapterTestDummyServer::OnReadEvent);
socket_->SubscribeReadEvent(
this, [this](Socket* socket) { OnReadEvent(socket); });
RTC_LOG(LS_INFO) << "TCP server listening on "
<< socket_->GetLocalAddress().ToString();
}
webrtc::SocketAddress GetAddress() const {
return socket_->GetLocalAddress();
}
SocketAddress GetAddress() const { return socket_->GetLocalAddress(); }
std::string GetHostname() const {
// Since we don't have a real certificate anyway, the value here doesn't
@ -192,8 +186,8 @@ class SSLAdapterTestDummyServer : public SSLAdapterTestDummy {
}
protected:
void OnReadEvent(webrtc::Socket* socket) {
CreateSSLAdapter(socket_->Accept(nullptr), webrtc::SSL_SERVER);
void OnReadEvent(Socket* socket) {
CreateSSLAdapter(socket_->Accept(nullptr), SSL_SERVER);
ssl_adapter_->SetIdentity(ssl_identity_->Clone());
if (ssl_adapter_->StartSSL(GetHostname()) != 0) {
RTC_LOG(LS_ERROR) << "Starting SSL from server failed.";
@ -201,27 +195,25 @@ class SSLAdapterTestDummyServer : public SSLAdapterTestDummy {
}
private:
std::unique_ptr<webrtc::SSLIdentity> ssl_identity_;
std::unique_ptr<SSLIdentity> ssl_identity_;
};
class SSLAdapterTestBase : public ::testing::Test, public sigslot::has_slots<> {
public:
explicit SSLAdapterTestBase(const webrtc::KeyParams& key_params)
: vss_(new webrtc::VirtualSocketServer()),
explicit SSLAdapterTestBase(const KeyParams& key_params)
: vss_(new VirtualSocketServer()),
thread_(vss_.get()),
server_(new SSLAdapterTestDummyServer(key_params)),
client_(new SSLAdapterTestDummyClient()),
handshake_wait_(webrtc::TimeDelta::Millis(kTimeout.ms())) {}
handshake_wait_(TimeDelta::Millis(kTimeout.ms())) {}
void SetHandshakeWait(int wait) {
handshake_wait_ = webrtc::TimeDelta::Millis(wait);
}
void SetHandshakeWait(int wait) { handshake_wait_ = TimeDelta::Millis(wait); }
void SetIgnoreBadCert(bool ignore_bad_cert) {
client_->SetIgnoreBadCert(ignore_bad_cert);
}
void SetCertVerifier(webrtc::SSLCertificateVerifier* ssl_cert_verifier) {
void SetCertVerifier(SSLCertificateVerifier* ssl_cert_verifier) {
client_->SetCertVerifier(ssl_cert_verifier);
}
@ -236,8 +228,8 @@ class SSLAdapterTestBase : public ::testing::Test, public sigslot::has_slots<> {
void SetMockCertVerifier(bool return_value) {
auto mock_verifier = std::make_unique<MockCertVerifier>();
EXPECT_CALL(*mock_verifier, Verify(_)).WillRepeatedly(Return(return_value));
cert_verifier_ = std::unique_ptr<webrtc::SSLCertificateVerifier>(
std::move(mock_verifier));
cert_verifier_ =
std::unique_ptr<SSLCertificateVerifier>(std::move(mock_verifier));
SetIgnoreBadCert(false);
SetCertVerifier(cert_verifier_.get());
@ -247,30 +239,30 @@ class SSLAdapterTestBase : public ::testing::Test, public sigslot::has_slots<> {
int rv;
// The initial state is CS_CLOSED
ASSERT_EQ(webrtc::Socket::CS_CLOSED, client_->GetState());
ASSERT_EQ(Socket::CS_CLOSED, client_->GetState());
rv = client_->Connect(server_->GetHostname(), server_->GetAddress());
ASSERT_EQ(0, rv);
// Now the state should be CS_CONNECTING
ASSERT_EQ(webrtc::Socket::CS_CONNECTING, client_->GetState());
ASSERT_EQ(Socket::CS_CONNECTING, client_->GetState());
if (expect_success) {
// If expecting success, the client should end up in the CS_CONNECTED
// state after handshake.
EXPECT_THAT(webrtc::WaitUntil([&] { return client_->GetState(); },
::testing::Eq(webrtc::Socket::CS_CONNECTED),
{.timeout = handshake_wait_}),
webrtc::IsRtcOk());
EXPECT_THAT(WaitUntil([&] { return client_->GetState(); },
::testing::Eq(Socket::CS_CONNECTED),
{.timeout = handshake_wait_}),
IsRtcOk());
RTC_LOG(LS_INFO) << "TLS handshake complete.";
} else {
// On handshake failure the client should end up in the CS_CLOSED state.
EXPECT_THAT(webrtc::WaitUntil([&] { return client_->GetState(); },
::testing::Eq(webrtc::Socket::CS_CLOSED),
{.timeout = handshake_wait_}),
webrtc::IsRtcOk());
EXPECT_THAT(WaitUntil([&] { return client_->GetState(); },
::testing::Eq(Socket::CS_CLOSED),
{.timeout = handshake_wait_}),
IsRtcOk());
RTC_LOG(LS_INFO) << "TLS handshake failed.";
}
@ -283,41 +275,39 @@ class SSLAdapterTestBase : public ::testing::Test, public sigslot::has_slots<> {
ASSERT_EQ(static_cast<int>(message.length()), rv);
// The server should have received the client's message.
EXPECT_THAT(
webrtc::WaitUntil([&] { return server_->GetReceivedData(); },
EXPECT_THAT(WaitUntil([&] { return server_->GetReceivedData(); },
::testing::Eq(message), {.timeout = kTimeout}),
webrtc::IsRtcOk());
IsRtcOk());
rv = server_->Send(message);
ASSERT_EQ(static_cast<int>(message.length()), rv);
// The client should have received the server's message.
EXPECT_THAT(
webrtc::WaitUntil([&] { return client_->GetReceivedData(); },
EXPECT_THAT(WaitUntil([&] { return client_->GetReceivedData(); },
::testing::Eq(message), {.timeout = kTimeout}),
webrtc::IsRtcOk());
IsRtcOk());
RTC_LOG(LS_INFO) << "Transfer complete.";
}
protected:
std::unique_ptr<webrtc::VirtualSocketServer> vss_;
webrtc::AutoSocketServerThread thread_;
std::unique_ptr<VirtualSocketServer> vss_;
AutoSocketServerThread thread_;
std::unique_ptr<SSLAdapterTestDummyServer> server_;
std::unique_ptr<SSLAdapterTestDummyClient> client_;
std::unique_ptr<webrtc::SSLCertificateVerifier> cert_verifier_;
std::unique_ptr<SSLCertificateVerifier> cert_verifier_;
webrtc::TimeDelta handshake_wait_;
TimeDelta handshake_wait_;
};
class SSLAdapterTestTLS_RSA : public SSLAdapterTestBase {
public:
SSLAdapterTestTLS_RSA() : SSLAdapterTestBase(webrtc::KeyParams::RSA()) {}
SSLAdapterTestTLS_RSA() : SSLAdapterTestBase(KeyParams::RSA()) {}
};
class SSLAdapterTestTLS_ECDSA : public SSLAdapterTestBase {
public:
SSLAdapterTestTLS_ECDSA() : SSLAdapterTestBase(webrtc::KeyParams::ECDSA()) {}
SSLAdapterTestTLS_ECDSA() : SSLAdapterTestBase(KeyParams::ECDSA()) {}
};
// Test that handshake works, using RSA
@ -399,18 +389,18 @@ TEST_F(SSLAdapterTestTLS_RSA, TestTLSTransferWithBlockedSocket) {
// Unblock the underlying socket. All of the buffered messages should be sent
// without any further action.
vss_->SetSendingBlocked(false);
EXPECT_THAT(webrtc::WaitUntil([&] { return server_->GetReceivedData(); },
::testing::Eq(expected), {.timeout = kTimeout}),
webrtc::IsRtcOk());
EXPECT_THAT(WaitUntil([&] { return server_->GetReceivedData(); },
::testing::Eq(expected), {.timeout = kTimeout}),
IsRtcOk());
// Send another message. This previously wasn't working
std::string final_message = "Fin.";
expected += final_message;
EXPECT_EQ(static_cast<int>(final_message.size()),
client_->Send(final_message));
EXPECT_THAT(webrtc::WaitUntil([&] { return server_->GetReceivedData(); },
::testing::Eq(expected), {.timeout = kTimeout}),
webrtc::IsRtcOk());
EXPECT_THAT(WaitUntil([&] { return server_->GetReceivedData(); },
::testing::Eq(expected), {.timeout = kTimeout}),
IsRtcOk());
}
// Test transfer between client and server, using ECDSA
@ -442,3 +432,6 @@ TEST_F(SSLAdapterTestTLS_ECDSA, TestTLSEllipticCurves) {
TestHandshake(true);
TestTransfer("Hello, world!");
}
} // namespace
} // namespace webrtc

View File

@ -11,6 +11,7 @@
#include "rtc_base/test_echo_server.h"
#include "api/environment/environment.h"
#include "rtc_base/socket.h"
#include "rtc_base/socket_address.h"
#include "rtc_base/socket_server.h"
@ -24,7 +25,8 @@ TestEchoServer::TestEchoServer(const Environment& env,
thread->socketserver()->Create(addr.family(), SOCK_STREAM)) {
server_socket_->Bind(addr);
server_socket_->Listen(5);
server_socket_->SignalReadEvent.connect(this, &TestEchoServer::OnAccept);
server_socket_->SubscribeReadEvent(
this, [this](Socket* socket) { OnAccept(socket); });
}
TestEchoServer::~TestEchoServer() = default;

View File

@ -45,8 +45,10 @@ class StreamSink : public sigslot::has_slots<> {
void Monitor(Socket* socket) {
socket->SubscribeConnectEvent(
this, [this](Socket* socket) { OnConnectEvent(socket); });
socket->SignalReadEvent.connect(this, &StreamSink::OnReadEvent);
socket->SignalWriteEvent.connect(this, &StreamSink::OnWriteEvent);
socket->SubscribeReadEvent(this,
[this](Socket* socket) { OnReadEvent(socket); });
socket->SubscribeWriteEvent(
this, [this](Socket* socket) { OnWriteEvent(socket); });
socket->SubscribeCloseEvent(this, [this](Socket* socket, int error) {
OnCloseEvent(socket, error);
});
@ -55,8 +57,8 @@ class StreamSink : public sigslot::has_slots<> {
}
void Unmonitor(Socket* socket) {
socket->UnsubscribeConnectEvent(this);
socket->SignalReadEvent.disconnect(this);
socket->SignalWriteEvent.disconnect(this);
socket->UnsubscribeReadEvent(this);
socket->UnsubscribeWriteEvent(this);
socket->UnsubscribeCloseEvent(this);
events_.erase(socket);

View File

@ -220,7 +220,7 @@ void VirtualSocket::SafetyBlock::MaybeSignalReadEvent() {
return;
}
}
socket_.SignalReadEvent(&socket_);
socket_.NotifyReadEvent(&socket_);
}
int VirtualSocket::Close() {
@ -439,7 +439,7 @@ void VirtualSocket::PostPacket(TimeDelta delay,
[safety = std::move(safety), socket,
packet = std::move(packet)]() mutable {
if (safety->AddPacket(std::move(packet))) {
socket->SignalReadEvent(socket);
socket->NotifyReadEvent(socket);
}
},
delay);
@ -477,7 +477,7 @@ void VirtualSocket::SafetyBlock::PostConnect(TimeDelta delay,
case Signal::kNone:
break;
case Signal::kReadEvent:
safety->socket_.SignalReadEvent(&safety->socket_);
safety->socket_.NotifyReadEvent(&safety->socket_);
break;
case Signal::kConnectEvent:
safety->socket_.NotifyConnectEvent(&safety->socket_);
@ -618,7 +618,7 @@ void VirtualSocket::OnSocketServerReadyToSend() {
}
if (type_ == SOCK_DGRAM) {
ready_to_send_ = true;
SignalWriteEvent(this);
NotifyWriteEvent(this);
} else {
RTC_DCHECK(type_ == SOCK_STREAM);
// This will attempt to empty the full send buffer, and will fire
@ -650,7 +650,7 @@ void VirtualSocket::UpdateSend(size_t data_size) {
void VirtualSocket::MaybeSignalWriteEvent(size_t capacity) {
if (!ready_to_send_ && (send_buffer_.size() < capacity)) {
ready_to_send_ = true;
SignalWriteEvent(this);
NotifyWriteEvent(this);
}
}

View File

@ -121,7 +121,7 @@ void FakeNetworkSocket::OnPacketReceived(EmulatedIpPacket packet) {
// where pending_packet will be read and reset. This call is done without
// any thread switch (see AsyncUDPSocket::OnReadEvent) so it's safe to
// assume that SignalReadEvent() will block until the packet has been read.
SignalReadEvent(this);
NotifyReadEvent(this);
RTC_DCHECK(!pending_);
};
thread_->PostTask(SafeTask(alive_, std::move(task)));

View File

@ -66,7 +66,8 @@ class SocketReader : public sigslot::has_slots<> {
public:
explicit SocketReader(Socket* socket, Thread* network_thread)
: socket_(socket), network_thread_(network_thread) {
socket_->SignalReadEvent.connect(this, &SocketReader::OnReadEvent);
socket_->SubscribeReadEvent(
this, [this](Socket* socket) { OnReadEvent(socket); });
}
void OnReadEvent(Socket* socket) {