Replace a few cases of Thread* const with TaskQueueBase* const

Update member variables and constructor arguments in a few classes,
including `RtpSender`, `JsepTransport`, and `FifoBuffer`, to use
`TaskQueueBase*` instead of `Thread*`.

This change narrows the interface exposed to these classes. Since
`Thread` inherits from `TaskQueueBase`, using the base class restricts
access to `Thread`-specific methods like `BlockingCall()`. This helps
enforce asynchronous patterns, isolates where blocking calls are
currently permitted, and prevents the introduction of new synchronous
calls in these contexts.

Update `BUILD.gn` files to replace dependencies on `rtc_base:threading`
with `api/task_queue` where appropriate.

Bug: none
Change-Id: I6d666066eaeb1efb45bf4a7aec12ea448c21baa2
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/425120
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#46176}
This commit is contained in:
Tommi 2025-11-12 19:20:56 +01:00 committed by WebRTC LUCI CQ
parent 0ff9ab2bc6
commit dfd37987a0
21 changed files with 60 additions and 42 deletions

View File

@ -17,6 +17,7 @@
#include "api/frame_transformer_interface.h"
#include "api/scoped_refptr.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "modules/rtp_rtcp/source/frame_object.h"
#include "rtc_base/system/no_unique_address.h"
#include "rtc_base/thread.h"
@ -74,7 +75,7 @@ class RtpVideoStreamReceiverFrameTransformerDelegate
RtpVideoFrameReceiver* receiver_ RTC_GUARDED_BY(network_sequence_checker_);
scoped_refptr<FrameTransformerInterface> frame_transformer_
RTC_GUARDED_BY(network_sequence_checker_);
Thread* const network_thread_;
TaskQueueBase* const network_thread_;
const uint32_t ssrc_;
Clock* const clock_;
bool short_circuit_ RTC_GUARDED_BY(network_sequence_checker_) = false;

View File

@ -508,6 +508,7 @@ rtc_library("p2p_transport_channel") {
"../api:rtc_error",
"../api:sequence_checker",
"../api/environment",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/transport:enums",
"../api/transport:stun_types",
@ -696,11 +697,11 @@ rtc_library("regathering_controller") {
":packet_transport_internal",
":port_allocator",
"../api:sequence_checker",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/units:time_delta",
"../rtc_base:checks",
"../rtc_base:network_route",
"../rtc_base:threading",
"../rtc_base/third_party/sigslot",
]
}
@ -1002,6 +1003,7 @@ rtc_library("wrapping_active_ice_controller") {
":ice_transport_internal",
":transport_description",
"../api:sequence_checker",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/units:time_delta",
"../rtc_base:checks",
@ -1034,6 +1036,7 @@ if (rtc_include_tests) {
"../api:ice_transport_interface",
"../api:libjingle_peerconnection_api",
"../api:sequence_checker",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/transport:enums",
"../api/transport:stun_types",
@ -1047,7 +1050,6 @@ if (rtc_include_tests) {
"../rtc_base:network_route",
"../rtc_base:socket",
"../rtc_base:task_queue_for_test",
"../rtc_base:threading",
"../rtc_base:timeutils",
"../rtc_base/network:received_packet",
"../rtc_base/network:sent_packet",

View File

@ -38,6 +38,7 @@
#include "api/local_network_access_permission.h"
#include "api/rtc_error.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "api/transport/enums.h"
#include "api/transport/stun.h"
#include "api/units/time_delta.h"
@ -72,7 +73,6 @@
#include "rtc_base/socket_address.h"
#include "rtc_base/strings/string_builder.h"
#include "rtc_base/system/rtc_export.h"
#include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
@ -444,7 +444,7 @@ class RTC_EXPORT P2PTransportChannel : public IceTransportInternal,
owned_dns_resolver_factory_;
LocalNetworkAccessPermissionFactoryInterface* const lna_permission_factory_
RTC_GUARDED_BY(network_thread_);
Thread* const network_thread_;
TaskQueueBase* const network_thread_;
bool incoming_only_ RTC_GUARDED_BY(network_thread_);
int error_ RTC_GUARDED_BY(network_thread_);
std::vector<std::unique_ptr<PortAllocatorSession>> allocator_sessions_

View File

@ -10,19 +10,23 @@
#include "p2p/base/regathering_controller.h"
#include <optional>
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "p2p/base/ice_transport_internal.h"
#include "p2p/base/packet_transport_internal.h"
#include "rtc_base/checks.h"
#include "rtc_base/thread.h"
#include "rtc_base/network_route.h"
namespace webrtc {
BasicRegatheringController::BasicRegatheringController(
const Config& config,
IceTransportInternal* ice_transport,
Thread* thread)
TaskQueueBase* thread)
: config_(config), ice_transport_(ice_transport), thread_(thread) {
RTC_DCHECK(thread_);
RTC_DCHECK_RUN_ON(thread_);

View File

@ -15,12 +15,12 @@
#include <optional>
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "p2p/base/ice_transport_internal.h"
#include "p2p/base/p2p_constants.h"
#include "p2p/base/packet_transport_internal.h"
#include "p2p/base/port_allocator.h"
#include "rtc_base/network_route.h"
#include "rtc_base/thread.h"
namespace webrtc {
@ -55,7 +55,7 @@ class BasicRegatheringController {
BasicRegatheringController() = delete;
BasicRegatheringController(const Config& config,
IceTransportInternal* ice_transport,
Thread* thread);
TaskQueueBase* thread);
virtual ~BasicRegatheringController();
// TODO(qingsi): Remove this method after implementing a new signal in
// P2PTransportChannel and reacting to that signal for the initial schedules
@ -93,7 +93,7 @@ class BasicRegatheringController {
Config config_;
IceTransportInternal* ice_transport_;
PortAllocatorSession* allocator_session_ = nullptr;
Thread* const thread_;
TaskQueueBase* const thread_;
};
} // namespace webrtc

View File

@ -16,6 +16,7 @@
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "p2p/base/basic_ice_controller.h"
#include "p2p/base/connection.h"
@ -39,7 +40,7 @@ namespace webrtc {
WrappingActiveIceController::WrappingActiveIceController(
IceAgentInterface* ice_agent,
std::unique_ptr<IceControllerInterface> wrapped)
: network_thread_(Thread::Current()),
: network_thread_(TaskQueueBase::Current()),
wrapped_(std::move(wrapped)),
agent_(*ice_agent) {
RTC_DCHECK(ice_agent != nullptr);

View File

@ -14,6 +14,7 @@
#include <memory>
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "p2p/base/active_ice_controller_interface.h"
#include "p2p/base/connection.h"
#include "p2p/base/ice_agent_interface.h"
@ -22,7 +23,6 @@
#include "p2p/base/ice_switch_reason.h"
#include "p2p/base/ice_transport_internal.h"
#include "p2p/base/transport_description.h"
#include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
@ -78,7 +78,7 @@ class WrappingActiveIceController : public ActiveIceControllerInterface {
void PruneConnections();
Thread* const network_thread_;
TaskQueueBase* const network_thread_;
ScopedTaskSafety task_safety_;
bool started_pinging_ RTC_GUARDED_BY(network_thread_) = false;

View File

@ -28,6 +28,7 @@
#include "api/ice_transport_interface.h"
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/transport/enums.h"
#include "api/transport/stun.h"
#include "api/units/time_delta.h"
@ -47,7 +48,6 @@
#include "rtc_base/network_route.h"
#include "rtc_base/socket.h"
#include "rtc_base/task_queue_for_test.h"
#include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_base/time_utils.h"
#include "test/create_test_field_trials.h"
@ -63,11 +63,12 @@ class FakeIceTransport : public IceTransportInternal {
public:
explicit FakeIceTransport(absl::string_view name,
int component,
Thread* network_thread = nullptr,
TaskQueueBase* network_thread = nullptr,
absl::string_view field_trials_string = "")
: name_(name),
component_(component),
network_thread_(network_thread ? network_thread : Thread::Current()),
network_thread_(network_thread ? network_thread
: TaskQueueBase::Current()),
field_trials_(CreateTestFieldTrials(field_trials_string)) {
RTC_DCHECK(network_thread_);
}
@ -646,7 +647,7 @@ class FakeIceTransport : public IceTransportInternal {
std::optional<NetworkRoute> network_route_ RTC_GUARDED_BY(network_thread_);
std::map<Socket::Option, int> socket_options_ RTC_GUARDED_BY(network_thread_);
CopyOnWriteBuffer last_sent_packet_ RTC_GUARDED_BY(network_thread_);
Thread* const network_thread_;
TaskQueueBase* const network_thread_;
ScopedTaskSafetyDetached task_safety_;
std::optional<int> rtt_estimate_;
std::optional<int64_t> last_sent_ping_timestamp_;

View File

@ -229,6 +229,7 @@ rtc_library("jsep_transport") {
"../api:rtc_error",
"../api:scoped_refptr",
"../api:sequence_checker",
"../api/task_queue",
"../api/transport:datagram_transport_interface",
"../call:payload_type_picker",
"../media:rtc_data_sctp_transport_internal",
@ -1028,6 +1029,7 @@ rtc_library("rtc_stats_collector") {
"../api/audio:audio_device",
"../api/audio:audio_processing_statistics",
"../api/environment",
"../api/task_queue",
"../api/transport:enums",
"../api/units:time_delta",
"../api/units:timestamp",
@ -1950,6 +1952,7 @@ rtc_library("rtp_sender") {
"../api:sequence_checker",
"../api/crypto:frame_encryptor_interface",
"../api/environment",
"../api/task_queue",
"../api/video_codecs:video_codecs_api",
"../media:audio_source",
"../media:codec",

View File

@ -22,6 +22,7 @@
#include "api/rtc_error.h"
#include "api/scoped_refptr.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "api/transport/data_channel_transport_interface.h"
#include "call/payload_type_picker.h"
#include "media/sctp/sctp_transport_internal.h"
@ -39,7 +40,6 @@
#include "rtc_base/rtc_certificate.h"
#include "rtc_base/ssl_fingerprint.h"
#include "rtc_base/ssl_stream_adapter.h"
#include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
@ -273,7 +273,7 @@ class JsepTransport {
TransportStats* stats) const;
// Owning thread, for safety checks
const Thread* const network_thread_;
const TaskQueueBase* const network_thread_;
const std::string mid_;
// needs-ice-restart bit as described in JSEP.
bool needs_ice_restart_ RTC_GUARDED_BY(network_thread_) = false;

View File

@ -29,6 +29,7 @@
#include "api/scoped_refptr.h"
#include "api/stats/rtc_stats_collector_callback.h"
#include "api/stats/rtc_stats_report.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/timestamp.h"
#include "call/call.h"
#include "pc/peer_connection_internal.h"
@ -252,7 +253,7 @@ class RTCStatsCollector : public RefCountInterface {
const bool is_unified_plan_;
const Environment env_;
const bool stats_timestamp_with_environment_clock_;
Thread* const signaling_thread_;
TaskQueueBase* const signaling_thread_;
Thread* const worker_thread_;
Thread* const network_thread_;

View File

@ -35,6 +35,7 @@
#include "api/rtp_sender_interface.h"
#include "api/scoped_refptr.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "api/video_codecs/video_encoder_factory.h"
#include "media/base/audio_source.h"
#include "media/base/codec.h"
@ -110,7 +111,7 @@ RtpParameters RestoreEncodingLayers(
class SignalingThreadCallback {
public:
SignalingThreadCallback(Thread* signaling_thread,
SignalingThreadCallback(TaskQueueBase* signaling_thread,
SetParametersCallback callback)
: signaling_thread_(signaling_thread), callback_(std::move(callback)) {}
SignalingThreadCallback(SignalingThreadCallback&& other)
@ -144,7 +145,7 @@ class SignalingThreadCallback {
callback_ = nullptr;
}
Thread* signaling_thread_;
TaskQueueBase* const signaling_thread_;
SetParametersCallback callback_;
};

View File

@ -35,6 +35,7 @@
#include "api/rtp_sender_interface.h"
#include "api/scoped_refptr.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "api/video_codecs/video_encoder_factory.h"
#include "media/base/audio_source.h"
#include "media/base/codec.h"
@ -257,7 +258,7 @@ class RtpSenderBase : public RtpSenderInternal, public ObserverInterface {
virtual void RemoveTrackFromStats() {}
const Environment env_;
Thread* const signaling_thread_;
TaskQueueBase* const signaling_thread_;
Thread* const worker_thread_;
uint32_t ssrc_ = 0;
bool stopped_ RTC_GUARDED_BY(signaling_thread_) = false;

View File

@ -1553,8 +1553,8 @@ rtc_library("rtc_certificate_generator") {
deps = [
":checks",
":ssl",
":threading",
"../api:scoped_refptr",
"../api/task_queue",
"system:rtc_export",
"//third_party/abseil-cpp/absl/functional:any_invocable",
]
@ -1721,6 +1721,7 @@ rtc_library("ssl_adapter") {
"../api:array_view",
"../api:field_trials_view",
"../api:sequence_checker",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/units:time_delta",
"system:rtc_export",

View File

@ -36,9 +36,9 @@ rtc_library("fifo_buffer") {
"..:checks",
"..:macromagic",
"..:stream",
"..:threading",
"../../api:array_view",
"../../api:sequence_checker",
"../../api/task_queue",
"../../api/task_queue:pending_task_safety_flag",
]
}

View File

@ -17,9 +17,9 @@
#include "api/array_view.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/checks.h"
#include "rtc_base/stream.h"
#include "rtc_base/thread.h"
namespace webrtc {
@ -29,11 +29,11 @@ FifoBuffer::FifoBuffer(size_t size)
buffer_length_(size),
data_length_(0),
read_position_(0),
owner_(Thread::Current()) {
owner_(TaskQueueBase::Current()) {
// all events are done on the owner_ thread
}
FifoBuffer::FifoBuffer(size_t size, Thread* owner)
FifoBuffer::FifoBuffer(size_t size, TaskQueueBase* owner)
: state_(SS_OPEN),
buffer_(new char[size]),
buffer_length_(size),

View File

@ -18,8 +18,8 @@
#include "api/array_view.h"
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/stream.h"
#include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
@ -31,7 +31,7 @@ class FifoBuffer final : public StreamInterface {
// Creates a FIFO buffer with the specified capacity.
explicit FifoBuffer(size_t length);
// Creates a FIFO buffer with the specified capacity and owner
FifoBuffer(size_t length, Thread* owner);
FifoBuffer(size_t length, TaskQueueBase* owner);
~FifoBuffer() override;
FifoBuffer(const FifoBuffer&) = delete;
@ -115,7 +115,7 @@ class FifoBuffer final : public StreamInterface {
// offset to the readable data
size_t read_position_ RTC_GUARDED_BY(callback_sequence_);
// stream callbacks are dispatched on this thread
Thread* const owner_;
TaskQueueBase* const owner_;
};
} // namespace webrtc

View File

@ -32,6 +32,7 @@
#include "api/field_trials_view.h"
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "rtc_base/buffer.h"
#include "rtc_base/checks.h"
@ -46,7 +47,6 @@
#include "rtc_base/stream.h"
#include "rtc_base/string_encode.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "rtc_base/thread.h"
#include "rtc_base/time_utils.h"
#ifdef OPENSSL_IS_BORINGSSL
@ -325,7 +325,7 @@ OpenSSLStreamAdapter::OpenSSLStreamAdapter(
const FieldTrialsView* field_trials)
: stream_(std::move(stream)),
handshake_error_(std::move(handshake_error)),
owner_(Thread::Current()),
owner_(TaskQueueBase::Current()),
state_(SSL_NONE),
role_(SSL_CLIENT),
ssl_read_needs_write_(false),

View File

@ -33,11 +33,11 @@
#endif
#include "api/field_trials_view.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/ssl_identity.h"
#include "rtc_base/ssl_stream_adapter.h"
#include "rtc_base/stream.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "rtc_base/thread.h"
namespace webrtc {
@ -215,7 +215,7 @@ class OpenSSLStreamAdapter final : public SSLStreamAdapter {
const std::unique_ptr<StreamInterface> stream_;
absl::AnyInvocable<void(SSLHandshakeError)> handshake_error_;
Thread* const owner_;
TaskQueueBase* const owner_;
ScopedTaskSafety task_safety_;
RepeatingTaskHandle timeout_task_;

View File

@ -18,10 +18,10 @@
#include <utility>
#include "api/scoped_refptr.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/checks.h"
#include "rtc_base/rtc_certificate.h"
#include "rtc_base/ssl_identity.h"
#include "rtc_base/thread.h"
namespace webrtc {
@ -63,8 +63,9 @@ scoped_refptr<RTCCertificate> RTCCertificateGenerator::GenerateCertificate(
return RTCCertificate::Create(std::move(identity));
}
RTCCertificateGenerator::RTCCertificateGenerator(Thread* signaling_thread,
Thread* worker_thread)
RTCCertificateGenerator::RTCCertificateGenerator(
TaskQueueBase* signaling_thread,
TaskQueueBase* worker_thread)
: signaling_thread_(signaling_thread), worker_thread_(worker_thread) {
RTC_DCHECK(signaling_thread_);
RTC_DCHECK(worker_thread_);

View File

@ -17,10 +17,10 @@
#include "absl/functional/any_invocable.h"
#include "api/scoped_refptr.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/rtc_certificate.h"
#include "rtc_base/ssl_identity.h"
#include "rtc_base/system/rtc_export.h"
#include "rtc_base/thread.h"
namespace webrtc {
@ -61,7 +61,8 @@ class RTC_EXPORT RTCCertificateGenerator
const KeyParams& key_params,
const std::optional<uint64_t>& expires_ms);
RTCCertificateGenerator(Thread* signaling_thread, Thread* worker_thread);
RTCCertificateGenerator(TaskQueueBase* signaling_thread,
TaskQueueBase* worker_thread);
~RTCCertificateGenerator() override {}
// `RTCCertificateGeneratorInterface` overrides.
@ -74,8 +75,8 @@ class RTC_EXPORT RTCCertificateGenerator
Callback callback) override;
private:
Thread* const signaling_thread_;
Thread* const worker_thread_;
TaskQueueBase* const signaling_thread_;
TaskQueueBase* const worker_thread_;
};
} // namespace webrtc