Compare commits
3 Commits
master
...
eliza/trac
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fd0b32d38f | ||
|
|
b65ed09f63 | ||
|
|
de27ab47ec |
@ -24,9 +24,11 @@ exclude = [
|
||||
publish = false
|
||||
|
||||
[features]
|
||||
default = ["log"]
|
||||
log = ["tracing/log"]
|
||||
|
||||
[dependencies]
|
||||
log = "0.4.6"
|
||||
tracing = "0.1.6"
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
libc = "0.2.58"
|
||||
@ -42,6 +44,7 @@ lazy_static = "1.3.0"
|
||||
# for testing we'll use the git version.
|
||||
bytes = { version = "0.5.0", git = "https://github.com/tokio-rs/bytes", rev = "79e4b2847f27137faaf149d116a352cbeae47fd1" }
|
||||
env_logger = { version = "0.6.1", default-features = false }
|
||||
tracing = { version = "0.1.6", features = ["log"] }
|
||||
slab = "0.4.2"
|
||||
tempdir = "0.3.7"
|
||||
net2 = "0.2.33"
|
||||
|
||||
@ -77,10 +77,8 @@ platform, submit a PR to update the list!
|
||||
|
||||
## Community
|
||||
|
||||
A group of Mio users hang out on [Gitter], this can be a good place to go for
|
||||
questions.
|
||||
|
||||
[Gitter]: https://gitter.im/tokio-rs/mio
|
||||
A group of Mio users hang out in the #mio channel on the Mozilla IRC
|
||||
server (irc.mozilla.org). This can be a good place to go for questions.
|
||||
|
||||
## Contributing
|
||||
|
||||
|
||||
@ -14,13 +14,6 @@ jobs:
|
||||
displayName: Test
|
||||
cross: true
|
||||
|
||||
# Stable --release
|
||||
- template: ci/azure-test-stable.yml
|
||||
parameters:
|
||||
name: stable_release
|
||||
displayName: Test --release
|
||||
cmd: test --release
|
||||
|
||||
# Nightly
|
||||
- template: ci/azure-test-stable.yml
|
||||
parameters:
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
use crate::{Interests, Registry, Token};
|
||||
|
||||
use std::io;
|
||||
use std::ops::Deref;
|
||||
|
||||
|
||||
@ -7,11 +7,22 @@
|
||||
//!
|
||||
//! [portability guidelines]: ../struct.Poll.html#portability
|
||||
|
||||
mod tcp_listener;
|
||||
pub use self::tcp_listener::TcpListener;
|
||||
|
||||
mod tcp_stream;
|
||||
pub use self::tcp_stream::TcpStream;
|
||||
|
||||
mod tcp;
|
||||
mod udp;
|
||||
|
||||
pub use self::tcp::{TcpListener, TcpStream};
|
||||
pub use self::udp::UdpSocket;
|
||||
|
||||
#[test]
|
||||
#[cfg(not(debug_assertions))]
|
||||
fn assert_size() {
|
||||
use std::mem::size_of;
|
||||
|
||||
use crate::sys;
|
||||
|
||||
// Without debug assertions enabled `TcpListener`, `TcpStream` and `UdpSocket` should have the
|
||||
// same size as the system specific socket, i.e. just a file descriptor on Unix platforms.
|
||||
assert_eq!(size_of::<TcpListener>(), size_of::<sys::TcpListener>());
|
||||
assert_eq!(size_of::<TcpStream>(), size_of::<sys::TcpStream>());
|
||||
assert_eq!(size_of::<UdpSocket>(), size_of::<sys::UdpSocket>());
|
||||
}
|
||||
|
||||
@ -1,13 +1,26 @@
|
||||
//! Primitives for working with TCP
|
||||
//!
|
||||
//! The types provided in this module are non-blocking by default and are
|
||||
//! designed to be portable across all supported Mio platforms. As long as the
|
||||
//! [portability guidelines] are followed, the behavior should be identical no
|
||||
//! matter the target platform.
|
||||
//!
|
||||
//! [portability guidelines]: ../struct.Poll.html#portability
|
||||
|
||||
use std::fmt;
|
||||
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
|
||||
use std::net::SocketAddr;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
use crate::poll::SelectorId;
|
||||
use crate::{event, sys, Interests, Registry, Token};
|
||||
|
||||
/*
|
||||
*
|
||||
* ===== TcpStream =====
|
||||
*
|
||||
*/
|
||||
|
||||
/// A non-blocking TCP stream between a local socket and a remote socket.
|
||||
///
|
||||
/// The socket will be closed when the value is dropped.
|
||||
@ -47,14 +60,6 @@ pub struct TcpStream {
|
||||
use std::net::Shutdown;
|
||||
|
||||
impl TcpStream {
|
||||
pub(crate) fn new(sys: sys::TcpStream) -> TcpStream {
|
||||
TcpStream {
|
||||
sys,
|
||||
#[cfg(debug_assertions)]
|
||||
selector_id: SelectorId::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new TCP stream and issue a non-blocking connect to the
|
||||
/// specified address.
|
||||
pub fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
|
||||
@ -230,6 +235,163 @@ impl fmt::Debug for TcpStream {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
*
|
||||
* ===== TcpListener =====
|
||||
*
|
||||
*/
|
||||
|
||||
/// A structure representing a socket server
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # use std::error::Error;
|
||||
/// # fn main() -> Result<(), Box<dyn Error>> {
|
||||
/// use mio::{Events, Interests, Poll, Token};
|
||||
/// use mio::net::TcpListener;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// let listener = TcpListener::bind("127.0.0.1:34255".parse()?)?;
|
||||
///
|
||||
/// let mut poll = Poll::new()?;
|
||||
/// let mut events = Events::with_capacity(128);
|
||||
///
|
||||
/// // Register the socket with `Poll`
|
||||
/// poll.registry().register(&listener, Token(0), Interests::READABLE)?;
|
||||
///
|
||||
/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
|
||||
///
|
||||
/// // There may be a socket ready to be accepted
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub struct TcpListener {
|
||||
sys: sys::TcpListener,
|
||||
#[cfg(debug_assertions)]
|
||||
selector_id: SelectorId,
|
||||
}
|
||||
|
||||
impl TcpListener {
|
||||
/// Convenience method to bind a new TCP listener to the specified address
|
||||
/// to receive new connections.
|
||||
///
|
||||
/// This function will take the following steps:
|
||||
///
|
||||
/// 1. Create a new TCP socket.
|
||||
/// 2. Set the `SO_REUSEADDR` option on the socket on Unix.
|
||||
/// 3. Bind the socket to the specified address.
|
||||
/// 4. Calls `listen` on the socket to prepare it to receive new connections.
|
||||
pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
|
||||
sys::TcpListener::bind(addr).map(|sys| TcpListener {
|
||||
sys,
|
||||
#[cfg(debug_assertions)]
|
||||
selector_id: SelectorId::new(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Accepts a new `TcpStream`.
|
||||
///
|
||||
/// This may return an `Err(e)` where `e.kind()` is
|
||||
/// `io::ErrorKind::WouldBlock`. This means a stream may be ready at a later
|
||||
/// point and one should wait for an event before calling `accept` again.
|
||||
///
|
||||
/// If an accepted stream is returned, the remote address of the peer is
|
||||
/// returned along with it.
|
||||
pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
|
||||
self.sys.accept().map(|(sys, addr)| {
|
||||
(
|
||||
TcpStream {
|
||||
sys,
|
||||
#[cfg(debug_assertions)]
|
||||
selector_id: SelectorId::new(),
|
||||
},
|
||||
addr,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the local socket address of this listener.
|
||||
pub fn local_addr(&self) -> io::Result<SocketAddr> {
|
||||
self.sys.local_addr()
|
||||
}
|
||||
|
||||
/// Creates a new independently owned handle to the underlying socket.
|
||||
///
|
||||
/// The returned `TcpListener` is a reference to the same socket that this
|
||||
/// object references. Both handles can be used to accept incoming
|
||||
/// connections and options set on one listener will affect the other.
|
||||
pub fn try_clone(&self) -> io::Result<TcpListener> {
|
||||
self.sys.try_clone().map(|s| TcpListener {
|
||||
sys: s,
|
||||
#[cfg(debug_assertions)]
|
||||
selector_id: self.selector_id.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Sets the value for the `IP_TTL` option on this socket.
|
||||
///
|
||||
/// This value sets the time-to-live field that is used in every packet sent
|
||||
/// from this socket.
|
||||
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
|
||||
self.sys.set_ttl(ttl)
|
||||
}
|
||||
|
||||
/// Gets the value of the `IP_TTL` option for this socket.
|
||||
///
|
||||
/// For more information about this option, see [`set_ttl`][link].
|
||||
///
|
||||
/// [link]: #method.set_ttl
|
||||
pub fn ttl(&self) -> io::Result<u32> {
|
||||
self.sys.ttl()
|
||||
}
|
||||
|
||||
/// Get the value of the `SO_ERROR` option on this socket.
|
||||
///
|
||||
/// This will retrieve the stored error in the underlying socket, clearing
|
||||
/// the field in the process. This can be useful for checking errors between
|
||||
/// calls.
|
||||
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
|
||||
self.sys.take_error()
|
||||
}
|
||||
}
|
||||
|
||||
impl event::Source for TcpListener {
|
||||
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
|
||||
#[cfg(debug_assertions)]
|
||||
self.selector_id.associate_selector(registry)?;
|
||||
self.sys.register(registry, token, interests)
|
||||
}
|
||||
|
||||
fn reregister(
|
||||
&self,
|
||||
registry: &Registry,
|
||||
token: Token,
|
||||
interests: Interests,
|
||||
) -> io::Result<()> {
|
||||
self.sys.reregister(registry, token, interests)
|
||||
}
|
||||
|
||||
fn deregister(&self, registry: &Registry) -> io::Result<()> {
|
||||
self.sys.deregister(registry)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for TcpListener {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt::Debug::fmt(&self.sys, f)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
*
|
||||
* ===== UNIX ext =====
|
||||
*
|
||||
*/
|
||||
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
||||
|
||||
#[cfg(unix)]
|
||||
impl IntoRawFd for TcpStream {
|
||||
fn into_raw_fd(self) -> RawFd {
|
||||
@ -254,3 +416,28 @@ impl FromRawFd for TcpStream {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl IntoRawFd for TcpListener {
|
||||
fn into_raw_fd(self) -> RawFd {
|
||||
self.sys.into_raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl AsRawFd for TcpListener {
|
||||
fn as_raw_fd(&self) -> RawFd {
|
||||
self.sys.as_raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl FromRawFd for TcpListener {
|
||||
unsafe fn from_raw_fd(fd: RawFd) -> TcpListener {
|
||||
TcpListener {
|
||||
sys: FromRawFd::from_raw_fd(fd),
|
||||
#[cfg(debug_assertions)]
|
||||
selector_id: SelectorId::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,170 +0,0 @@
|
||||
use super::TcpStream;
|
||||
#[cfg(debug_assertions)]
|
||||
use crate::poll::SelectorId;
|
||||
use crate::{event, sys, Interests, Registry, Token};
|
||||
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
||||
|
||||
/// A structure representing a socket server
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # use std::error::Error;
|
||||
/// # fn main() -> Result<(), Box<dyn Error>> {
|
||||
/// use mio::{Events, Interests, Poll, Token};
|
||||
/// use mio::net::TcpListener;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// let listener = TcpListener::bind("127.0.0.1:34255".parse()?)?;
|
||||
///
|
||||
/// let mut poll = Poll::new()?;
|
||||
/// let mut events = Events::with_capacity(128);
|
||||
///
|
||||
/// // Register the socket with `Poll`
|
||||
/// poll.registry().register(&listener, Token(0), Interests::READABLE)?;
|
||||
///
|
||||
/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
|
||||
///
|
||||
/// // There may be a socket ready to be accepted
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub struct TcpListener {
|
||||
sys: sys::TcpListener,
|
||||
#[cfg(debug_assertions)]
|
||||
selector_id: SelectorId,
|
||||
}
|
||||
|
||||
impl TcpListener {
|
||||
/// Convenience method to bind a new TCP listener to the specified address
|
||||
/// to receive new connections.
|
||||
///
|
||||
/// This function will take the following steps:
|
||||
///
|
||||
/// 1. Create a new TCP socket.
|
||||
/// 2. Set the `SO_REUSEADDR` option on the socket on Unix.
|
||||
/// 3. Bind the socket to the specified address.
|
||||
/// 4. Calls `listen` on the socket to prepare it to receive new connections.
|
||||
pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
|
||||
sys::TcpListener::bind(addr).map(|sys| TcpListener {
|
||||
sys,
|
||||
#[cfg(debug_assertions)]
|
||||
selector_id: SelectorId::new(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Accepts a new `TcpStream`.
|
||||
///
|
||||
/// This may return an `Err(e)` where `e.kind()` is
|
||||
/// `io::ErrorKind::WouldBlock`. This means a stream may be ready at a later
|
||||
/// point and one should wait for an event before calling `accept` again.
|
||||
///
|
||||
/// If an accepted stream is returned, the remote address of the peer is
|
||||
/// returned along with it.
|
||||
pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
|
||||
self.sys
|
||||
.accept()
|
||||
.map(|(sys, addr)| (TcpStream::new(sys), addr))
|
||||
}
|
||||
|
||||
/// Returns the local socket address of this listener.
|
||||
pub fn local_addr(&self) -> io::Result<SocketAddr> {
|
||||
self.sys.local_addr()
|
||||
}
|
||||
|
||||
/// Creates a new independently owned handle to the underlying socket.
|
||||
///
|
||||
/// The returned `TcpListener` is a reference to the same socket that this
|
||||
/// object references. Both handles can be used to accept incoming
|
||||
/// connections and options set on one listener will affect the other.
|
||||
pub fn try_clone(&self) -> io::Result<TcpListener> {
|
||||
self.sys.try_clone().map(|s| TcpListener {
|
||||
sys: s,
|
||||
#[cfg(debug_assertions)]
|
||||
selector_id: self.selector_id.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Sets the value for the `IP_TTL` option on this socket.
|
||||
///
|
||||
/// This value sets the time-to-live field that is used in every packet sent
|
||||
/// from this socket.
|
||||
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
|
||||
self.sys.set_ttl(ttl)
|
||||
}
|
||||
|
||||
/// Gets the value of the `IP_TTL` option for this socket.
|
||||
///
|
||||
/// For more information about this option, see [`set_ttl`][link].
|
||||
///
|
||||
/// [link]: #method.set_ttl
|
||||
pub fn ttl(&self) -> io::Result<u32> {
|
||||
self.sys.ttl()
|
||||
}
|
||||
|
||||
/// Get the value of the `SO_ERROR` option on this socket.
|
||||
///
|
||||
/// This will retrieve the stored error in the underlying socket, clearing
|
||||
/// the field in the process. This can be useful for checking errors between
|
||||
/// calls.
|
||||
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
|
||||
self.sys.take_error()
|
||||
}
|
||||
}
|
||||
|
||||
impl event::Source for TcpListener {
|
||||
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
|
||||
#[cfg(debug_assertions)]
|
||||
self.selector_id.associate_selector(registry)?;
|
||||
self.sys.register(registry, token, interests)
|
||||
}
|
||||
|
||||
fn reregister(
|
||||
&self,
|
||||
registry: &Registry,
|
||||
token: Token,
|
||||
interests: Interests,
|
||||
) -> io::Result<()> {
|
||||
self.sys.reregister(registry, token, interests)
|
||||
}
|
||||
|
||||
fn deregister(&self, registry: &Registry) -> io::Result<()> {
|
||||
self.sys.deregister(registry)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for TcpListener {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt::Debug::fmt(&self.sys, f)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl IntoRawFd for TcpListener {
|
||||
fn into_raw_fd(self) -> RawFd {
|
||||
self.sys.into_raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl AsRawFd for TcpListener {
|
||||
fn as_raw_fd(&self) -> RawFd {
|
||||
self.sys.as_raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl FromRawFd for TcpListener {
|
||||
unsafe fn from_raw_fd(fd: RawFd) -> TcpListener {
|
||||
TcpListener {
|
||||
sys: FromRawFd::from_raw_fd(fd),
|
||||
#[cfg(debug_assertions)]
|
||||
selector_id: SelectorId::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -14,8 +14,6 @@ use crate::{event, sys, Interests, Registry, Token};
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
||||
|
||||
/// A User Datagram Protocol socket.
|
||||
///
|
||||
@ -534,6 +532,9 @@ impl fmt::Debug for UdpSocket {
|
||||
*
|
||||
*/
|
||||
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
||||
|
||||
#[cfg(unix)]
|
||||
impl IntoRawFd for UdpSocket {
|
||||
fn into_raw_fd(self) -> RawFd {
|
||||
|
||||
19
src/poll.rs
19
src/poll.rs
@ -1,6 +1,3 @@
|
||||
use crate::{event, sys, Events, Interests, Token};
|
||||
|
||||
use log::trace;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
#[cfg(debug_assertions)]
|
||||
@ -8,6 +5,10 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::time::Duration;
|
||||
use std::{fmt, io};
|
||||
|
||||
use tracing::trace;
|
||||
|
||||
use crate::{event, sys, Events, Interests, Token};
|
||||
|
||||
/// Polls for readiness events on all registered values.
|
||||
///
|
||||
/// `Poll` allows a program to monitor a large number of [`event::Source`]s,
|
||||
@ -491,9 +492,9 @@ impl Registry {
|
||||
S: event::Source + ?Sized,
|
||||
{
|
||||
trace!(
|
||||
"registering event source with poller: token={:?}, interests={:?}",
|
||||
token,
|
||||
interests
|
||||
message = "registering event source with poller",
|
||||
?token,
|
||||
?interests
|
||||
);
|
||||
source.register(self, token, interests)
|
||||
}
|
||||
@ -557,9 +558,9 @@ impl Registry {
|
||||
S: event::Source + ?Sized,
|
||||
{
|
||||
trace!(
|
||||
"reregistering event source with poller: token={:?}, interests={:?}",
|
||||
token,
|
||||
interests
|
||||
message = "reregistering event source with poller",
|
||||
?token,
|
||||
?interests
|
||||
);
|
||||
source.reregister(self, token, interests)
|
||||
}
|
||||
|
||||
@ -1,14 +1,15 @@
|
||||
use crate::sys::Events;
|
||||
use crate::{Interests, Token};
|
||||
|
||||
use libc::{EPOLLET, EPOLLIN, EPOLLOUT};
|
||||
use log::error;
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
#[cfg(debug_assertions)]
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::time::Duration;
|
||||
use std::{cmp, i32, io, ptr};
|
||||
|
||||
use libc::{EPOLLET, EPOLLIN, EPOLLOUT};
|
||||
use tracing::error;
|
||||
|
||||
use crate::sys::Events;
|
||||
use crate::{Interests, Token};
|
||||
|
||||
/// Unique id for use as `SelectorId`.
|
||||
#[cfg(debug_assertions)]
|
||||
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
|
||||
@ -110,8 +111,8 @@ impl AsRawFd for Selector {
|
||||
|
||||
impl Drop for Selector {
|
||||
fn drop(&mut self) {
|
||||
if let Err(err) = syscall!(close(self.ep)) {
|
||||
error!("error closing epoll: {}", err);
|
||||
if let Err(error) = syscall!(close(self.ep)) {
|
||||
error!(message = "error closing epoll", %error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,7 +1,3 @@
|
||||
use crate::sys::Events;
|
||||
use crate::{Interests, Token};
|
||||
|
||||
use log::error;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
#[cfg(debug_assertions)]
|
||||
@ -9,6 +5,11 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::time::Duration;
|
||||
use std::{cmp, io, ptr, slice};
|
||||
|
||||
use tracing::error;
|
||||
|
||||
use crate::sys::Events;
|
||||
use crate::{Interests, Token};
|
||||
|
||||
/// Unique id for use as `SelectorId`.
|
||||
#[cfg(debug_assertions)]
|
||||
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
|
||||
@ -226,6 +227,16 @@ impl Selector {
|
||||
})
|
||||
}
|
||||
|
||||
// Used by `Waker`.
|
||||
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
|
||||
pub fn try_clone_waker(&self) -> io::Result<Selector> {
|
||||
syscall!(dup(self.kq)).map(|new_kq| Selector {
|
||||
#[cfg(debug_assertions)]
|
||||
id: self.id,
|
||||
kq: new_kq,
|
||||
})
|
||||
}
|
||||
|
||||
// Used by `Waker`.
|
||||
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
|
||||
pub fn wake(&self, token: Token) -> io::Result<()> {
|
||||
@ -298,8 +309,8 @@ impl AsRawFd for Selector {
|
||||
|
||||
impl Drop for Selector {
|
||||
fn drop(&mut self) {
|
||||
if let Err(err) = syscall!(close(self.kq)) {
|
||||
error!("error closing kqueue: {}", err);
|
||||
if let Err(error) = syscall!(close(self.kq)) {
|
||||
error!(message = "error closing kqueue", %error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -40,17 +40,72 @@ mod kqueue;
|
||||
))]
|
||||
pub use self::kqueue::{event, Event, Selector};
|
||||
|
||||
mod net;
|
||||
mod sourcefd;
|
||||
mod tcp_listener;
|
||||
mod tcp_stream;
|
||||
mod tcp;
|
||||
mod udp;
|
||||
mod waker;
|
||||
|
||||
pub use self::sourcefd::SourceFd;
|
||||
pub use self::tcp_listener::TcpListener;
|
||||
pub use self::tcp_stream::TcpStream;
|
||||
pub use self::tcp::{TcpListener, TcpStream};
|
||||
pub use self::udp::UdpSocket;
|
||||
pub use self::waker::Waker;
|
||||
|
||||
pub type Events = Vec<Event>;
|
||||
|
||||
pub mod net {
|
||||
use std::io;
|
||||
use std::mem::size_of_val;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
/// Create a new non-blocking socket.
|
||||
pub fn new_socket(addr: SocketAddr, socket_type: libc::c_int) -> io::Result<libc::c_int> {
|
||||
let domain = match addr {
|
||||
SocketAddr::V4(..) => libc::AF_INET,
|
||||
SocketAddr::V6(..) => libc::AF_INET6,
|
||||
};
|
||||
|
||||
#[cfg(any(
|
||||
target_os = "android",
|
||||
target_os = "bitrig",
|
||||
target_os = "dragonfly",
|
||||
target_os = "freebsd",
|
||||
target_os = "linux",
|
||||
target_os = "netbsd",
|
||||
target_os = "openbsd"
|
||||
))]
|
||||
let socket_type = socket_type | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;
|
||||
|
||||
// Gives a warning for platforms without SOCK_NONBLOCK.
|
||||
#[allow(clippy::let_and_return)]
|
||||
let socket = syscall!(socket(domain, socket_type, 0));
|
||||
|
||||
// Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC. Not sure about
|
||||
// Solaris, couldn't find anything online.
|
||||
#[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
|
||||
let socket = socket.and_then(|socket| {
|
||||
// For platforms that don't support flags in socket, we need to
|
||||
// set the flags ourselves.
|
||||
syscall!(fcntl(
|
||||
socket,
|
||||
libc::F_SETFL,
|
||||
libc::O_NONBLOCK | libc::O_CLOEXEC
|
||||
))
|
||||
.map(|_| socket)
|
||||
});
|
||||
|
||||
socket
|
||||
}
|
||||
|
||||
pub fn socket_addr(addr: &SocketAddr) -> (*const libc::sockaddr, libc::socklen_t) {
|
||||
match addr {
|
||||
SocketAddr::V4(ref addr) => (
|
||||
addr as *const _ as *const libc::sockaddr,
|
||||
size_of_val(addr) as libc::socklen_t,
|
||||
),
|
||||
SocketAddr::V6(ref addr) => (
|
||||
addr as *const _ as *const libc::sockaddr,
|
||||
size_of_val(addr) as libc::socklen_t,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,55 +0,0 @@
|
||||
use std::io;
|
||||
use std::mem::size_of_val;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
/// Create a new non-blocking socket.
|
||||
pub fn new_socket(addr: SocketAddr, socket_type: libc::c_int) -> io::Result<libc::c_int> {
|
||||
let domain = match addr {
|
||||
SocketAddr::V4(..) => libc::AF_INET,
|
||||
SocketAddr::V6(..) => libc::AF_INET6,
|
||||
};
|
||||
|
||||
#[cfg(any(
|
||||
target_os = "android",
|
||||
target_os = "bitrig",
|
||||
target_os = "dragonfly",
|
||||
target_os = "freebsd",
|
||||
target_os = "linux",
|
||||
target_os = "netbsd",
|
||||
target_os = "openbsd"
|
||||
))]
|
||||
let socket_type = socket_type | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;
|
||||
|
||||
// Gives a warning for platforms without SOCK_NONBLOCK.
|
||||
#[allow(clippy::let_and_return)]
|
||||
let socket = syscall!(socket(domain, socket_type, 0));
|
||||
|
||||
// Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC. Not sure about
|
||||
// Solaris, couldn't find anything online.
|
||||
#[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
|
||||
let socket = socket.and_then(|socket| {
|
||||
// For platforms that don't support flags in socket, we need to
|
||||
// set the flags ourselves.
|
||||
syscall!(fcntl(
|
||||
socket,
|
||||
libc::F_SETFL,
|
||||
libc::O_NONBLOCK | libc::O_CLOEXEC
|
||||
))
|
||||
.map(|_| socket)
|
||||
});
|
||||
|
||||
socket
|
||||
}
|
||||
|
||||
pub fn socket_addr(addr: &SocketAddr) -> (*const libc::sockaddr, libc::socklen_t) {
|
||||
match addr {
|
||||
SocketAddr::V4(ref addr) => (
|
||||
addr as *const _ as *const libc::sockaddr,
|
||||
size_of_val(addr) as libc::socklen_t,
|
||||
),
|
||||
SocketAddr::V6(ref addr) => (
|
||||
addr as *const _ as *const libc::sockaddr,
|
||||
size_of_val(addr) as libc::socklen_t,
|
||||
),
|
||||
}
|
||||
}
|
||||
@ -1,21 +1,22 @@
|
||||
use std::fmt;
|
||||
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
|
||||
use std::mem::size_of;
|
||||
use std::net::{self, SocketAddr};
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
||||
|
||||
use crate::sys::unix::net::{new_socket, socket_addr};
|
||||
use crate::sys::unix::SourceFd;
|
||||
use crate::{event, Interests, Registry, Token};
|
||||
|
||||
use std::fmt;
|
||||
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
|
||||
use std::net::{self, SocketAddr};
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
||||
|
||||
pub struct TcpStream {
|
||||
inner: net::TcpStream,
|
||||
}
|
||||
|
||||
impl TcpStream {
|
||||
pub(crate) fn new(inner: net::TcpStream) -> TcpStream {
|
||||
TcpStream { inner }
|
||||
}
|
||||
pub struct TcpListener {
|
||||
inner: net::TcpListener,
|
||||
}
|
||||
|
||||
impl TcpStream {
|
||||
pub fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
|
||||
new_socket(addr, libc::SOCK_STREAM)
|
||||
.and_then(|socket| {
|
||||
@ -148,3 +149,103 @@ impl AsRawFd for TcpStream {
|
||||
self.inner.as_raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
impl TcpListener {
|
||||
pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
|
||||
new_socket(addr, libc::SOCK_STREAM).and_then(|socket| {
|
||||
// Set SO_REUSEADDR (mirrors what libstd does).
|
||||
syscall!(setsockopt(
|
||||
socket,
|
||||
libc::SOL_SOCKET,
|
||||
libc::SO_REUSEADDR,
|
||||
&1 as *const libc::c_int as *const libc::c_void,
|
||||
size_of::<libc::c_int>() as libc::socklen_t,
|
||||
))
|
||||
.and_then(|_| {
|
||||
let (raw_addr, raw_addr_length) = socket_addr(&addr);
|
||||
syscall!(bind(socket, raw_addr, raw_addr_length))
|
||||
})
|
||||
.and_then(|_| syscall!(listen(socket, 1024)))
|
||||
.map_err(|err| {
|
||||
// Close the socket if we hit an error, ignoring the error
|
||||
// from closing since we can't pass back two errors.
|
||||
let _ = unsafe { libc::close(socket) };
|
||||
err
|
||||
})
|
||||
.map(|_| TcpListener {
|
||||
inner: unsafe { net::TcpListener::from_raw_fd(socket) },
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
pub fn local_addr(&self) -> io::Result<SocketAddr> {
|
||||
self.inner.local_addr()
|
||||
}
|
||||
|
||||
pub fn try_clone(&self) -> io::Result<TcpListener> {
|
||||
self.inner.try_clone().map(|s| TcpListener { inner: s })
|
||||
}
|
||||
|
||||
pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
|
||||
self.inner
|
||||
.accept()
|
||||
.map(|(inner, addr)| (TcpStream { inner }, addr))
|
||||
}
|
||||
|
||||
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
|
||||
self.inner.set_ttl(ttl)
|
||||
}
|
||||
|
||||
pub fn ttl(&self) -> io::Result<u32> {
|
||||
self.inner.ttl()
|
||||
}
|
||||
|
||||
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
|
||||
self.inner.take_error()
|
||||
}
|
||||
}
|
||||
|
||||
impl event::Source for TcpListener {
|
||||
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
|
||||
SourceFd(&self.as_raw_fd()).register(registry, token, interests)
|
||||
}
|
||||
|
||||
fn reregister(
|
||||
&self,
|
||||
registry: &Registry,
|
||||
token: Token,
|
||||
interests: Interests,
|
||||
) -> io::Result<()> {
|
||||
SourceFd(&self.as_raw_fd()).reregister(registry, token, interests)
|
||||
}
|
||||
|
||||
fn deregister(&self, registry: &Registry) -> io::Result<()> {
|
||||
SourceFd(&self.as_raw_fd()).deregister(registry)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for TcpListener {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt::Debug::fmt(&self.inner, f)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromRawFd for TcpListener {
|
||||
unsafe fn from_raw_fd(fd: RawFd) -> TcpListener {
|
||||
TcpListener {
|
||||
inner: net::TcpListener::from_raw_fd(fd),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoRawFd for TcpListener {
|
||||
fn into_raw_fd(self) -> RawFd {
|
||||
self.inner.into_raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRawFd for TcpListener {
|
||||
fn as_raw_fd(&self) -> RawFd {
|
||||
self.inner.as_raw_fd()
|
||||
}
|
||||
}
|
||||
@ -1,113 +0,0 @@
|
||||
use crate::sys::unix::net::{new_socket, socket_addr};
|
||||
use crate::sys::unix::{SourceFd, TcpStream};
|
||||
use crate::{event, Interests, Registry, Token};
|
||||
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::mem::size_of;
|
||||
use std::net::{self, SocketAddr};
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
||||
|
||||
pub struct TcpListener {
|
||||
inner: net::TcpListener,
|
||||
}
|
||||
|
||||
impl TcpListener {
|
||||
pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
|
||||
new_socket(addr, libc::SOCK_STREAM).and_then(|socket| {
|
||||
// Set SO_REUSEADDR (mirrors what libstd does).
|
||||
syscall!(setsockopt(
|
||||
socket,
|
||||
libc::SOL_SOCKET,
|
||||
libc::SO_REUSEADDR,
|
||||
&1 as *const libc::c_int as *const libc::c_void,
|
||||
size_of::<libc::c_int>() as libc::socklen_t,
|
||||
))
|
||||
.and_then(|_| {
|
||||
let (raw_addr, raw_addr_length) = socket_addr(&addr);
|
||||
syscall!(bind(socket, raw_addr, raw_addr_length))
|
||||
})
|
||||
.and_then(|_| syscall!(listen(socket, 1024)))
|
||||
.map_err(|err| {
|
||||
// Close the socket if we hit an error, ignoring the error
|
||||
// from closing since we can't pass back two errors.
|
||||
let _ = unsafe { libc::close(socket) };
|
||||
err
|
||||
})
|
||||
.map(|_| TcpListener {
|
||||
inner: unsafe { net::TcpListener::from_raw_fd(socket) },
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
pub fn local_addr(&self) -> io::Result<SocketAddr> {
|
||||
self.inner.local_addr()
|
||||
}
|
||||
|
||||
pub fn try_clone(&self) -> io::Result<TcpListener> {
|
||||
self.inner.try_clone().map(|s| TcpListener { inner: s })
|
||||
}
|
||||
|
||||
pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
|
||||
self.inner
|
||||
.accept()
|
||||
.map(|(inner, addr)| (TcpStream::new(inner), addr))
|
||||
}
|
||||
|
||||
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
|
||||
self.inner.set_ttl(ttl)
|
||||
}
|
||||
|
||||
pub fn ttl(&self) -> io::Result<u32> {
|
||||
self.inner.ttl()
|
||||
}
|
||||
|
||||
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
|
||||
self.inner.take_error()
|
||||
}
|
||||
}
|
||||
|
||||
impl event::Source for TcpListener {
|
||||
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
|
||||
SourceFd(&self.as_raw_fd()).register(registry, token, interests)
|
||||
}
|
||||
|
||||
fn reregister(
|
||||
&self,
|
||||
registry: &Registry,
|
||||
token: Token,
|
||||
interests: Interests,
|
||||
) -> io::Result<()> {
|
||||
SourceFd(&self.as_raw_fd()).reregister(registry, token, interests)
|
||||
}
|
||||
|
||||
fn deregister(&self, registry: &Registry) -> io::Result<()> {
|
||||
SourceFd(&self.as_raw_fd()).deregister(registry)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for TcpListener {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt::Debug::fmt(&self.inner, f)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromRawFd for TcpListener {
|
||||
unsafe fn from_raw_fd(fd: RawFd) -> TcpListener {
|
||||
TcpListener {
|
||||
inner: net::TcpListener::from_raw_fd(fd),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoRawFd for TcpListener {
|
||||
fn into_raw_fd(self) -> RawFd {
|
||||
self.inner.into_raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRawFd for TcpListener {
|
||||
fn as_raw_fd(&self) -> RawFd {
|
||||
self.inner.as_raw_fd()
|
||||
}
|
||||
}
|
||||
@ -1,11 +1,11 @@
|
||||
use crate::sys::unix::net::{new_socket, socket_addr};
|
||||
use crate::unix::SourceFd;
|
||||
use crate::{event, Interests, Registry, Token};
|
||||
|
||||
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
||||
use std::{fmt, io, net};
|
||||
|
||||
use crate::sys::unix::net::{new_socket, socket_addr};
|
||||
use crate::unix::SourceFd;
|
||||
use crate::{event, Interests, Registry, Token};
|
||||
|
||||
pub struct UdpSocket {
|
||||
io: net::UdpSocket,
|
||||
}
|
||||
|
||||
@ -1,12 +1,12 @@
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
mod eventfd {
|
||||
use crate::sys::Selector;
|
||||
use crate::{Interests, Token};
|
||||
|
||||
use std::fs::File;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::os::unix::io::FromRawFd;
|
||||
|
||||
use crate::sys::Selector;
|
||||
use crate::{Interests, Token};
|
||||
|
||||
/// Waker backed by `eventfd`.
|
||||
///
|
||||
/// `eventfd` is effectively an 64 bit counter. All writes must be of 8
|
||||
@ -63,11 +63,11 @@ pub use self::eventfd::Waker;
|
||||
|
||||
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
|
||||
mod kqueue {
|
||||
use std::io;
|
||||
|
||||
use crate::sys::Selector;
|
||||
use crate::Token;
|
||||
|
||||
use std::io;
|
||||
|
||||
/// Waker backed by kqueue user space notifications (`EVFILT_USER`).
|
||||
///
|
||||
/// The implementation is fairly simple, first the kqueue must be setup to
|
||||
@ -82,7 +82,7 @@ mod kqueue {
|
||||
|
||||
impl Waker {
|
||||
pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
|
||||
selector.try_clone().and_then(|selector| {
|
||||
selector.try_clone_waker().and_then(|selector| {
|
||||
selector
|
||||
.setup_waker(token)
|
||||
.map(|()| Waker { selector, token })
|
||||
@ -106,13 +106,13 @@ pub use self::kqueue::Waker;
|
||||
target_os = "solaris"
|
||||
))]
|
||||
mod pipe {
|
||||
use crate::sys::unix::Selector;
|
||||
use crate::{Interests, Token};
|
||||
|
||||
use std::fs::File;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::os::unix::io::FromRawFd;
|
||||
|
||||
use crate::sys::unix::Selector;
|
||||
use crate::{Interests, Token};
|
||||
|
||||
/// Waker backed by a unix pipe.
|
||||
///
|
||||
/// Waker controls both the sending and receiving ends and empties the pipe
|
||||
|
||||
@ -1,18 +1,16 @@
|
||||
use lazy_static::lazy_static;
|
||||
use miow::iocp::CompletionPort;
|
||||
use ntapi::ntioapi::FILE_OPEN;
|
||||
use ntapi::ntioapi::{IO_STATUS_BLOCK_u, IO_STATUS_BLOCK};
|
||||
use ntapi::ntioapi::{NtCancelIoFileEx, NtCreateFile, NtDeviceIoControlFile};
|
||||
use ntapi::ntrtl::RtlNtStatusToDosError;
|
||||
use std::ffi::OsStr;
|
||||
use std::fmt;
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::mem::{size_of, zeroed};
|
||||
use std::os::windows::ffi::OsStrExt;
|
||||
use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
|
||||
use std::ptr::null_mut;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use miow::iocp::CompletionPort;
|
||||
|
||||
use std::os::windows::ffi::OsStrExt;
|
||||
use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
|
||||
|
||||
use winapi::shared::ntdef::{
|
||||
HANDLE, LARGE_INTEGER, NTSTATUS, OBJECT_ATTRIBUTES, PVOID, ULONG, UNICODE_STRING,
|
||||
};
|
||||
@ -22,6 +20,11 @@ use winapi::um::winbase::{SetFileCompletionNotificationModes, FILE_SKIP_SET_EVEN
|
||||
use winapi::um::winnt::SYNCHRONIZE;
|
||||
use winapi::um::winnt::{FILE_SHARE_READ, FILE_SHARE_WRITE};
|
||||
|
||||
use ntapi::ntioapi::FILE_OPEN;
|
||||
use ntapi::ntioapi::{IO_STATUS_BLOCK_u, IO_STATUS_BLOCK};
|
||||
use ntapi::ntioapi::{NtCancelIoFileEx, NtCreateFile, NtDeviceIoControlFile};
|
||||
use ntapi::ntrtl::RtlNtStatusToDosError;
|
||||
|
||||
const IOCTL_AFD_POLL: ULONG = 0x00012024;
|
||||
|
||||
static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(0);
|
||||
@ -42,6 +45,8 @@ struct ObjectAttributes(OBJECT_ATTRIBUTES);
|
||||
unsafe impl Send for ObjectAttributes {}
|
||||
unsafe impl Sync for ObjectAttributes {}
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
|
||||
lazy_static! {
|
||||
static ref AFD_OBJ_NAME: UnicodeString = UnicodeString(UNICODE_STRING {
|
||||
// Lengths are calced in bytes
|
||||
@ -127,8 +132,8 @@ impl Afd {
|
||||
));
|
||||
}
|
||||
let fd = File::from_raw_handle(afd_helper_handle as RawHandle);
|
||||
let token = NEXT_TOKEN.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
let afd = Afd { fd };
|
||||
let token = NEXT_TOKEN.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
cp.add_handle(token, &afd.fd)?;
|
||||
match SetFileCompletionNotificationModes(
|
||||
afd_helper_handle,
|
||||
@ -218,5 +223,6 @@ pub const KNOWN_AFD_EVENTS: u32 = AFD_POLL_RECEIVE
|
||||
| AFD_POLL_SEND
|
||||
| AFD_POLL_DISCONNECT
|
||||
| AFD_POLL_ABORT
|
||||
| AFD_POLL_LOCAL_CLOSE
|
||||
| AFD_POLL_ACCEPT
|
||||
| AFD_POLL_CONNECT_FAIL;
|
||||
|
||||
@ -14,33 +14,34 @@ pub fn token(event: &Event) -> Token {
|
||||
}
|
||||
|
||||
pub fn is_readable(event: &Event) -> bool {
|
||||
if is_error(event) || is_read_hup(event) {
|
||||
if is_hup(event) {
|
||||
return true;
|
||||
}
|
||||
event.flags & (afd::AFD_POLL_RECEIVE | afd::AFD_POLL_ACCEPT) != 0
|
||||
(event.flags & (afd::KNOWN_AFD_EVENTS & !afd::AFD_POLL_SEND)) != 0
|
||||
}
|
||||
|
||||
pub fn is_writable(event: &Event) -> bool {
|
||||
if is_error(event) {
|
||||
if is_hup(event) {
|
||||
return true;
|
||||
}
|
||||
event.flags & afd::AFD_POLL_SEND != 0
|
||||
(event.flags & (afd::AFD_POLL_SEND)) != 0
|
||||
}
|
||||
|
||||
pub fn is_error(event: &Event) -> bool {
|
||||
event.flags & afd::AFD_POLL_CONNECT_FAIL != 0
|
||||
event.flags == afd::AFD_POLL_CONNECT_FAIL
|
||||
}
|
||||
|
||||
pub fn is_hup(event: &Event) -> bool {
|
||||
event.flags & afd::AFD_POLL_ABORT != 0
|
||||
(event.flags & (afd::AFD_POLL_ABORT | afd::AFD_POLL_CONNECT_FAIL)) != 0
|
||||
}
|
||||
|
||||
pub fn is_read_hup(event: &Event) -> bool {
|
||||
event.flags & afd::AFD_POLL_DISCONNECT != 0
|
||||
pub fn is_read_hup(_: &Event) -> bool {
|
||||
// Not supported.
|
||||
false
|
||||
}
|
||||
|
||||
pub fn is_priority(event: &Event) -> bool {
|
||||
event.flags & afd::AFD_POLL_RECEIVE_EXPEDITED != 0
|
||||
(event.flags & afd::AFD_POLL_RECEIVE_EXPEDITED) != 0
|
||||
}
|
||||
|
||||
pub fn is_aio(_: &Event) -> bool {
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
use ntapi::ntioapi::{IO_STATUS_BLOCK_u, IO_STATUS_BLOCK};
|
||||
use std::cell::UnsafeCell;
|
||||
use std::fmt;
|
||||
|
||||
use ntapi::ntioapi::{IO_STATUS_BLOCK_u, IO_STATUS_BLOCK};
|
||||
|
||||
pub struct IoStatusBlock(UnsafeCell<IO_STATUS_BLOCK>);
|
||||
|
||||
// There is a pointer field in `IO_STATUS_BLOCK_u`, which we don't use that. Thus it is safe to implement Send here.
|
||||
|
||||
@ -1,12 +1,4 @@
|
||||
use std::io;
|
||||
use std::mem::size_of_val;
|
||||
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
|
||||
use std::sync::{Arc, Mutex, Once};
|
||||
use winapi::ctypes::c_int;
|
||||
use winapi::shared::ws2def::SOCKADDR;
|
||||
use winapi::um::winsock2::{
|
||||
ioctlsocket, socket, FIONBIO, INVALID_SOCKET, PF_INET, PF_INET6, SOCKET,
|
||||
};
|
||||
|
||||
/// Helper macro to execute a system call that returns an `io::Result`.
|
||||
//
|
||||
@ -41,35 +33,6 @@ pub trait SocketState {
|
||||
fn set_sock_state(&self, sock_state: Option<Arc<Mutex<SockState>>>);
|
||||
}
|
||||
|
||||
use crate::{Interests, Token};
|
||||
|
||||
struct InternalState {
|
||||
selector: Arc<SelectorInner>,
|
||||
token: Token,
|
||||
interests: Interests,
|
||||
sock_state: Option<Arc<Mutex<SockState>>>,
|
||||
}
|
||||
|
||||
impl InternalState {
|
||||
fn new(selector: Arc<SelectorInner>, token: Token, interests: Interests) -> InternalState {
|
||||
InternalState {
|
||||
selector,
|
||||
token,
|
||||
interests,
|
||||
sock_state: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for InternalState {
|
||||
fn drop(&mut self) {
|
||||
if let Some(sock_state) = self.sock_state.as_ref() {
|
||||
let mut sock_state = sock_state.lock().unwrap();
|
||||
sock_state.mark_delete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialise the network stack for Windows.
|
||||
fn init() {
|
||||
static INIT: Once = Once::new();
|
||||
@ -80,48 +43,3 @@ fn init() {
|
||||
drop(std::net::UdpSocket::bind("127.0.0.1:0"));
|
||||
});
|
||||
}
|
||||
|
||||
/// Create a new non-blocking socket.
|
||||
fn new_socket(addr: SocketAddr, socket_type: c_int) -> io::Result<SOCKET> {
|
||||
let domain = match addr {
|
||||
SocketAddr::V4(..) => PF_INET,
|
||||
SocketAddr::V6(..) => PF_INET6,
|
||||
};
|
||||
|
||||
syscall!(
|
||||
socket(domain, socket_type, 0),
|
||||
PartialEq::eq,
|
||||
INVALID_SOCKET
|
||||
)
|
||||
.and_then(|socket| {
|
||||
syscall!(ioctlsocket(socket, FIONBIO, &mut 1), PartialEq::ne, 0).map(|_| socket as SOCKET)
|
||||
})
|
||||
}
|
||||
|
||||
fn socket_addr(addr: &SocketAddr) -> (*const SOCKADDR, c_int) {
|
||||
match addr {
|
||||
SocketAddr::V4(ref addr) => (
|
||||
addr as *const _ as *const SOCKADDR,
|
||||
size_of_val(addr) as c_int,
|
||||
),
|
||||
SocketAddr::V6(ref addr) => (
|
||||
addr as *const _ as *const SOCKADDR,
|
||||
size_of_val(addr) as c_int,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
fn inaddr_any(other: SocketAddr) -> SocketAddr {
|
||||
match other {
|
||||
SocketAddr::V4(..) => {
|
||||
let any = Ipv4Addr::new(0, 0, 0, 0);
|
||||
let addr = SocketAddrV4::new(any, 0);
|
||||
SocketAddr::V4(addr)
|
||||
}
|
||||
SocketAddr::V6(..) => {
|
||||
let any = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0);
|
||||
let addr = SocketAddrV6::new(any, 0, 0, 0);
|
||||
SocketAddr::V6(addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,78 +1,38 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::io;
|
||||
use std::mem::size_of;
|
||||
use std::pin::Pin;
|
||||
use std::ptr::null_mut;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use std::os::windows::io::{AsRawSocket, RawSocket};
|
||||
|
||||
use winapi::shared::ntdef::NT_SUCCESS;
|
||||
use winapi::shared::ntdef::{HANDLE, PVOID};
|
||||
use winapi::shared::ntstatus::STATUS_CANCELLED;
|
||||
use winapi::shared::winerror::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING};
|
||||
use winapi::um::mswsock::SIO_BASE_HANDLE;
|
||||
use winapi::um::winsock2::{WSAIoctl, INVALID_SOCKET, SOCKET_ERROR};
|
||||
|
||||
use miow::iocp::{CompletionPort, CompletionStatus};
|
||||
|
||||
use crate::sys::Events;
|
||||
use crate::{Interests, Token};
|
||||
|
||||
use super::afd::{Afd, AfdPollInfo};
|
||||
use super::afd::{
|
||||
AFD_POLL_ABORT, AFD_POLL_ACCEPT, AFD_POLL_CONNECT_FAIL, AFD_POLL_DISCONNECT,
|
||||
AFD_POLL_LOCAL_CLOSE, AFD_POLL_RECEIVE, AFD_POLL_SEND, KNOWN_AFD_EVENTS,
|
||||
AFD_POLL_LOCAL_CLOSE, AFD_POLL_RECEIVE, AFD_POLL_RECEIVE_EXPEDITED, AFD_POLL_SEND,
|
||||
KNOWN_AFD_EVENTS,
|
||||
};
|
||||
use super::io_status_block::IoStatusBlock;
|
||||
use super::Event;
|
||||
use super::SocketState;
|
||||
use crate::sys::Events;
|
||||
use crate::{Interests, Token};
|
||||
|
||||
use miow::iocp::{CompletionPort, CompletionStatus};
|
||||
use std::cell::UnsafeCell;
|
||||
use std::collections::VecDeque;
|
||||
use std::io;
|
||||
use std::mem::size_of;
|
||||
use std::os::windows::io::{AsRawSocket, RawSocket};
|
||||
use std::pin::Pin;
|
||||
use std::ptr::null_mut;
|
||||
#[cfg(debug_assertions)]
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
use winapi::shared::ntdef::NT_SUCCESS;
|
||||
use winapi::shared::ntdef::{HANDLE, PVOID};
|
||||
use winapi::shared::ntstatus::STATUS_CANCELLED;
|
||||
use winapi::shared::winerror::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, WAIT_TIMEOUT};
|
||||
use winapi::um::mswsock::SIO_BASE_HANDLE;
|
||||
use winapi::um::winsock2::{WSAIoctl, INVALID_SOCKET, SOCKET_ERROR};
|
||||
|
||||
const POLL_GROUP__MAX_GROUP_SIZE: usize = 32;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AfdGroup {
|
||||
cp: Arc<CompletionPort>,
|
||||
afd_group: Mutex<Vec<Arc<Afd>>>,
|
||||
}
|
||||
|
||||
impl AfdGroup {
|
||||
pub fn new(cp: Arc<CompletionPort>) -> AfdGroup {
|
||||
AfdGroup {
|
||||
afd_group: Mutex::new(Vec::new()),
|
||||
cp,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn acquire(&self) -> io::Result<Arc<Afd>> {
|
||||
let mut afd_group = self.afd_group.lock().unwrap();
|
||||
if afd_group.len() == 0 {
|
||||
self._alloc_afd_group(&mut afd_group)?;
|
||||
} else {
|
||||
// + 1 reference in Vec
|
||||
if Arc::strong_count(afd_group.last().unwrap()) >= POLL_GROUP__MAX_GROUP_SIZE + 1 {
|
||||
self._alloc_afd_group(&mut afd_group)?;
|
||||
}
|
||||
}
|
||||
match afd_group.last() {
|
||||
Some(arc) => Ok(arc.clone()),
|
||||
None => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn release_unused_afd(&self) {
|
||||
let mut afd_group = self.afd_group.lock().unwrap();
|
||||
afd_group.retain(|g| Arc::strong_count(&g) > 1);
|
||||
}
|
||||
|
||||
fn _alloc_afd_group(&self, afd_group: &mut Vec<Arc<Afd>>) -> io::Result<()> {
|
||||
let afd = Afd::new(&self.cp)?;
|
||||
let arc = Arc::new(afd);
|
||||
afd_group.push(arc);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// This is the deallocation wrapper for overlapped pointer.
|
||||
/// In case of error or status changing before the overlapped pointer is actually used(or not even being used),
|
||||
/// this wrapper will decrease the reference count of Arc if being dropped.
|
||||
@ -175,10 +135,12 @@ impl SockState {
|
||||
* poll operation; when we receive it's completion package, a new poll
|
||||
* operation will be submitted with the correct event mask. */
|
||||
self.cancel()?;
|
||||
return Ok(());
|
||||
}
|
||||
} else if let SockPollStatus::Cancelled = self.poll_status {
|
||||
/* The poll operation has already been cancelled, we're still waiting for
|
||||
* it to return. For now, there's nothing that needs to be done. */
|
||||
return Ok(());
|
||||
} else if let SockPollStatus::Idle = self.poll_status {
|
||||
/* No poll operation is pending; start one. */
|
||||
self.poll_info.exclusive = 0;
|
||||
@ -188,7 +150,7 @@ impl SockState {
|
||||
}
|
||||
self.poll_info.handles[0].handle = self.base_socket as HANDLE;
|
||||
self.poll_info.handles[0].status = 0;
|
||||
self.poll_info.handles[0].events = self.user_evts | AFD_POLL_LOCAL_CLOSE;
|
||||
self.poll_info.handles[0].events = self.user_evts;
|
||||
|
||||
let wrapped_overlapped = OverlappedArcWrapper::new(self_arc);
|
||||
let overlapped = wrapped_overlapped.get_ptr() as *const _ as PVOID;
|
||||
@ -197,23 +159,24 @@ impl SockState {
|
||||
.poll(&mut self.poll_info, (*self.iosb).as_mut_ptr(), overlapped)
|
||||
};
|
||||
if let Err(e) = result {
|
||||
let code = e.raw_os_error().unwrap();
|
||||
if code == ERROR_IO_PENDING as i32 {
|
||||
/* Overlapped poll operation in progress; this is expected. */
|
||||
} else if code == ERROR_INVALID_HANDLE as i32 {
|
||||
/* Socket closed; it'll be dropped. */
|
||||
self.mark_delete();
|
||||
return Ok(());
|
||||
} else {
|
||||
return Err(e);
|
||||
if let Some(code) = e.raw_os_error() {
|
||||
if code == ERROR_IO_PENDING as i32 {
|
||||
/* Overlapped poll operation in progress; this is expected. */
|
||||
} else if code == ERROR_INVALID_HANDLE as i32 {
|
||||
/* Socket closed; it'll be dropped. */
|
||||
self.mark_delete();
|
||||
return Ok(());
|
||||
} else {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.poll_status = SockPollStatus::Pending;
|
||||
if self.self_wrapped.is_some() {
|
||||
// This shouldn't be happening. We cannot deallocate already pending overlapped before feed_event so we need to stand out here to declare unreachable.
|
||||
unreachable!();
|
||||
}
|
||||
self.poll_status = SockPollStatus::Pending;
|
||||
self.self_wrapped = Some(wrapped_overlapped);
|
||||
self.pending_evts = self.user_evts;
|
||||
} else {
|
||||
@ -235,6 +198,16 @@ impl SockState {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn mark_delete(&mut self) {
|
||||
if !self.delete_pending {
|
||||
if let SockPollStatus::Pending = self.poll_status {
|
||||
drop(self.cancel());
|
||||
}
|
||||
|
||||
self.delete_pending = true;
|
||||
}
|
||||
}
|
||||
|
||||
// This is the function called from the overlapped using as Arc<Mutex<SockState>>. Watch out for reference counting.
|
||||
fn feed_event(&mut self) -> Option<Event> {
|
||||
if self.self_wrapped.is_some() {
|
||||
@ -260,7 +233,6 @@ impl SockState {
|
||||
} else if self.poll_info.number_of_handles < 1 {
|
||||
/* This poll operation succeeded but didn't report any socket events. */
|
||||
} else if self.poll_info.handles[0].events & AFD_POLL_LOCAL_CLOSE != 0 {
|
||||
/* The poll operation reported that the socket was closed. */
|
||||
self.mark_delete();
|
||||
return None;
|
||||
} else {
|
||||
@ -279,12 +251,12 @@ impl SockState {
|
||||
// then reregister the socket to reset the interests.
|
||||
|
||||
// Reset readable event
|
||||
if (afd_events & interests_to_afd_flags(Interests::READABLE)) != 0 {
|
||||
self.user_evts &= !(interests_to_afd_flags(Interests::READABLE));
|
||||
if (afd_events & (KNOWN_AFD_EVENTS & !AFD_POLL_SEND)) != 0 {
|
||||
self.user_evts &= !(afd_events & (KNOWN_AFD_EVENTS & !AFD_POLL_SEND));
|
||||
}
|
||||
// Reset writable event
|
||||
if (afd_events & interests_to_afd_flags(Interests::WRITABLE)) != 0 {
|
||||
self.user_evts &= !interests_to_afd_flags(Interests::WRITABLE);
|
||||
if (afd_events & AFD_POLL_SEND) != 0 {
|
||||
self.user_evts &= !AFD_POLL_SEND;
|
||||
}
|
||||
|
||||
Some(Event {
|
||||
@ -293,25 +265,9 @@ impl SockState {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn is_pending_deletion(&self) -> bool {
|
||||
fn is_pending_deletion(&self) -> bool {
|
||||
self.delete_pending
|
||||
}
|
||||
|
||||
pub fn mark_delete(&mut self) {
|
||||
if !self.delete_pending {
|
||||
if let SockPollStatus::Pending = self.poll_status {
|
||||
drop(self.cancel());
|
||||
}
|
||||
|
||||
self.delete_pending = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SockState {
|
||||
fn drop(&mut self) {
|
||||
self.mark_delete();
|
||||
}
|
||||
}
|
||||
|
||||
/// Each Selector has a globally unique(ish) ID associated with it. This ID
|
||||
@ -395,106 +351,43 @@ impl Selector {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SelectorInner {
|
||||
lock: Mutex<()>,
|
||||
cp: Arc<CompletionPort>,
|
||||
active_poll_count: UnsafeCell<usize>,
|
||||
update_queue: UnsafeCell<VecDeque<Arc<Mutex<SockState>>>>,
|
||||
afd_group: AfdGroup,
|
||||
cp: CompletionPort,
|
||||
active_poll_count: AtomicUsize,
|
||||
update_queue: Mutex<VecDeque<Arc<Mutex<SockState>>>>,
|
||||
afd_group: Mutex<Vec<Arc<Afd>>>,
|
||||
}
|
||||
|
||||
// We have ensured thread safety by introducing lock manually.
|
||||
unsafe impl Sync for SelectorInner {}
|
||||
|
||||
impl SelectorInner {
|
||||
pub fn new() -> io::Result<SelectorInner> {
|
||||
CompletionPort::new(0).map(|cp| {
|
||||
let cp = Arc::new(cp);
|
||||
let cp_afd = Arc::clone(&cp);
|
||||
|
||||
SelectorInner {
|
||||
lock: Mutex::new(()),
|
||||
cp: cp,
|
||||
active_poll_count: UnsafeCell::new(0),
|
||||
update_queue: UnsafeCell::new(VecDeque::new()),
|
||||
afd_group: AfdGroup::new(cp_afd),
|
||||
}
|
||||
CompletionPort::new(0).map(|cp| SelectorInner {
|
||||
cp: cp,
|
||||
active_poll_count: AtomicUsize::new(0),
|
||||
update_queue: Mutex::new(VecDeque::new()),
|
||||
afd_group: Mutex::new(Vec::new()),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
|
||||
events.clear();
|
||||
|
||||
let mut n = 0;
|
||||
let start = Instant::now();
|
||||
self.update_sockets_events()?;
|
||||
|
||||
loop {
|
||||
if timeout.is_none() {
|
||||
let len = self.select2(&mut events.statuses, &mut events.events, None)?;
|
||||
if len == 0 {
|
||||
continue;
|
||||
}
|
||||
self.active_poll_count.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
let result = self.cp.get_many(&mut events.statuses, timeout);
|
||||
|
||||
self.active_poll_count.fetch_sub(1, Ordering::SeqCst);
|
||||
|
||||
if let Err(e) = result {
|
||||
use winapi::shared::winerror::WAIT_TIMEOUT;
|
||||
if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) {
|
||||
return Ok(());
|
||||
} else {
|
||||
if n >= events.statuses.len() {
|
||||
return Ok(());
|
||||
}
|
||||
let timeout = timeout.unwrap();
|
||||
let deadline = start + timeout;
|
||||
let now = Instant::now();
|
||||
if timeout.as_nanos() != 0 {
|
||||
if now >= deadline {
|
||||
return Ok(());
|
||||
}
|
||||
let len = self.select2(
|
||||
&mut events.statuses[n..],
|
||||
&mut events.events,
|
||||
Some(deadline - now),
|
||||
)?;
|
||||
if len == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
n += len;
|
||||
} else {
|
||||
self.select2(&mut events.statuses[n..], &mut events.events, Some(timeout))?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn select2(
|
||||
&self,
|
||||
statuses: &mut [CompletionStatus],
|
||||
events: &mut Vec<Event>,
|
||||
timeout: Option<Duration>,
|
||||
) -> io::Result<usize> {
|
||||
{
|
||||
let _guard = self.lock.lock().unwrap();
|
||||
|
||||
unsafe {
|
||||
self.update_sockets_events()?;
|
||||
|
||||
*self.active_poll_count.get() += 1;
|
||||
}
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
let result = self.cp.get_many(statuses, timeout);
|
||||
|
||||
{
|
||||
let _guard = self.lock.lock().unwrap();
|
||||
|
||||
unsafe {
|
||||
*self.active_poll_count.get() -= 1;
|
||||
}
|
||||
|
||||
if let Err(e) = result {
|
||||
if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) {
|
||||
return Ok(0);
|
||||
}
|
||||
return Err(e);
|
||||
}
|
||||
unsafe { Ok(self.feed_events(events, result.unwrap())) }
|
||||
}
|
||||
self.feed_events(&mut events.events, result.unwrap());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn register<S: SocketState + AsRawSocket>(
|
||||
@ -519,13 +412,8 @@ impl SelectorInner {
|
||||
sock.lock().unwrap().set_event(event);
|
||||
}
|
||||
socket.set_sock_state(Some(sock));
|
||||
{
|
||||
let _guard = self.lock.lock().unwrap();
|
||||
unsafe {
|
||||
self.add_socket_to_update_queue(socket);
|
||||
self.update_sockets_events_if_polling()?;
|
||||
}
|
||||
}
|
||||
self.add_socket_to_update_queue(socket);
|
||||
self.update_sockets_events_if_polling()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -550,13 +438,8 @@ impl SelectorInner {
|
||||
{
|
||||
sock.lock().unwrap().set_event(event);
|
||||
}
|
||||
{
|
||||
let _guard = self.lock.lock().unwrap();
|
||||
unsafe {
|
||||
self.add_socket_to_update_queue(socket);
|
||||
self.update_sockets_events_if_polling()?;
|
||||
}
|
||||
}
|
||||
self.add_socket_to_update_queue(socket);
|
||||
self.update_sockets_events_if_polling()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -566,7 +449,7 @@ impl SelectorInner {
|
||||
return Err(io::Error::from(io::ErrorKind::NotFound));
|
||||
}
|
||||
socket.set_sock_state(None);
|
||||
self.afd_group.release_unused_afd();
|
||||
self._release_unused_afd();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -574,76 +457,112 @@ impl SelectorInner {
|
||||
&self.cp
|
||||
}
|
||||
|
||||
unsafe fn update_sockets_events(&self) -> io::Result<()> {
|
||||
let update_queue = &mut *self.update_queue.get();
|
||||
loop {
|
||||
let sock = match update_queue.pop_front() {
|
||||
Some(sock) => sock,
|
||||
None => break,
|
||||
};
|
||||
let mut sock_internal = sock.lock().unwrap();
|
||||
if !sock_internal.is_pending_deletion() {
|
||||
sock_internal.update(&sock).unwrap();
|
||||
pub fn mark_delete_socket(&self, sock_state: &mut SockState) {
|
||||
sock_state.mark_delete();
|
||||
}
|
||||
|
||||
fn update_sockets_events(&self) -> io::Result<()> {
|
||||
{
|
||||
let mut update_queue = self.update_queue.lock().unwrap();
|
||||
loop {
|
||||
let sock = match update_queue.pop_front() {
|
||||
Some(sock) => sock,
|
||||
None => break,
|
||||
};
|
||||
let mut sock_internal = sock.lock().unwrap();
|
||||
if !sock_internal.is_pending_deletion() {
|
||||
sock_internal.update(&sock).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
self.afd_group.release_unused_afd();
|
||||
self._release_unused_afd();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
unsafe fn update_sockets_events_if_polling(&self) -> io::Result<()> {
|
||||
let active_poll_count = *self.active_poll_count.get();
|
||||
if active_poll_count > 0 {
|
||||
fn update_sockets_events_if_polling(&self) -> io::Result<()> {
|
||||
if self.active_poll_count.load(Ordering::SeqCst) > 0 {
|
||||
return self.update_sockets_events();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
unsafe fn add_socket_to_update_queue<S: SocketState>(&self, socket: &S) {
|
||||
let sock_state = socket.get_sock_state().unwrap();
|
||||
let update_queue = &mut *self.update_queue.get();
|
||||
update_queue.push_back(sock_state);
|
||||
}
|
||||
|
||||
// It returns processed count of iocp_events rather than the events itself.
|
||||
unsafe fn feed_events(
|
||||
&self,
|
||||
events: &mut Vec<Event>,
|
||||
iocp_events: &[CompletionStatus],
|
||||
) -> usize {
|
||||
let mut n = 0;
|
||||
let update_queue = &mut *self.update_queue.get();
|
||||
for iocp_event in iocp_events.iter() {
|
||||
if iocp_event.overlapped() as usize == 0 {
|
||||
events.push(Event {
|
||||
flags: AFD_POLL_RECEIVE,
|
||||
data: iocp_event.token() as u64,
|
||||
});
|
||||
n += 1;
|
||||
continue;
|
||||
}
|
||||
let sock_arc = Arc::from_raw(iocp_event.overlapped() as *const Mutex<SockState>);
|
||||
let mut sock_guard = sock_arc.lock().unwrap();
|
||||
match sock_guard.feed_event() {
|
||||
Some(e) => {
|
||||
events.push(e);
|
||||
fn feed_events(&self, events: &mut Vec<Event>, iocp_events: &[CompletionStatus]) {
|
||||
{
|
||||
let mut update_queue = self.update_queue.lock().unwrap();
|
||||
for iocp_event in iocp_events.iter() {
|
||||
if iocp_event.overlapped() as usize == 0 {
|
||||
events.push(Event {
|
||||
flags: AFD_POLL_RECEIVE,
|
||||
data: iocp_event.token() as u64,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
let sock_arc =
|
||||
unsafe { Arc::from_raw(iocp_event.overlapped() as *const Mutex<SockState>) };
|
||||
let mut sock_guard = sock_arc.lock().unwrap();
|
||||
match sock_guard.feed_event() {
|
||||
Some(e) => {
|
||||
events.push(e);
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
if !sock_guard.is_pending_deletion() {
|
||||
update_queue.push_back(sock_arc.clone());
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
n += 1;
|
||||
if !sock_guard.is_pending_deletion() {
|
||||
update_queue.push_back(sock_arc.clone());
|
||||
}
|
||||
}
|
||||
self.afd_group.release_unused_afd();
|
||||
n
|
||||
self._release_unused_afd();
|
||||
}
|
||||
|
||||
fn _acquire_afd(&self) -> io::Result<Arc<Afd>> {
|
||||
let mut need_alloc = false;
|
||||
{
|
||||
let afd_group = self.afd_group.lock().unwrap();
|
||||
if afd_group.len() == 0 {
|
||||
need_alloc = true;
|
||||
} else {
|
||||
// + 1 reference in Vec
|
||||
if Arc::strong_count(afd_group.last().unwrap()) >= POLL_GROUP__MAX_GROUP_SIZE + 1 {
|
||||
need_alloc = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if need_alloc {
|
||||
self._alloc_afd_group()?;
|
||||
}
|
||||
match self.afd_group.lock().unwrap().last() {
|
||||
Some(rc) => Ok(rc.clone()),
|
||||
None => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn _release_unused_afd(&self) {
|
||||
let mut afd_group = self.afd_group.lock().unwrap();
|
||||
afd_group.retain(|g| Arc::strong_count(&g) > 1);
|
||||
}
|
||||
|
||||
fn _alloc_afd_group(&self) -> io::Result<()> {
|
||||
let mut afd_group = self.afd_group.lock().unwrap();
|
||||
let afd = Afd::new(&self.cp)?;
|
||||
let rc = Arc::new(afd);
|
||||
afd_group.push(rc);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn _alloc_sock_for_rawsocket(
|
||||
&self,
|
||||
raw_socket: RawSocket,
|
||||
) -> io::Result<Arc<Mutex<SockState>>> {
|
||||
let afd = self.afd_group.acquire()?;
|
||||
Ok(Arc::new(Mutex::new(SockState::new(raw_socket, afd)?)))
|
||||
Ok(Arc::new(Mutex::new(SockState::new(
|
||||
raw_socket,
|
||||
self._acquire_afd()?,
|
||||
)?)))
|
||||
}
|
||||
|
||||
fn add_socket_to_update_queue<S: SocketState>(&self, socket: &S) {
|
||||
let sock_state = socket.get_sock_state().unwrap();
|
||||
let mut update_queue = self.update_queue.lock().unwrap();
|
||||
update_queue.push_back(sock_state);
|
||||
}
|
||||
}
|
||||
|
||||
@ -651,8 +570,8 @@ fn interests_to_afd_flags(interests: Interests) -> u32 {
|
||||
let mut flags = 0;
|
||||
|
||||
if interests.is_readable() {
|
||||
// AFD_POLL_DISCONNECT for is_read_hup()
|
||||
flags |= AFD_POLL_RECEIVE | AFD_POLL_ACCEPT | AFD_POLL_DISCONNECT;
|
||||
flags |=
|
||||
AFD_POLL_RECEIVE | AFD_POLL_RECEIVE_EXPEDITED | AFD_POLL_ACCEPT | AFD_POLL_DISCONNECT;
|
||||
}
|
||||
|
||||
if interests.is_writable() {
|
||||
|
||||
@ -1,25 +1,50 @@
|
||||
use super::selector::SockState;
|
||||
use super::InternalState;
|
||||
use super::{inaddr_any, new_socket, socket_addr};
|
||||
use std::cmp::PartialEq;
|
||||
use std::fmt;
|
||||
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
|
||||
use std::mem::size_of_val;
|
||||
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
|
||||
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
|
||||
use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64.
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
|
||||
use winapi::ctypes::c_int;
|
||||
use winapi::shared::ws2def::SOCKADDR;
|
||||
use winapi::um::winsock2::{
|
||||
bind, closesocket, connect, ioctlsocket, listen, socket, FIONBIO, INVALID_SOCKET, PF_INET,
|
||||
PF_INET6, SOCKET, SOCKET_ERROR, SOCK_STREAM,
|
||||
};
|
||||
|
||||
use crate::poll;
|
||||
use crate::sys::windows::init;
|
||||
use crate::{event, Interests, Registry, Token};
|
||||
|
||||
use std::fmt;
|
||||
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
|
||||
use std::net::{self, SocketAddr};
|
||||
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
|
||||
use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64.
|
||||
use std::sync::{Arc, Mutex};
|
||||
use winapi::um::winsock2::{bind, closesocket, connect, listen, SOCKET_ERROR, SOCK_STREAM};
|
||||
use super::selector::{SelectorInner, SockState};
|
||||
|
||||
struct InternalState {
|
||||
selector: Arc<SelectorInner>,
|
||||
token: Token,
|
||||
interests: Interests,
|
||||
sock_state: Option<Arc<Mutex<SockState>>>,
|
||||
}
|
||||
|
||||
impl InternalState {
|
||||
fn new(selector: Arc<SelectorInner>, token: Token, interests: Interests) -> InternalState {
|
||||
InternalState {
|
||||
selector,
|
||||
token,
|
||||
interests,
|
||||
sock_state: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TcpStream {
|
||||
internal: Arc<Mutex<Option<InternalState>>>,
|
||||
internal: Arc<RwLock<Option<InternalState>>>,
|
||||
inner: net::TcpStream,
|
||||
}
|
||||
|
||||
pub struct TcpListener {
|
||||
internal: Arc<Mutex<Option<InternalState>>>,
|
||||
internal: Arc<RwLock<Option<InternalState>>>,
|
||||
inner: net::TcpListener,
|
||||
}
|
||||
|
||||
@ -28,16 +53,12 @@ macro_rules! wouldblock {
|
||||
let result = (&$self.inner).$method();
|
||||
if let Err(ref e) = result {
|
||||
if e.kind() == io::ErrorKind::WouldBlock {
|
||||
let internal = $self.internal.lock().unwrap();
|
||||
if internal.is_some() {
|
||||
let selector = internal.as_ref().unwrap().selector.clone();
|
||||
let token = internal.as_ref().unwrap().token;
|
||||
let interests = internal.as_ref().unwrap().interests;
|
||||
drop(internal);
|
||||
selector.reregister(
|
||||
let internal = $self.internal.read().unwrap();
|
||||
if let Some(internal) = &*internal {
|
||||
internal.selector.reregister(
|
||||
$self,
|
||||
token,
|
||||
interests,
|
||||
internal.token,
|
||||
internal.interests,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
@ -48,16 +69,12 @@ macro_rules! wouldblock {
|
||||
let result = (&$self.inner).$method($($args),*);
|
||||
if let Err(ref e) = result {
|
||||
if e.kind() == io::ErrorKind::WouldBlock {
|
||||
let internal = $self.internal.lock().unwrap();
|
||||
if internal.is_some() {
|
||||
let selector = internal.as_ref().unwrap().selector.clone();
|
||||
let token = internal.as_ref().unwrap().token;
|
||||
let interests = internal.as_ref().unwrap().interests;
|
||||
drop(internal);
|
||||
selector.reregister(
|
||||
let internal = $self.internal.read().unwrap();
|
||||
if let Some(internal) = &*internal {
|
||||
internal.selector.reregister(
|
||||
$self,
|
||||
token,
|
||||
interests,
|
||||
internal.token,
|
||||
internal.interests,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
@ -69,7 +86,7 @@ macro_rules! wouldblock {
|
||||
impl TcpStream {
|
||||
pub fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
|
||||
init();
|
||||
new_socket(addr, SOCK_STREAM)
|
||||
new_socket(addr)
|
||||
.and_then(|socket| {
|
||||
// Required for a future `connect_overlapped` operation to be
|
||||
// executed successfully.
|
||||
@ -101,7 +118,7 @@ impl TcpStream {
|
||||
})
|
||||
})
|
||||
.map(|socket| TcpStream {
|
||||
internal: Arc::new(Mutex::new(None)),
|
||||
internal: Arc::new(RwLock::new(None)),
|
||||
inner: unsafe { net::TcpStream::from_raw_socket(socket as StdSocket) },
|
||||
})
|
||||
}
|
||||
@ -116,7 +133,7 @@ impl TcpStream {
|
||||
|
||||
pub fn try_clone(&self) -> io::Result<TcpStream> {
|
||||
self.inner.try_clone().map(|s| TcpStream {
|
||||
internal: Arc::new(Mutex::new(None)),
|
||||
internal: Arc::new(RwLock::new(None)),
|
||||
inner: s,
|
||||
})
|
||||
}
|
||||
@ -150,9 +167,24 @@ impl TcpStream {
|
||||
}
|
||||
}
|
||||
|
||||
fn inaddr_any(other: SocketAddr) -> SocketAddr {
|
||||
match other {
|
||||
SocketAddr::V4(..) => {
|
||||
let any = Ipv4Addr::new(0, 0, 0, 0);
|
||||
let addr = SocketAddrV4::new(any, 0);
|
||||
SocketAddr::V4(addr)
|
||||
}
|
||||
SocketAddr::V6(..) => {
|
||||
let any = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0);
|
||||
let addr = SocketAddrV6::new(any, 0, 0, 0);
|
||||
SocketAddr::V6(addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl super::SocketState for TcpStream {
|
||||
fn get_sock_state(&self) -> Option<Arc<Mutex<SockState>>> {
|
||||
let internal = self.internal.lock().unwrap();
|
||||
let internal = self.internal.read().unwrap();
|
||||
match &*internal {
|
||||
Some(internal) => match &internal.sock_state {
|
||||
Some(arc) => Some(arc.clone()),
|
||||
@ -162,7 +194,7 @@ impl super::SocketState for TcpStream {
|
||||
}
|
||||
}
|
||||
fn set_sock_state(&self, sock_state: Option<Arc<Mutex<SockState>>>) {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
match &mut *internal {
|
||||
Some(internal) => {
|
||||
internal.sock_state = sock_state;
|
||||
@ -174,7 +206,7 @@ impl super::SocketState for TcpStream {
|
||||
|
||||
impl<'a> super::SocketState for &'a TcpStream {
|
||||
fn get_sock_state(&self) -> Option<Arc<Mutex<SockState>>> {
|
||||
let internal = self.internal.lock().unwrap();
|
||||
let internal = self.internal.read().unwrap();
|
||||
match &*internal {
|
||||
Some(internal) => match &internal.sock_state {
|
||||
Some(arc) => Some(arc.clone()),
|
||||
@ -184,7 +216,7 @@ impl<'a> super::SocketState for &'a TcpStream {
|
||||
}
|
||||
}
|
||||
fn set_sock_state(&self, sock_state: Option<Arc<Mutex<SockState>>>) {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
match &mut *internal {
|
||||
Some(internal) => {
|
||||
internal.sock_state = sock_state;
|
||||
@ -194,6 +226,19 @@ impl<'a> super::SocketState for &'a TcpStream {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TcpStream {
|
||||
fn drop(&mut self) {
|
||||
let internal = self.internal.read().unwrap();
|
||||
if let Some(internal) = internal.as_ref() {
|
||||
if let Some(sock_state) = internal.sock_state.as_ref() {
|
||||
internal
|
||||
.selector
|
||||
.mark_delete_socket(&mut sock_state.lock().unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for TcpStream {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
wouldblock!(self, read, buf)
|
||||
@ -245,7 +290,7 @@ impl<'a> Write for &'a TcpStream {
|
||||
impl event::Source for TcpStream {
|
||||
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
|
||||
{
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
if internal.is_none() {
|
||||
*internal = Some(InternalState::new(
|
||||
poll::selector(registry).clone_inner(),
|
||||
@ -258,7 +303,7 @@ impl event::Source for TcpStream {
|
||||
match result {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
*internal = None;
|
||||
}
|
||||
}
|
||||
@ -274,7 +319,7 @@ impl event::Source for TcpStream {
|
||||
let result = poll::selector(registry).reregister(self, token, interests);
|
||||
match result {
|
||||
Ok(_) => {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
internal.as_mut().unwrap().token = token;
|
||||
internal.as_mut().unwrap().interests = interests;
|
||||
}
|
||||
@ -287,7 +332,7 @@ impl event::Source for TcpStream {
|
||||
let result = poll::selector(registry).deregister(self);
|
||||
match result {
|
||||
Ok(_) => {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
*internal = None;
|
||||
}
|
||||
Err(_) => {}
|
||||
@ -305,7 +350,7 @@ impl fmt::Debug for TcpStream {
|
||||
impl FromRawSocket for TcpStream {
|
||||
unsafe fn from_raw_socket(rawsocket: RawSocket) -> TcpStream {
|
||||
TcpStream {
|
||||
internal: Arc::new(Mutex::new(None)),
|
||||
internal: Arc::new(RwLock::new(None)),
|
||||
inner: net::TcpStream::from_raw_socket(rawsocket),
|
||||
}
|
||||
}
|
||||
@ -326,7 +371,7 @@ impl AsRawSocket for TcpStream {
|
||||
impl TcpListener {
|
||||
pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
|
||||
init();
|
||||
new_socket(addr, SOCK_STREAM).and_then(|socket| {
|
||||
new_socket(addr).and_then(|socket| {
|
||||
let (raw_addr, raw_addr_length) = socket_addr(&addr);
|
||||
syscall!(
|
||||
bind(socket, raw_addr, raw_addr_length,),
|
||||
@ -341,7 +386,7 @@ impl TcpListener {
|
||||
err
|
||||
})
|
||||
.map(|_| TcpListener {
|
||||
internal: Arc::new(Mutex::new(None)),
|
||||
internal: Arc::new(RwLock::new(None)),
|
||||
inner: unsafe { net::TcpListener::from_raw_socket(socket as StdSocket) },
|
||||
})
|
||||
})
|
||||
@ -353,7 +398,7 @@ impl TcpListener {
|
||||
|
||||
pub fn try_clone(&self) -> io::Result<TcpListener> {
|
||||
self.inner.try_clone().map(|s| TcpListener {
|
||||
internal: Arc::new(Mutex::new(None)),
|
||||
internal: Arc::new(RwLock::new(None)),
|
||||
inner: s,
|
||||
})
|
||||
}
|
||||
@ -362,7 +407,7 @@ impl TcpListener {
|
||||
wouldblock!(self, accept).map(|(inner, addr)| {
|
||||
(
|
||||
TcpStream {
|
||||
internal: Arc::new(Mutex::new(None)),
|
||||
internal: Arc::new(RwLock::new(None)),
|
||||
inner,
|
||||
},
|
||||
addr,
|
||||
@ -383,9 +428,22 @@ impl TcpListener {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TcpListener {
|
||||
fn drop(&mut self) {
|
||||
let internal = self.internal.read().unwrap();
|
||||
if let Some(internal) = internal.as_ref() {
|
||||
if let Some(sock_state) = internal.sock_state.as_ref() {
|
||||
internal
|
||||
.selector
|
||||
.mark_delete_socket(&mut sock_state.lock().unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl super::SocketState for TcpListener {
|
||||
fn get_sock_state(&self) -> Option<Arc<Mutex<SockState>>> {
|
||||
let internal = self.internal.lock().unwrap();
|
||||
let internal = self.internal.read().unwrap();
|
||||
match &*internal {
|
||||
Some(internal) => match &internal.sock_state {
|
||||
Some(arc) => Some(arc.clone()),
|
||||
@ -395,7 +453,7 @@ impl super::SocketState for TcpListener {
|
||||
}
|
||||
}
|
||||
fn set_sock_state(&self, sock_state: Option<Arc<Mutex<SockState>>>) {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
match &mut *internal {
|
||||
Some(internal) => {
|
||||
internal.sock_state = sock_state;
|
||||
@ -408,7 +466,7 @@ impl super::SocketState for TcpListener {
|
||||
impl event::Source for TcpListener {
|
||||
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
|
||||
{
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
if internal.is_none() {
|
||||
*internal = Some(InternalState::new(
|
||||
poll::selector(registry).clone_inner(),
|
||||
@ -421,7 +479,7 @@ impl event::Source for TcpListener {
|
||||
match result {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
*internal = None;
|
||||
}
|
||||
}
|
||||
@ -437,7 +495,7 @@ impl event::Source for TcpListener {
|
||||
let result = poll::selector(registry).reregister(self, token, interests);
|
||||
match result {
|
||||
Ok(_) => {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
internal.as_mut().unwrap().token = token;
|
||||
internal.as_mut().unwrap().interests = interests;
|
||||
}
|
||||
@ -450,7 +508,7 @@ impl event::Source for TcpListener {
|
||||
let result = poll::selector(registry).deregister(self);
|
||||
match result {
|
||||
Ok(_) => {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
*internal = None;
|
||||
}
|
||||
Err(_) => {}
|
||||
@ -468,7 +526,7 @@ impl fmt::Debug for TcpListener {
|
||||
impl FromRawSocket for TcpListener {
|
||||
unsafe fn from_raw_socket(rawsocket: RawSocket) -> TcpListener {
|
||||
TcpListener {
|
||||
internal: Arc::new(Mutex::new(None)),
|
||||
internal: Arc::new(RwLock::new(None)),
|
||||
inner: net::TcpListener::from_raw_socket(rawsocket),
|
||||
}
|
||||
}
|
||||
@ -485,3 +543,33 @@ impl AsRawSocket for TcpListener {
|
||||
self.inner.as_raw_socket()
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new non-blocking socket.
|
||||
fn new_socket(addr: SocketAddr) -> io::Result<SOCKET> {
|
||||
let domain = match addr {
|
||||
SocketAddr::V4(..) => PF_INET,
|
||||
SocketAddr::V6(..) => PF_INET6,
|
||||
};
|
||||
|
||||
syscall!(
|
||||
socket(domain, SOCK_STREAM, 0),
|
||||
PartialEq::eq,
|
||||
INVALID_SOCKET
|
||||
)
|
||||
.and_then(|socket| {
|
||||
syscall!(ioctlsocket(socket, FIONBIO, &mut 1), PartialEq::ne, 0).map(|_| socket as SOCKET)
|
||||
})
|
||||
}
|
||||
|
||||
fn socket_addr(addr: &SocketAddr) -> (*const SOCKADDR, c_int) {
|
||||
match addr {
|
||||
SocketAddr::V4(ref addr) => (
|
||||
addr as *const _ as *const SOCKADDR,
|
||||
size_of_val(addr) as c_int,
|
||||
),
|
||||
SocketAddr::V6(ref addr) => (
|
||||
addr as *const _ as *const SOCKADDR,
|
||||
size_of_val(addr) as c_int,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,19 +1,34 @@
|
||||
use super::selector::SockState;
|
||||
use super::InternalState;
|
||||
use super::{new_socket, socket_addr};
|
||||
use crate::poll;
|
||||
use crate::sys::windows::init;
|
||||
use crate::{event, Interests, Registry, Token};
|
||||
|
||||
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
|
||||
use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64.
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::{fmt, io};
|
||||
use winapi::um::winsock2::{bind, closesocket, SOCKET_ERROR, SOCK_DGRAM};
|
||||
|
||||
use crate::sys::windows::init;
|
||||
use crate::sys::windows::selector::{SelectorInner, SockState};
|
||||
|
||||
struct InternalState {
|
||||
selector: Arc<SelectorInner>,
|
||||
token: Token,
|
||||
interests: Interests,
|
||||
sock_state: Option<Arc<Mutex<SockState>>>,
|
||||
}
|
||||
|
||||
impl InternalState {
|
||||
fn new(selector: Arc<SelectorInner>, token: Token, interests: Interests) -> InternalState {
|
||||
InternalState {
|
||||
selector,
|
||||
token,
|
||||
interests,
|
||||
sock_state: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UdpSocket {
|
||||
internal: Arc<Mutex<Option<InternalState>>>,
|
||||
internal: Arc<RwLock<Option<InternalState>>>,
|
||||
io: net::UdpSocket,
|
||||
}
|
||||
|
||||
@ -22,16 +37,12 @@ macro_rules! wouldblock {
|
||||
let result = $self.io.$method($($args),*);
|
||||
if let Err(ref e) = result {
|
||||
if e.kind() == io::ErrorKind::WouldBlock {
|
||||
let internal = $self.internal.lock().unwrap();
|
||||
if internal.is_some() {
|
||||
let selector = internal.as_ref().unwrap().selector.clone();
|
||||
let token = internal.as_ref().unwrap().token;
|
||||
let interests = internal.as_ref().unwrap().interests;
|
||||
drop(internal);
|
||||
selector.reregister(
|
||||
let internal = $self.internal.read().unwrap();
|
||||
if let Some(internal) = &*internal {
|
||||
internal.selector.reregister(
|
||||
$self,
|
||||
token,
|
||||
interests,
|
||||
internal.token,
|
||||
internal.interests,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
@ -43,22 +54,10 @@ macro_rules! wouldblock {
|
||||
impl UdpSocket {
|
||||
pub fn bind(addr: SocketAddr) -> io::Result<UdpSocket> {
|
||||
init();
|
||||
new_socket(addr, SOCK_DGRAM).and_then(|socket| {
|
||||
let (raw_addr, raw_addr_length) = socket_addr(&addr);
|
||||
syscall!(
|
||||
bind(socket, raw_addr, raw_addr_length,),
|
||||
PartialEq::eq,
|
||||
SOCKET_ERROR
|
||||
)
|
||||
.map_err(|err| {
|
||||
// Close the socket if we hit an error, ignoring the error
|
||||
// from closing since we can't pass back two errors.
|
||||
let _ = unsafe { closesocket(socket) };
|
||||
err
|
||||
})
|
||||
.map(|_| UdpSocket {
|
||||
internal: Arc::new(Mutex::new(None)),
|
||||
io: unsafe { net::UdpSocket::from_raw_socket(socket as StdSocket) },
|
||||
net::UdpSocket::bind(addr).and_then(|io| {
|
||||
io.set_nonblocking(true).map(|()| UdpSocket {
|
||||
internal: Arc::new(RwLock::new(None)),
|
||||
io,
|
||||
})
|
||||
})
|
||||
}
|
||||
@ -69,21 +68,21 @@ impl UdpSocket {
|
||||
|
||||
pub fn try_clone(&self) -> io::Result<UdpSocket> {
|
||||
self.io.try_clone().map(|io| UdpSocket {
|
||||
internal: Arc::new(Mutex::new(None)),
|
||||
internal: Arc::new(RwLock::new(None)),
|
||||
io,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
|
||||
wouldblock!(self, send_to, buf, target)
|
||||
self.io.send_to(buf, target)
|
||||
}
|
||||
|
||||
pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
|
||||
wouldblock!(self, recv_from, buf)
|
||||
self.io.recv_from(buf)
|
||||
}
|
||||
|
||||
pub fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
|
||||
wouldblock!(self, peek_from, buf)
|
||||
self.io.peek_from(buf)
|
||||
}
|
||||
|
||||
pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
|
||||
@ -99,7 +98,7 @@ impl UdpSocket {
|
||||
}
|
||||
|
||||
pub fn connect(&self, addr: SocketAddr) -> io::Result<()> {
|
||||
self.io.connect(addr)
|
||||
wouldblock!(self, connect, addr)
|
||||
}
|
||||
|
||||
pub fn broadcast(&self) -> io::Result<bool> {
|
||||
@ -165,7 +164,7 @@ impl UdpSocket {
|
||||
|
||||
impl super::SocketState for UdpSocket {
|
||||
fn get_sock_state(&self) -> Option<Arc<Mutex<SockState>>> {
|
||||
let internal = self.internal.lock().unwrap();
|
||||
let internal = self.internal.read().unwrap();
|
||||
match &*internal {
|
||||
Some(internal) => match &internal.sock_state {
|
||||
Some(arc) => Some(arc.clone()),
|
||||
@ -175,7 +174,7 @@ impl super::SocketState for UdpSocket {
|
||||
}
|
||||
}
|
||||
fn set_sock_state(&self, sock_state: Option<Arc<Mutex<SockState>>>) {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
match &mut *internal {
|
||||
Some(internal) => {
|
||||
internal.sock_state = sock_state;
|
||||
@ -188,7 +187,7 @@ impl super::SocketState for UdpSocket {
|
||||
impl event::Source for UdpSocket {
|
||||
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
|
||||
{
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
if internal.is_none() {
|
||||
*internal = Some(InternalState::new(
|
||||
poll::selector(registry).clone_inner(),
|
||||
@ -201,7 +200,7 @@ impl event::Source for UdpSocket {
|
||||
match result {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
*internal = None;
|
||||
}
|
||||
}
|
||||
@ -217,7 +216,7 @@ impl event::Source for UdpSocket {
|
||||
let result = poll::selector(registry).reregister(self, token, interests);
|
||||
match result {
|
||||
Ok(_) => {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
internal.as_mut().unwrap().token = token;
|
||||
internal.as_mut().unwrap().interests = interests;
|
||||
}
|
||||
@ -230,7 +229,7 @@ impl event::Source for UdpSocket {
|
||||
let result = poll::selector(registry).deregister(self);
|
||||
match result {
|
||||
Ok(_) => {
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
let mut internal = self.internal.write().unwrap();
|
||||
*internal = None;
|
||||
}
|
||||
Err(_) => {}
|
||||
@ -239,6 +238,19 @@ impl event::Source for UdpSocket {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for UdpSocket {
|
||||
fn drop(&mut self) {
|
||||
let internal = self.internal.read().unwrap();
|
||||
if let Some(internal) = internal.as_ref() {
|
||||
if let Some(sock_state) = internal.sock_state.as_ref() {
|
||||
internal
|
||||
.selector
|
||||
.mark_delete_socket(&mut sock_state.lock().unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for UdpSocket {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt::Debug::fmt(&self.io, f)
|
||||
@ -248,7 +260,7 @@ impl fmt::Debug for UdpSocket {
|
||||
impl FromRawSocket for UdpSocket {
|
||||
unsafe fn from_raw_socket(rawsocket: RawSocket) -> UdpSocket {
|
||||
UdpSocket {
|
||||
internal: Arc::new(Mutex::new(None)),
|
||||
internal: Arc::new(RwLock::new(None)),
|
||||
io: net::UdpSocket::from_raw_socket(rawsocket),
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
use crate::{poll, sys, Registry, Token};
|
||||
|
||||
use std::io;
|
||||
|
||||
use crate::{poll, sys, Registry, Token};
|
||||
|
||||
/// Waker allows cross-thread waking of [`Poll`].
|
||||
///
|
||||
/// When created it will cause events with [`readable`] readiness and the
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
use bytes::BytesMut;
|
||||
use log::debug;
|
||||
use tracing::debug;
|
||||
|
||||
use mio::net::{TcpListener, TcpStream};
|
||||
use mio::{Events, Interests, Poll, Token};
|
||||
|
||||
@ -1,8 +1,7 @@
|
||||
use mio::net::{TcpListener, TcpStream};
|
||||
use mio::*;
|
||||
use std::net;
|
||||
use std::sync::{Arc, Barrier};
|
||||
use std::thread::{self, sleep};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
mod util;
|
||||
@ -43,41 +42,6 @@ fn add_then_drop() {
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn zero_duration_polls_events() {
|
||||
init();
|
||||
|
||||
let mut poll = Poll::new().unwrap();
|
||||
let mut events = Events::with_capacity(16);
|
||||
|
||||
let listener = net::TcpListener::bind(any_local_address()).unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
|
||||
let streams: Vec<TcpStream> = (0..3)
|
||||
.map(|n| {
|
||||
let stream = TcpStream::connect(addr).unwrap();
|
||||
poll.registry()
|
||||
.register(&stream, Token(n), Interests::WRITABLE)
|
||||
.unwrap();
|
||||
stream
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Ensure the TcpStreams have some time to connection and for the events to
|
||||
// show up.
|
||||
sleep(Duration::from_millis(10));
|
||||
|
||||
// Even when passing a zero duration timeout we still want do the system
|
||||
// call.
|
||||
poll.poll(&mut events, Some(Duration::from_nanos(0)))
|
||||
.unwrap();
|
||||
assert!(!events.is_empty());
|
||||
|
||||
// Both need to live until here.
|
||||
drop(streams);
|
||||
drop(listener);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_poll_closes_fd() {
|
||||
init();
|
||||
@ -150,6 +114,7 @@ fn test_drop_cancels_interest_and_shuts_down() {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(windows, ignore = "can't concurrently poll and register on Windows")]
|
||||
fn test_registry_behind_arc() {
|
||||
// `Registry` should work behind an `Arc`, being `Sync` and `Send`.
|
||||
init();
|
||||
@ -181,7 +146,7 @@ fn test_registry_behind_arc() {
|
||||
barrier3.wait();
|
||||
});
|
||||
|
||||
poll.poll(&mut events, Some(Duration::from_millis(1000)))
|
||||
poll.poll(&mut events, Some(Duration::from_millis(100)))
|
||||
.unwrap();
|
||||
assert!(events.iter().count() >= 1);
|
||||
|
||||
|
||||
@ -2,7 +2,7 @@ use std::io::{self, Write};
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
||||
use log::{debug, info, trace};
|
||||
use tracing::{debug, info, trace};
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
use mio::net::UdpSocket;
|
||||
|
||||
@ -1,13 +0,0 @@
|
||||
#[test]
|
||||
#[cfg(unix)]
|
||||
#[cfg(not(debug_assertions))]
|
||||
fn assert_size() {
|
||||
use mio::net::*;
|
||||
use std::mem::size_of;
|
||||
|
||||
// Without debug assertions enabled `TcpListener`, `TcpStream` and `UdpSocket` should have the
|
||||
// same size as the system specific socket, i.e. just a file descriptor on Unix platforms.
|
||||
assert_eq!(size_of::<TcpListener>(), size_of::<std::net::TcpListener>());
|
||||
assert_eq!(size_of::<TcpStream>(), size_of::<std::net::TcpStream>());
|
||||
assert_eq!(size_of::<UdpSocket>(), size_of::<std::net::UdpSocket>());
|
||||
}
|
||||
@ -7,10 +7,10 @@ use std::time::Duration;
|
||||
use std::{net, thread};
|
||||
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use log::{debug, info};
|
||||
#[cfg(unix)]
|
||||
use net2::TcpStreamExt;
|
||||
use slab::Slab;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use mio::net::{TcpListener, TcpStream};
|
||||
use mio::{Events, Interests, Poll, Registry, Token};
|
||||
|
||||
@ -4,7 +4,7 @@ use std::str;
|
||||
use std::time;
|
||||
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use log::{debug, info};
|
||||
use tracing::{debug, info};
|
||||
|
||||
use mio::net::UdpSocket;
|
||||
use mio::{Events, Interests, Poll, Registry, Token};
|
||||
|
||||
Loading…
Reference in New Issue
Block a user