Compare commits

...

27 Commits

Author SHA1 Message Date
Thomas de Zeeuw
803573fa28 Replace IRC channel with Gitter (#1069)
The Mozilla IRC network is being shutdown, see
https://blog.rust-lang.org/2019/04/26/Mozilla-IRC-Sunset-and-the-Rust-Channel.html
and http://exple.tive.org/blarg/2019/04/26/synchronous-text.
2019-08-29 11:56:08 -07:00
Carl Lerche
017d018d75 try fixing ci again 2019-08-27 18:44:28 +00:00
Carl Lerche
d6edcd13fd fix CI 2019-08-27 18:44:28 +00:00
Carl Lerche
092d6fdc7d fix clippy 2019-08-27 18:44:28 +00:00
Carl Lerche
5a5b5f7d43 include testing --release in CI 2019-08-27 18:44:28 +00:00
Carl Lerche
29fd58deda Fix size test 2019-08-27 18:44:28 +00:00
Carl Lerche
19d5ebe61c misc cleanup
* split up TCP files into stream / listener
* organize import order
* split up misc other files
2019-08-27 18:44:28 +00:00
Thomas de Zeeuw
3550494f0b Use consistent use statement order 2019-08-23 10:16:30 +00:00
Thomas de Zeeuw
72a001a8e3 Use Selector::try_clone in kqueue::Waker
Removing the special Selector::try_clone_waker method.
2019-08-23 10:16:30 +00:00
Thomas de Zeeuw
e739790cb8 Add a test for polling with zero duration (#1065)
This ensures that the relevant system call is made even if the passed
duration is zero.
2019-08-21 19:35:13 -07:00
PerfectLaugh
e054a7d587 Fix issue in #1062 2019-08-21 12:21:47 +00:00
PerfectLaugh
4276d071e3 Reverts two issues
This reverts commit 9dc562ca0f9c2a260a3e2e953a5a636103d8a903.
2019-08-21 11:26:00 +00:00
PerfectLaugh
d07989a3b8 Make is_read_hup detects AFD_POLL_DISCONNECT 2019-08-21 11:26:00 +00:00
PerfectLaugh
c7674d64ad Add test in pr #1065 2019-08-21 11:26:00 +00:00
PerfectLaugh
d7fa70644f Fix udp issue in #1062 2019-08-21 11:26:00 +00:00
PerfectLaugh
588e808576 Changed as requested 2019-08-21 11:26:00 +00:00
PerfectLaugh
1dd6000a6d Recover non-blocking connect and bind 2019-08-21 11:26:00 +00:00
PerfectLaugh
2653117b84 Fix several flags handling 2019-08-21 11:26:00 +00:00
PerfectLaugh
5de1317418 Refactored InternalState, improved select timeout handling and AFD flags fixes 2019-08-21 11:26:00 +00:00
PerfectLaugh
bbc2f774c9 Improve afd_group locks 2019-08-21 11:26:00 +00:00
PerfectLaugh
6b810629c3 Changes RefCell to UnsafeCell 2019-08-21 11:26:00 +00:00
PerfectLaugh
4dc7bae2a0 Comply with rustfmt 2019-08-21 11:26:00 +00:00
PerfectLaugh
7b06041b4c Test without timeout 2019-08-21 11:26:00 +00:00
PerfectLaugh
fbecef6684 Test with extended time length. 2019-08-21 11:26:00 +00:00
PerfectLaugh
b60218d9b2 Cancel ignore of test_registry_behind_arc 2019-08-21 11:26:00 +00:00
PerfectLaugh
c6d88a5810 Fix for https://github.com/tokio-rs/mio/issues/1041#issuecomment-519674615 2019-08-21 11:26:00 +00:00
PerfectLaugh
8e0b0364ab Temporary fix for https://github.com/tokio-rs/mio/issues/1041#issuecomment-519674615 2019-08-21 11:26:00 +00:00
26 changed files with 910 additions and 827 deletions

View File

@ -77,8 +77,10 @@ platform, submit a PR to update the list!
## Community
A group of Mio users hang out in the #mio channel on the Mozilla IRC
server (irc.mozilla.org). This can be a good place to go for questions.
A group of Mio users hang out on [Gitter], this can be a good place to go for
questions.
[Gitter]: https://gitter.im/tokio-rs/mio
## Contributing

View File

@ -14,6 +14,13 @@ jobs:
displayName: Test
cross: true
# Stable --release
- template: ci/azure-test-stable.yml
parameters:
name: stable_release
displayName: Test --release
cmd: test --release
# Nightly
- template: ci/azure-test-stable.yml
parameters:

View File

@ -1,4 +1,5 @@
use crate::{Interests, Registry, Token};
use std::io;
use std::ops::Deref;

View File

@ -7,22 +7,11 @@
//!
//! [portability guidelines]: ../struct.Poll.html#portability
mod tcp;
mod tcp_listener;
pub use self::tcp_listener::TcpListener;
mod tcp_stream;
pub use self::tcp_stream::TcpStream;
mod udp;
pub use self::tcp::{TcpListener, TcpStream};
pub use self::udp::UdpSocket;
#[test]
#[cfg(not(debug_assertions))]
fn assert_size() {
use std::mem::size_of;
use crate::sys;
// Without debug assertions enabled `TcpListener`, `TcpStream` and `UdpSocket` should have the
// same size as the system specific socket, i.e. just a file descriptor on Unix platforms.
assert_eq!(size_of::<TcpListener>(), size_of::<sys::TcpListener>());
assert_eq!(size_of::<TcpStream>(), size_of::<sys::TcpStream>());
assert_eq!(size_of::<UdpSocket>(), size_of::<sys::UdpSocket>());
}

170
src/net/tcp_listener.rs Normal file
View File

@ -0,0 +1,170 @@
use super::TcpStream;
#[cfg(debug_assertions)]
use crate::poll::SelectorId;
use crate::{event, sys, Interests, Registry, Token};
use std::fmt;
use std::io;
use std::net::SocketAddr;
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
/// A structure representing a socket server
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn main() -> Result<(), Box<dyn Error>> {
/// use mio::{Events, Interests, Poll, Token};
/// use mio::net::TcpListener;
/// use std::time::Duration;
///
/// let listener = TcpListener::bind("127.0.0.1:34255".parse()?)?;
///
/// let mut poll = Poll::new()?;
/// let mut events = Events::with_capacity(128);
///
/// // Register the socket with `Poll`
/// poll.registry().register(&listener, Token(0), Interests::READABLE)?;
///
/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
///
/// // There may be a socket ready to be accepted
/// # Ok(())
/// # }
/// ```
pub struct TcpListener {
sys: sys::TcpListener,
#[cfg(debug_assertions)]
selector_id: SelectorId,
}
impl TcpListener {
/// Convenience method to bind a new TCP listener to the specified address
/// to receive new connections.
///
/// This function will take the following steps:
///
/// 1. Create a new TCP socket.
/// 2. Set the `SO_REUSEADDR` option on the socket on Unix.
/// 3. Bind the socket to the specified address.
/// 4. Calls `listen` on the socket to prepare it to receive new connections.
pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
sys::TcpListener::bind(addr).map(|sys| TcpListener {
sys,
#[cfg(debug_assertions)]
selector_id: SelectorId::new(),
})
}
/// Accepts a new `TcpStream`.
///
/// This may return an `Err(e)` where `e.kind()` is
/// `io::ErrorKind::WouldBlock`. This means a stream may be ready at a later
/// point and one should wait for an event before calling `accept` again.
///
/// If an accepted stream is returned, the remote address of the peer is
/// returned along with it.
pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
self.sys
.accept()
.map(|(sys, addr)| (TcpStream::new(sys), addr))
}
/// Returns the local socket address of this listener.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.sys.local_addr()
}
/// Creates a new independently owned handle to the underlying socket.
///
/// The returned `TcpListener` is a reference to the same socket that this
/// object references. Both handles can be used to accept incoming
/// connections and options set on one listener will affect the other.
pub fn try_clone(&self) -> io::Result<TcpListener> {
self.sys.try_clone().map(|s| TcpListener {
sys: s,
#[cfg(debug_assertions)]
selector_id: self.selector_id.clone(),
})
}
/// Sets the value for the `IP_TTL` option on this socket.
///
/// This value sets the time-to-live field that is used in every packet sent
/// from this socket.
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.sys.set_ttl(ttl)
}
/// Gets the value of the `IP_TTL` option for this socket.
///
/// For more information about this option, see [`set_ttl`][link].
///
/// [link]: #method.set_ttl
pub fn ttl(&self) -> io::Result<u32> {
self.sys.ttl()
}
/// Get the value of the `SO_ERROR` option on this socket.
///
/// This will retrieve the stored error in the underlying socket, clearing
/// the field in the process. This can be useful for checking errors between
/// calls.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.sys.take_error()
}
}
impl event::Source for TcpListener {
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.associate_selector(registry)?;
self.sys.register(registry, token, interests)
}
fn reregister(
&self,
registry: &Registry,
token: Token,
interests: Interests,
) -> io::Result<()> {
self.sys.reregister(registry, token, interests)
}
fn deregister(&self, registry: &Registry) -> io::Result<()> {
self.sys.deregister(registry)
}
}
impl fmt::Debug for TcpListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.sys, f)
}
}
#[cfg(unix)]
impl IntoRawFd for TcpListener {
fn into_raw_fd(self) -> RawFd {
self.sys.into_raw_fd()
}
}
#[cfg(unix)]
impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.sys.as_raw_fd()
}
}
#[cfg(unix)]
impl FromRawFd for TcpListener {
unsafe fn from_raw_fd(fd: RawFd) -> TcpListener {
TcpListener {
sys: FromRawFd::from_raw_fd(fd),
#[cfg(debug_assertions)]
selector_id: SelectorId::new(),
}
}
}

View File

@ -1,26 +1,13 @@
//! Primitives for working with TCP
//!
//! The types provided in this module are non-blocking by default and are
//! designed to be portable across all supported Mio platforms. As long as the
//! [portability guidelines] are followed, the behavior should be identical no
//! matter the target platform.
//!
//! [portability guidelines]: ../struct.Poll.html#portability
use std::fmt;
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
use std::net::SocketAddr;
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(debug_assertions)]
use crate::poll::SelectorId;
use crate::{event, sys, Interests, Registry, Token};
/*
*
* ===== TcpStream =====
*
*/
/// A non-blocking TCP stream between a local socket and a remote socket.
///
/// The socket will be closed when the value is dropped.
@ -60,6 +47,14 @@ pub struct TcpStream {
use std::net::Shutdown;
impl TcpStream {
pub(crate) fn new(sys: sys::TcpStream) -> TcpStream {
TcpStream {
sys,
#[cfg(debug_assertions)]
selector_id: SelectorId::new(),
}
}
/// Create a new TCP stream and issue a non-blocking connect to the
/// specified address.
pub fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
@ -235,163 +230,6 @@ impl fmt::Debug for TcpStream {
}
}
/*
*
* ===== TcpListener =====
*
*/
/// A structure representing a socket server
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn main() -> Result<(), Box<dyn Error>> {
/// use mio::{Events, Interests, Poll, Token};
/// use mio::net::TcpListener;
/// use std::time::Duration;
///
/// let listener = TcpListener::bind("127.0.0.1:34255".parse()?)?;
///
/// let mut poll = Poll::new()?;
/// let mut events = Events::with_capacity(128);
///
/// // Register the socket with `Poll`
/// poll.registry().register(&listener, Token(0), Interests::READABLE)?;
///
/// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
///
/// // There may be a socket ready to be accepted
/// # Ok(())
/// # }
/// ```
pub struct TcpListener {
sys: sys::TcpListener,
#[cfg(debug_assertions)]
selector_id: SelectorId,
}
impl TcpListener {
/// Convenience method to bind a new TCP listener to the specified address
/// to receive new connections.
///
/// This function will take the following steps:
///
/// 1. Create a new TCP socket.
/// 2. Set the `SO_REUSEADDR` option on the socket on Unix.
/// 3. Bind the socket to the specified address.
/// 4. Calls `listen` on the socket to prepare it to receive new connections.
pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
sys::TcpListener::bind(addr).map(|sys| TcpListener {
sys,
#[cfg(debug_assertions)]
selector_id: SelectorId::new(),
})
}
/// Accepts a new `TcpStream`.
///
/// This may return an `Err(e)` where `e.kind()` is
/// `io::ErrorKind::WouldBlock`. This means a stream may be ready at a later
/// point and one should wait for an event before calling `accept` again.
///
/// If an accepted stream is returned, the remote address of the peer is
/// returned along with it.
pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
self.sys.accept().map(|(sys, addr)| {
(
TcpStream {
sys,
#[cfg(debug_assertions)]
selector_id: SelectorId::new(),
},
addr,
)
})
}
/// Returns the local socket address of this listener.
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.sys.local_addr()
}
/// Creates a new independently owned handle to the underlying socket.
///
/// The returned `TcpListener` is a reference to the same socket that this
/// object references. Both handles can be used to accept incoming
/// connections and options set on one listener will affect the other.
pub fn try_clone(&self) -> io::Result<TcpListener> {
self.sys.try_clone().map(|s| TcpListener {
sys: s,
#[cfg(debug_assertions)]
selector_id: self.selector_id.clone(),
})
}
/// Sets the value for the `IP_TTL` option on this socket.
///
/// This value sets the time-to-live field that is used in every packet sent
/// from this socket.
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.sys.set_ttl(ttl)
}
/// Gets the value of the `IP_TTL` option for this socket.
///
/// For more information about this option, see [`set_ttl`][link].
///
/// [link]: #method.set_ttl
pub fn ttl(&self) -> io::Result<u32> {
self.sys.ttl()
}
/// Get the value of the `SO_ERROR` option on this socket.
///
/// This will retrieve the stored error in the underlying socket, clearing
/// the field in the process. This can be useful for checking errors between
/// calls.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.sys.take_error()
}
}
impl event::Source for TcpListener {
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.associate_selector(registry)?;
self.sys.register(registry, token, interests)
}
fn reregister(
&self,
registry: &Registry,
token: Token,
interests: Interests,
) -> io::Result<()> {
self.sys.reregister(registry, token, interests)
}
fn deregister(&self, registry: &Registry) -> io::Result<()> {
self.sys.deregister(registry)
}
}
impl fmt::Debug for TcpListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.sys, f)
}
}
/*
*
* ===== UNIX ext =====
*
*/
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(unix)]
impl IntoRawFd for TcpStream {
fn into_raw_fd(self) -> RawFd {
@ -416,28 +254,3 @@ impl FromRawFd for TcpStream {
}
}
}
#[cfg(unix)]
impl IntoRawFd for TcpListener {
fn into_raw_fd(self) -> RawFd {
self.sys.into_raw_fd()
}
}
#[cfg(unix)]
impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.sys.as_raw_fd()
}
}
#[cfg(unix)]
impl FromRawFd for TcpListener {
unsafe fn from_raw_fd(fd: RawFd) -> TcpListener {
TcpListener {
sys: FromRawFd::from_raw_fd(fd),
#[cfg(debug_assertions)]
selector_id: SelectorId::new(),
}
}
}

View File

@ -14,6 +14,8 @@ use crate::{event, sys, Interests, Registry, Token};
use std::fmt;
use std::io;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
/// A User Datagram Protocol socket.
///
@ -532,9 +534,6 @@ impl fmt::Debug for UdpSocket {
*
*/
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(unix)]
impl IntoRawFd for UdpSocket {
fn into_raw_fd(self) -> RawFd {

View File

@ -1,3 +1,6 @@
use crate::{event, sys, Events, Interests, Token};
use log::trace;
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(debug_assertions)]
@ -5,10 +8,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::{fmt, io};
use log::trace;
use crate::{event, sys, Events, Interests, Token};
/// Polls for readiness events on all registered values.
///
/// `Poll` allows a program to monitor a large number of [`event::Source`]s,

View File

@ -1,15 +1,14 @@
use crate::sys::Events;
use crate::{Interests, Token};
use libc::{EPOLLET, EPOLLIN, EPOLLOUT};
use log::error;
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::{cmp, i32, io, ptr};
use libc::{EPOLLET, EPOLLIN, EPOLLOUT};
use log::error;
use crate::sys::Events;
use crate::{Interests, Token};
/// Unique id for use as `SelectorId`.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);

View File

@ -1,3 +1,7 @@
use crate::sys::Events;
use crate::{Interests, Token};
use log::error;
use std::mem::MaybeUninit;
use std::os::unix::io::{AsRawFd, RawFd};
#[cfg(debug_assertions)]
@ -5,11 +9,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::{cmp, io, ptr, slice};
use log::error;
use crate::sys::Events;
use crate::{Interests, Token};
/// Unique id for use as `SelectorId`.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
@ -227,16 +226,6 @@ impl Selector {
})
}
// Used by `Waker`.
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
pub fn try_clone_waker(&self) -> io::Result<Selector> {
syscall!(dup(self.kq)).map(|new_kq| Selector {
#[cfg(debug_assertions)]
id: self.id,
kq: new_kq,
})
}
// Used by `Waker`.
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
pub fn wake(&self, token: Token) -> io::Result<()> {

View File

@ -40,72 +40,17 @@ mod kqueue;
))]
pub use self::kqueue::{event, Event, Selector};
mod net;
mod sourcefd;
mod tcp;
mod tcp_listener;
mod tcp_stream;
mod udp;
mod waker;
pub use self::sourcefd::SourceFd;
pub use self::tcp::{TcpListener, TcpStream};
pub use self::tcp_listener::TcpListener;
pub use self::tcp_stream::TcpStream;
pub use self::udp::UdpSocket;
pub use self::waker::Waker;
pub type Events = Vec<Event>;
pub mod net {
use std::io;
use std::mem::size_of_val;
use std::net::SocketAddr;
/// Create a new non-blocking socket.
pub fn new_socket(addr: SocketAddr, socket_type: libc::c_int) -> io::Result<libc::c_int> {
let domain = match addr {
SocketAddr::V4(..) => libc::AF_INET,
SocketAddr::V6(..) => libc::AF_INET6,
};
#[cfg(any(
target_os = "android",
target_os = "bitrig",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd"
))]
let socket_type = socket_type | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;
// Gives a warning for platforms without SOCK_NONBLOCK.
#[allow(clippy::let_and_return)]
let socket = syscall!(socket(domain, socket_type, 0));
// Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC. Not sure about
// Solaris, couldn't find anything online.
#[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
let socket = socket.and_then(|socket| {
// For platforms that don't support flags in socket, we need to
// set the flags ourselves.
syscall!(fcntl(
socket,
libc::F_SETFL,
libc::O_NONBLOCK | libc::O_CLOEXEC
))
.map(|_| socket)
});
socket
}
pub fn socket_addr(addr: &SocketAddr) -> (*const libc::sockaddr, libc::socklen_t) {
match addr {
SocketAddr::V4(ref addr) => (
addr as *const _ as *const libc::sockaddr,
size_of_val(addr) as libc::socklen_t,
),
SocketAddr::V6(ref addr) => (
addr as *const _ as *const libc::sockaddr,
size_of_val(addr) as libc::socklen_t,
),
}
}
}

55
src/sys/unix/net.rs Normal file
View File

@ -0,0 +1,55 @@
use std::io;
use std::mem::size_of_val;
use std::net::SocketAddr;
/// Create a new non-blocking socket.
pub fn new_socket(addr: SocketAddr, socket_type: libc::c_int) -> io::Result<libc::c_int> {
let domain = match addr {
SocketAddr::V4(..) => libc::AF_INET,
SocketAddr::V6(..) => libc::AF_INET6,
};
#[cfg(any(
target_os = "android",
target_os = "bitrig",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd"
))]
let socket_type = socket_type | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;
// Gives a warning for platforms without SOCK_NONBLOCK.
#[allow(clippy::let_and_return)]
let socket = syscall!(socket(domain, socket_type, 0));
// Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC. Not sure about
// Solaris, couldn't find anything online.
#[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
let socket = socket.and_then(|socket| {
// For platforms that don't support flags in socket, we need to
// set the flags ourselves.
syscall!(fcntl(
socket,
libc::F_SETFL,
libc::O_NONBLOCK | libc::O_CLOEXEC
))
.map(|_| socket)
});
socket
}
pub fn socket_addr(addr: &SocketAddr) -> (*const libc::sockaddr, libc::socklen_t) {
match addr {
SocketAddr::V4(ref addr) => (
addr as *const _ as *const libc::sockaddr,
size_of_val(addr) as libc::socklen_t,
),
SocketAddr::V6(ref addr) => (
addr as *const _ as *const libc::sockaddr,
size_of_val(addr) as libc::socklen_t,
),
}
}

View File

@ -0,0 +1,113 @@
use crate::sys::unix::net::{new_socket, socket_addr};
use crate::sys::unix::{SourceFd, TcpStream};
use crate::{event, Interests, Registry, Token};
use std::fmt;
use std::io;
use std::mem::size_of;
use std::net::{self, SocketAddr};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
pub struct TcpListener {
inner: net::TcpListener,
}
impl TcpListener {
pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
new_socket(addr, libc::SOCK_STREAM).and_then(|socket| {
// Set SO_REUSEADDR (mirrors what libstd does).
syscall!(setsockopt(
socket,
libc::SOL_SOCKET,
libc::SO_REUSEADDR,
&1 as *const libc::c_int as *const libc::c_void,
size_of::<libc::c_int>() as libc::socklen_t,
))
.and_then(|_| {
let (raw_addr, raw_addr_length) = socket_addr(&addr);
syscall!(bind(socket, raw_addr, raw_addr_length))
})
.and_then(|_| syscall!(listen(socket, 1024)))
.map_err(|err| {
// Close the socket if we hit an error, ignoring the error
// from closing since we can't pass back two errors.
let _ = unsafe { libc::close(socket) };
err
})
.map(|_| TcpListener {
inner: unsafe { net::TcpListener::from_raw_fd(socket) },
})
})
}
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner.local_addr()
}
pub fn try_clone(&self) -> io::Result<TcpListener> {
self.inner.try_clone().map(|s| TcpListener { inner: s })
}
pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
self.inner
.accept()
.map(|(inner, addr)| (TcpStream::new(inner), addr))
}
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.inner.set_ttl(ttl)
}
pub fn ttl(&self) -> io::Result<u32> {
self.inner.ttl()
}
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.inner.take_error()
}
}
impl event::Source for TcpListener {
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
SourceFd(&self.as_raw_fd()).register(registry, token, interests)
}
fn reregister(
&self,
registry: &Registry,
token: Token,
interests: Interests,
) -> io::Result<()> {
SourceFd(&self.as_raw_fd()).reregister(registry, token, interests)
}
fn deregister(&self, registry: &Registry) -> io::Result<()> {
SourceFd(&self.as_raw_fd()).deregister(registry)
}
}
impl fmt::Debug for TcpListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.inner, f)
}
}
impl FromRawFd for TcpListener {
unsafe fn from_raw_fd(fd: RawFd) -> TcpListener {
TcpListener {
inner: net::TcpListener::from_raw_fd(fd),
}
}
}
impl IntoRawFd for TcpListener {
fn into_raw_fd(self) -> RawFd {
self.inner.into_raw_fd()
}
}
impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}

View File

@ -1,22 +1,21 @@
use std::fmt;
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
use std::mem::size_of;
use std::net::{self, SocketAddr};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::sys::unix::net::{new_socket, socket_addr};
use crate::sys::unix::SourceFd;
use crate::{event, Interests, Registry, Token};
use std::fmt;
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
use std::net::{self, SocketAddr};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
pub struct TcpStream {
inner: net::TcpStream,
}
pub struct TcpListener {
inner: net::TcpListener,
}
impl TcpStream {
pub(crate) fn new(inner: net::TcpStream) -> TcpStream {
TcpStream { inner }
}
pub fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
new_socket(addr, libc::SOCK_STREAM)
.and_then(|socket| {
@ -149,103 +148,3 @@ impl AsRawFd for TcpStream {
self.inner.as_raw_fd()
}
}
impl TcpListener {
pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
new_socket(addr, libc::SOCK_STREAM).and_then(|socket| {
// Set SO_REUSEADDR (mirrors what libstd does).
syscall!(setsockopt(
socket,
libc::SOL_SOCKET,
libc::SO_REUSEADDR,
&1 as *const libc::c_int as *const libc::c_void,
size_of::<libc::c_int>() as libc::socklen_t,
))
.and_then(|_| {
let (raw_addr, raw_addr_length) = socket_addr(&addr);
syscall!(bind(socket, raw_addr, raw_addr_length))
})
.and_then(|_| syscall!(listen(socket, 1024)))
.map_err(|err| {
// Close the socket if we hit an error, ignoring the error
// from closing since we can't pass back two errors.
let _ = unsafe { libc::close(socket) };
err
})
.map(|_| TcpListener {
inner: unsafe { net::TcpListener::from_raw_fd(socket) },
})
})
}
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner.local_addr()
}
pub fn try_clone(&self) -> io::Result<TcpListener> {
self.inner.try_clone().map(|s| TcpListener { inner: s })
}
pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
self.inner
.accept()
.map(|(inner, addr)| (TcpStream { inner }, addr))
}
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.inner.set_ttl(ttl)
}
pub fn ttl(&self) -> io::Result<u32> {
self.inner.ttl()
}
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.inner.take_error()
}
}
impl event::Source for TcpListener {
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
SourceFd(&self.as_raw_fd()).register(registry, token, interests)
}
fn reregister(
&self,
registry: &Registry,
token: Token,
interests: Interests,
) -> io::Result<()> {
SourceFd(&self.as_raw_fd()).reregister(registry, token, interests)
}
fn deregister(&self, registry: &Registry) -> io::Result<()> {
SourceFd(&self.as_raw_fd()).deregister(registry)
}
}
impl fmt::Debug for TcpListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.inner, f)
}
}
impl FromRawFd for TcpListener {
unsafe fn from_raw_fd(fd: RawFd) -> TcpListener {
TcpListener {
inner: net::TcpListener::from_raw_fd(fd),
}
}
}
impl IntoRawFd for TcpListener {
fn into_raw_fd(self) -> RawFd {
self.inner.into_raw_fd()
}
}
impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}

View File

@ -1,11 +1,11 @@
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::{fmt, io, net};
use crate::sys::unix::net::{new_socket, socket_addr};
use crate::unix::SourceFd;
use crate::{event, Interests, Registry, Token};
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::{fmt, io, net};
pub struct UdpSocket {
io: net::UdpSocket,
}

View File

@ -1,12 +1,12 @@
#[cfg(any(target_os = "linux", target_os = "android"))]
mod eventfd {
use crate::sys::Selector;
use crate::{Interests, Token};
use std::fs::File;
use std::io::{self, Read, Write};
use std::os::unix::io::FromRawFd;
use crate::sys::Selector;
use crate::{Interests, Token};
/// Waker backed by `eventfd`.
///
/// `eventfd` is effectively an 64 bit counter. All writes must be of 8
@ -63,11 +63,11 @@ pub use self::eventfd::Waker;
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
mod kqueue {
use std::io;
use crate::sys::Selector;
use crate::Token;
use std::io;
/// Waker backed by kqueue user space notifications (`EVFILT_USER`).
///
/// The implementation is fairly simple, first the kqueue must be setup to
@ -82,7 +82,7 @@ mod kqueue {
impl Waker {
pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
selector.try_clone_waker().and_then(|selector| {
selector.try_clone().and_then(|selector| {
selector
.setup_waker(token)
.map(|()| Waker { selector, token })
@ -106,13 +106,13 @@ pub use self::kqueue::Waker;
target_os = "solaris"
))]
mod pipe {
use crate::sys::unix::Selector;
use crate::{Interests, Token};
use std::fs::File;
use std::io::{self, Read, Write};
use std::os::unix::io::FromRawFd;
use crate::sys::unix::Selector;
use crate::{Interests, Token};
/// Waker backed by a unix pipe.
///
/// Waker controls both the sending and receiving ends and empties the pipe

View File

@ -1,16 +1,18 @@
use lazy_static::lazy_static;
use miow::iocp::CompletionPort;
use ntapi::ntioapi::FILE_OPEN;
use ntapi::ntioapi::{IO_STATUS_BLOCK_u, IO_STATUS_BLOCK};
use ntapi::ntioapi::{NtCancelIoFileEx, NtCreateFile, NtDeviceIoControlFile};
use ntapi::ntrtl::RtlNtStatusToDosError;
use std::ffi::OsStr;
use std::fmt;
use std::fs::File;
use std::io;
use std::mem::{size_of, zeroed};
use std::ptr::null_mut;
use std::sync::atomic::{AtomicUsize, Ordering};
use miow::iocp::CompletionPort;
use std::os::windows::ffi::OsStrExt;
use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
use std::ptr::null_mut;
use std::sync::atomic::{AtomicUsize, Ordering};
use winapi::shared::ntdef::{
HANDLE, LARGE_INTEGER, NTSTATUS, OBJECT_ATTRIBUTES, PVOID, ULONG, UNICODE_STRING,
};
@ -20,11 +22,6 @@ use winapi::um::winbase::{SetFileCompletionNotificationModes, FILE_SKIP_SET_EVEN
use winapi::um::winnt::SYNCHRONIZE;
use winapi::um::winnt::{FILE_SHARE_READ, FILE_SHARE_WRITE};
use ntapi::ntioapi::FILE_OPEN;
use ntapi::ntioapi::{IO_STATUS_BLOCK_u, IO_STATUS_BLOCK};
use ntapi::ntioapi::{NtCancelIoFileEx, NtCreateFile, NtDeviceIoControlFile};
use ntapi::ntrtl::RtlNtStatusToDosError;
const IOCTL_AFD_POLL: ULONG = 0x00012024;
static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(0);
@ -45,8 +42,6 @@ struct ObjectAttributes(OBJECT_ATTRIBUTES);
unsafe impl Send for ObjectAttributes {}
unsafe impl Sync for ObjectAttributes {}
use lazy_static::lazy_static;
lazy_static! {
static ref AFD_OBJ_NAME: UnicodeString = UnicodeString(UNICODE_STRING {
// Lengths are calced in bytes
@ -132,8 +127,8 @@ impl Afd {
));
}
let fd = File::from_raw_handle(afd_helper_handle as RawHandle);
let afd = Afd { fd };
let token = NEXT_TOKEN.fetch_add(1, Ordering::Relaxed) + 1;
let afd = Afd { fd };
cp.add_handle(token, &afd.fd)?;
match SetFileCompletionNotificationModes(
afd_helper_handle,
@ -223,6 +218,5 @@ pub const KNOWN_AFD_EVENTS: u32 = AFD_POLL_RECEIVE
| AFD_POLL_SEND
| AFD_POLL_DISCONNECT
| AFD_POLL_ABORT
| AFD_POLL_LOCAL_CLOSE
| AFD_POLL_ACCEPT
| AFD_POLL_CONNECT_FAIL;

View File

@ -14,34 +14,33 @@ pub fn token(event: &Event) -> Token {
}
pub fn is_readable(event: &Event) -> bool {
if is_hup(event) {
if is_error(event) || is_read_hup(event) {
return true;
}
(event.flags & (afd::KNOWN_AFD_EVENTS & !afd::AFD_POLL_SEND)) != 0
event.flags & (afd::AFD_POLL_RECEIVE | afd::AFD_POLL_ACCEPT) != 0
}
pub fn is_writable(event: &Event) -> bool {
if is_hup(event) {
if is_error(event) {
return true;
}
(event.flags & (afd::AFD_POLL_SEND)) != 0
event.flags & afd::AFD_POLL_SEND != 0
}
pub fn is_error(event: &Event) -> bool {
event.flags == afd::AFD_POLL_CONNECT_FAIL
event.flags & afd::AFD_POLL_CONNECT_FAIL != 0
}
pub fn is_hup(event: &Event) -> bool {
(event.flags & (afd::AFD_POLL_ABORT | afd::AFD_POLL_CONNECT_FAIL)) != 0
event.flags & afd::AFD_POLL_ABORT != 0
}
pub fn is_read_hup(_: &Event) -> bool {
// Not supported.
false
pub fn is_read_hup(event: &Event) -> bool {
event.flags & afd::AFD_POLL_DISCONNECT != 0
}
pub fn is_priority(event: &Event) -> bool {
(event.flags & afd::AFD_POLL_RECEIVE_EXPEDITED) != 0
event.flags & afd::AFD_POLL_RECEIVE_EXPEDITED != 0
}
pub fn is_aio(_: &Event) -> bool {

View File

@ -1,8 +1,7 @@
use ntapi::ntioapi::{IO_STATUS_BLOCK_u, IO_STATUS_BLOCK};
use std::cell::UnsafeCell;
use std::fmt;
use ntapi::ntioapi::{IO_STATUS_BLOCK_u, IO_STATUS_BLOCK};
pub struct IoStatusBlock(UnsafeCell<IO_STATUS_BLOCK>);
// There is a pointer field in `IO_STATUS_BLOCK_u`, which we don't use that. Thus it is safe to implement Send here.

View File

@ -1,4 +1,12 @@
use std::io;
use std::mem::size_of_val;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::sync::{Arc, Mutex, Once};
use winapi::ctypes::c_int;
use winapi::shared::ws2def::SOCKADDR;
use winapi::um::winsock2::{
ioctlsocket, socket, FIONBIO, INVALID_SOCKET, PF_INET, PF_INET6, SOCKET,
};
/// Helper macro to execute a system call that returns an `io::Result`.
//
@ -33,6 +41,35 @@ pub trait SocketState {
fn set_sock_state(&self, sock_state: Option<Arc<Mutex<SockState>>>);
}
use crate::{Interests, Token};
struct InternalState {
selector: Arc<SelectorInner>,
token: Token,
interests: Interests,
sock_state: Option<Arc<Mutex<SockState>>>,
}
impl InternalState {
fn new(selector: Arc<SelectorInner>, token: Token, interests: Interests) -> InternalState {
InternalState {
selector,
token,
interests,
sock_state: None,
}
}
}
impl Drop for InternalState {
fn drop(&mut self) {
if let Some(sock_state) = self.sock_state.as_ref() {
let mut sock_state = sock_state.lock().unwrap();
sock_state.mark_delete();
}
}
}
/// Initialise the network stack for Windows.
fn init() {
static INIT: Once = Once::new();
@ -43,3 +80,48 @@ fn init() {
drop(std::net::UdpSocket::bind("127.0.0.1:0"));
});
}
/// Create a new non-blocking socket.
fn new_socket(addr: SocketAddr, socket_type: c_int) -> io::Result<SOCKET> {
let domain = match addr {
SocketAddr::V4(..) => PF_INET,
SocketAddr::V6(..) => PF_INET6,
};
syscall!(
socket(domain, socket_type, 0),
PartialEq::eq,
INVALID_SOCKET
)
.and_then(|socket| {
syscall!(ioctlsocket(socket, FIONBIO, &mut 1), PartialEq::ne, 0).map(|_| socket as SOCKET)
})
}
fn socket_addr(addr: &SocketAddr) -> (*const SOCKADDR, c_int) {
match addr {
SocketAddr::V4(ref addr) => (
addr as *const _ as *const SOCKADDR,
size_of_val(addr) as c_int,
),
SocketAddr::V6(ref addr) => (
addr as *const _ as *const SOCKADDR,
size_of_val(addr) as c_int,
),
}
}
fn inaddr_any(other: SocketAddr) -> SocketAddr {
match other {
SocketAddr::V4(..) => {
let any = Ipv4Addr::new(0, 0, 0, 0);
let addr = SocketAddrV4::new(any, 0);
SocketAddr::V4(addr)
}
SocketAddr::V6(..) => {
let any = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0);
let addr = SocketAddrV6::new(any, 0, 0, 0);
SocketAddr::V6(addr)
}
}
}

View File

@ -1,38 +1,78 @@
use std::collections::VecDeque;
use std::io;
use std::mem::size_of;
use std::pin::Pin;
use std::ptr::null_mut;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::os::windows::io::{AsRawSocket, RawSocket};
use winapi::shared::ntdef::NT_SUCCESS;
use winapi::shared::ntdef::{HANDLE, PVOID};
use winapi::shared::ntstatus::STATUS_CANCELLED;
use winapi::shared::winerror::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING};
use winapi::um::mswsock::SIO_BASE_HANDLE;
use winapi::um::winsock2::{WSAIoctl, INVALID_SOCKET, SOCKET_ERROR};
use miow::iocp::{CompletionPort, CompletionStatus};
use crate::sys::Events;
use crate::{Interests, Token};
use super::afd::{Afd, AfdPollInfo};
use super::afd::{
AFD_POLL_ABORT, AFD_POLL_ACCEPT, AFD_POLL_CONNECT_FAIL, AFD_POLL_DISCONNECT,
AFD_POLL_LOCAL_CLOSE, AFD_POLL_RECEIVE, AFD_POLL_RECEIVE_EXPEDITED, AFD_POLL_SEND,
KNOWN_AFD_EVENTS,
AFD_POLL_LOCAL_CLOSE, AFD_POLL_RECEIVE, AFD_POLL_SEND, KNOWN_AFD_EVENTS,
};
use super::io_status_block::IoStatusBlock;
use super::Event;
use super::SocketState;
use crate::sys::Events;
use crate::{Interests, Token};
use miow::iocp::{CompletionPort, CompletionStatus};
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::io;
use std::mem::size_of;
use std::os::windows::io::{AsRawSocket, RawSocket};
use std::pin::Pin;
use std::ptr::null_mut;
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use winapi::shared::ntdef::NT_SUCCESS;
use winapi::shared::ntdef::{HANDLE, PVOID};
use winapi::shared::ntstatus::STATUS_CANCELLED;
use winapi::shared::winerror::{ERROR_INVALID_HANDLE, ERROR_IO_PENDING, WAIT_TIMEOUT};
use winapi::um::mswsock::SIO_BASE_HANDLE;
use winapi::um::winsock2::{WSAIoctl, INVALID_SOCKET, SOCKET_ERROR};
const POLL_GROUP__MAX_GROUP_SIZE: usize = 32;
#[derive(Debug)]
struct AfdGroup {
cp: Arc<CompletionPort>,
afd_group: Mutex<Vec<Arc<Afd>>>,
}
impl AfdGroup {
pub fn new(cp: Arc<CompletionPort>) -> AfdGroup {
AfdGroup {
afd_group: Mutex::new(Vec::new()),
cp,
}
}
pub fn acquire(&self) -> io::Result<Arc<Afd>> {
let mut afd_group = self.afd_group.lock().unwrap();
if afd_group.len() == 0 {
self._alloc_afd_group(&mut afd_group)?;
} else {
// + 1 reference in Vec
if Arc::strong_count(afd_group.last().unwrap()) >= POLL_GROUP__MAX_GROUP_SIZE + 1 {
self._alloc_afd_group(&mut afd_group)?;
}
}
match afd_group.last() {
Some(arc) => Ok(arc.clone()),
None => unreachable!(),
}
}
pub fn release_unused_afd(&self) {
let mut afd_group = self.afd_group.lock().unwrap();
afd_group.retain(|g| Arc::strong_count(&g) > 1);
}
fn _alloc_afd_group(&self, afd_group: &mut Vec<Arc<Afd>>) -> io::Result<()> {
let afd = Afd::new(&self.cp)?;
let arc = Arc::new(afd);
afd_group.push(arc);
Ok(())
}
}
/// This is the deallocation wrapper for overlapped pointer.
/// In case of error or status changing before the overlapped pointer is actually used(or not even being used),
/// this wrapper will decrease the reference count of Arc if being dropped.
@ -135,12 +175,10 @@ impl SockState {
* poll operation; when we receive it's completion package, a new poll
* operation will be submitted with the correct event mask. */
self.cancel()?;
return Ok(());
}
} else if let SockPollStatus::Cancelled = self.poll_status {
/* The poll operation has already been cancelled, we're still waiting for
* it to return. For now, there's nothing that needs to be done. */
return Ok(());
} else if let SockPollStatus::Idle = self.poll_status {
/* No poll operation is pending; start one. */
self.poll_info.exclusive = 0;
@ -150,7 +188,7 @@ impl SockState {
}
self.poll_info.handles[0].handle = self.base_socket as HANDLE;
self.poll_info.handles[0].status = 0;
self.poll_info.handles[0].events = self.user_evts;
self.poll_info.handles[0].events = self.user_evts | AFD_POLL_LOCAL_CLOSE;
let wrapped_overlapped = OverlappedArcWrapper::new(self_arc);
let overlapped = wrapped_overlapped.get_ptr() as *const _ as PVOID;
@ -159,24 +197,23 @@ impl SockState {
.poll(&mut self.poll_info, (*self.iosb).as_mut_ptr(), overlapped)
};
if let Err(e) = result {
if let Some(code) = e.raw_os_error() {
if code == ERROR_IO_PENDING as i32 {
/* Overlapped poll operation in progress; this is expected. */
} else if code == ERROR_INVALID_HANDLE as i32 {
/* Socket closed; it'll be dropped. */
self.mark_delete();
return Ok(());
} else {
return Err(e);
}
let code = e.raw_os_error().unwrap();
if code == ERROR_IO_PENDING as i32 {
/* Overlapped poll operation in progress; this is expected. */
} else if code == ERROR_INVALID_HANDLE as i32 {
/* Socket closed; it'll be dropped. */
self.mark_delete();
return Ok(());
} else {
return Err(e);
}
}
self.poll_status = SockPollStatus::Pending;
if self.self_wrapped.is_some() {
// This shouldn't be happening. We cannot deallocate already pending overlapped before feed_event so we need to stand out here to declare unreachable.
unreachable!();
}
self.poll_status = SockPollStatus::Pending;
self.self_wrapped = Some(wrapped_overlapped);
self.pending_evts = self.user_evts;
} else {
@ -198,16 +235,6 @@ impl SockState {
Ok(())
}
fn mark_delete(&mut self) {
if !self.delete_pending {
if let SockPollStatus::Pending = self.poll_status {
drop(self.cancel());
}
self.delete_pending = true;
}
}
// This is the function called from the overlapped using as Arc<Mutex<SockState>>. Watch out for reference counting.
fn feed_event(&mut self) -> Option<Event> {
if self.self_wrapped.is_some() {
@ -233,6 +260,7 @@ impl SockState {
} else if self.poll_info.number_of_handles < 1 {
/* This poll operation succeeded but didn't report any socket events. */
} else if self.poll_info.handles[0].events & AFD_POLL_LOCAL_CLOSE != 0 {
/* The poll operation reported that the socket was closed. */
self.mark_delete();
return None;
} else {
@ -251,12 +279,12 @@ impl SockState {
// then reregister the socket to reset the interests.
// Reset readable event
if (afd_events & (KNOWN_AFD_EVENTS & !AFD_POLL_SEND)) != 0 {
self.user_evts &= !(afd_events & (KNOWN_AFD_EVENTS & !AFD_POLL_SEND));
if (afd_events & interests_to_afd_flags(Interests::READABLE)) != 0 {
self.user_evts &= !(interests_to_afd_flags(Interests::READABLE));
}
// Reset writable event
if (afd_events & AFD_POLL_SEND) != 0 {
self.user_evts &= !AFD_POLL_SEND;
if (afd_events & interests_to_afd_flags(Interests::WRITABLE)) != 0 {
self.user_evts &= !interests_to_afd_flags(Interests::WRITABLE);
}
Some(Event {
@ -265,9 +293,25 @@ impl SockState {
})
}
fn is_pending_deletion(&self) -> bool {
pub fn is_pending_deletion(&self) -> bool {
self.delete_pending
}
pub fn mark_delete(&mut self) {
if !self.delete_pending {
if let SockPollStatus::Pending = self.poll_status {
drop(self.cancel());
}
self.delete_pending = true;
}
}
}
impl Drop for SockState {
fn drop(&mut self) {
self.mark_delete();
}
}
/// Each Selector has a globally unique(ish) ID associated with it. This ID
@ -351,43 +395,106 @@ impl Selector {
#[derive(Debug)]
pub struct SelectorInner {
cp: CompletionPort,
active_poll_count: AtomicUsize,
update_queue: Mutex<VecDeque<Arc<Mutex<SockState>>>>,
afd_group: Mutex<Vec<Arc<Afd>>>,
lock: Mutex<()>,
cp: Arc<CompletionPort>,
active_poll_count: UnsafeCell<usize>,
update_queue: UnsafeCell<VecDeque<Arc<Mutex<SockState>>>>,
afd_group: AfdGroup,
}
// We have ensured thread safety by introducing lock manually.
unsafe impl Sync for SelectorInner {}
impl SelectorInner {
pub fn new() -> io::Result<SelectorInner> {
CompletionPort::new(0).map(|cp| SelectorInner {
cp: cp,
active_poll_count: AtomicUsize::new(0),
update_queue: Mutex::new(VecDeque::new()),
afd_group: Mutex::new(Vec::new()),
CompletionPort::new(0).map(|cp| {
let cp = Arc::new(cp);
let cp_afd = Arc::clone(&cp);
SelectorInner {
lock: Mutex::new(()),
cp: cp,
active_poll_count: UnsafeCell::new(0),
update_queue: UnsafeCell::new(VecDeque::new()),
afd_group: AfdGroup::new(cp_afd),
}
})
}
pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
events.clear();
self.update_sockets_events()?;
let mut n = 0;
let start = Instant::now();
self.active_poll_count.fetch_add(1, Ordering::SeqCst);
let result = self.cp.get_many(&mut events.statuses, timeout);
self.active_poll_count.fetch_sub(1, Ordering::SeqCst);
if let Err(e) = result {
use winapi::shared::winerror::WAIT_TIMEOUT;
if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) {
loop {
if timeout.is_none() {
let len = self.select2(&mut events.statuses, &mut events.events, None)?;
if len == 0 {
continue;
}
return Ok(());
} else {
if n >= events.statuses.len() {
return Ok(());
}
let timeout = timeout.unwrap();
let deadline = start + timeout;
let now = Instant::now();
if timeout.as_nanos() != 0 {
if now >= deadline {
return Ok(());
}
let len = self.select2(
&mut events.statuses[n..],
&mut events.events,
Some(deadline - now),
)?;
if len == 0 {
return Ok(());
}
n += len;
} else {
self.select2(&mut events.statuses[n..], &mut events.events, Some(timeout))?;
return Ok(());
}
}
}
}
pub fn select2(
&self,
statuses: &mut [CompletionStatus],
events: &mut Vec<Event>,
timeout: Option<Duration>,
) -> io::Result<usize> {
{
let _guard = self.lock.lock().unwrap();
unsafe {
self.update_sockets_events()?;
*self.active_poll_count.get() += 1;
}
return Err(e);
}
self.feed_events(&mut events.events, result.unwrap());
Ok(())
let result = self.cp.get_many(statuses, timeout);
{
let _guard = self.lock.lock().unwrap();
unsafe {
*self.active_poll_count.get() -= 1;
}
if let Err(e) = result {
if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) {
return Ok(0);
}
return Err(e);
}
unsafe { Ok(self.feed_events(events, result.unwrap())) }
}
}
pub fn register<S: SocketState + AsRawSocket>(
@ -412,8 +519,13 @@ impl SelectorInner {
sock.lock().unwrap().set_event(event);
}
socket.set_sock_state(Some(sock));
self.add_socket_to_update_queue(socket);
self.update_sockets_events_if_polling()?;
{
let _guard = self.lock.lock().unwrap();
unsafe {
self.add_socket_to_update_queue(socket);
self.update_sockets_events_if_polling()?;
}
}
Ok(())
}
@ -438,8 +550,13 @@ impl SelectorInner {
{
sock.lock().unwrap().set_event(event);
}
self.add_socket_to_update_queue(socket);
self.update_sockets_events_if_polling()?;
{
let _guard = self.lock.lock().unwrap();
unsafe {
self.add_socket_to_update_queue(socket);
self.update_sockets_events_if_polling()?;
}
}
Ok(())
}
@ -449,7 +566,7 @@ impl SelectorInner {
return Err(io::Error::from(io::ErrorKind::NotFound));
}
socket.set_sock_state(None);
self._release_unused_afd();
self.afd_group.release_unused_afd();
Ok(())
}
@ -457,112 +574,76 @@ impl SelectorInner {
&self.cp
}
pub fn mark_delete_socket(&self, sock_state: &mut SockState) {
sock_state.mark_delete();
}
fn update_sockets_events(&self) -> io::Result<()> {
{
let mut update_queue = self.update_queue.lock().unwrap();
loop {
let sock = match update_queue.pop_front() {
Some(sock) => sock,
None => break,
};
let mut sock_internal = sock.lock().unwrap();
if !sock_internal.is_pending_deletion() {
sock_internal.update(&sock).unwrap();
}
unsafe fn update_sockets_events(&self) -> io::Result<()> {
let update_queue = &mut *self.update_queue.get();
loop {
let sock = match update_queue.pop_front() {
Some(sock) => sock,
None => break,
};
let mut sock_internal = sock.lock().unwrap();
if !sock_internal.is_pending_deletion() {
sock_internal.update(&sock).unwrap();
}
}
self._release_unused_afd();
self.afd_group.release_unused_afd();
Ok(())
}
fn update_sockets_events_if_polling(&self) -> io::Result<()> {
if self.active_poll_count.load(Ordering::SeqCst) > 0 {
unsafe fn update_sockets_events_if_polling(&self) -> io::Result<()> {
let active_poll_count = *self.active_poll_count.get();
if active_poll_count > 0 {
return self.update_sockets_events();
}
Ok(())
}
fn feed_events(&self, events: &mut Vec<Event>, iocp_events: &[CompletionStatus]) {
{
let mut update_queue = self.update_queue.lock().unwrap();
for iocp_event in iocp_events.iter() {
if iocp_event.overlapped() as usize == 0 {
events.push(Event {
flags: AFD_POLL_RECEIVE,
data: iocp_event.token() as u64,
});
continue;
}
let sock_arc =
unsafe { Arc::from_raw(iocp_event.overlapped() as *const Mutex<SockState>) };
let mut sock_guard = sock_arc.lock().unwrap();
match sock_guard.feed_event() {
Some(e) => {
events.push(e);
}
None => {}
}
if !sock_guard.is_pending_deletion() {
update_queue.push_back(sock_arc.clone());
unsafe fn add_socket_to_update_queue<S: SocketState>(&self, socket: &S) {
let sock_state = socket.get_sock_state().unwrap();
let update_queue = &mut *self.update_queue.get();
update_queue.push_back(sock_state);
}
// It returns processed count of iocp_events rather than the events itself.
unsafe fn feed_events(
&self,
events: &mut Vec<Event>,
iocp_events: &[CompletionStatus],
) -> usize {
let mut n = 0;
let update_queue = &mut *self.update_queue.get();
for iocp_event in iocp_events.iter() {
if iocp_event.overlapped() as usize == 0 {
events.push(Event {
flags: AFD_POLL_RECEIVE,
data: iocp_event.token() as u64,
});
n += 1;
continue;
}
let sock_arc = Arc::from_raw(iocp_event.overlapped() as *const Mutex<SockState>);
let mut sock_guard = sock_arc.lock().unwrap();
match sock_guard.feed_event() {
Some(e) => {
events.push(e);
}
None => {}
}
n += 1;
if !sock_guard.is_pending_deletion() {
update_queue.push_back(sock_arc.clone());
}
}
self._release_unused_afd();
}
fn _acquire_afd(&self) -> io::Result<Arc<Afd>> {
let mut need_alloc = false;
{
let afd_group = self.afd_group.lock().unwrap();
if afd_group.len() == 0 {
need_alloc = true;
} else {
// + 1 reference in Vec
if Arc::strong_count(afd_group.last().unwrap()) >= POLL_GROUP__MAX_GROUP_SIZE + 1 {
need_alloc = true;
}
}
}
if need_alloc {
self._alloc_afd_group()?;
}
match self.afd_group.lock().unwrap().last() {
Some(rc) => Ok(rc.clone()),
None => unreachable!(),
}
}
fn _release_unused_afd(&self) {
let mut afd_group = self.afd_group.lock().unwrap();
afd_group.retain(|g| Arc::strong_count(&g) > 1);
}
fn _alloc_afd_group(&self) -> io::Result<()> {
let mut afd_group = self.afd_group.lock().unwrap();
let afd = Afd::new(&self.cp)?;
let rc = Arc::new(afd);
afd_group.push(rc);
Ok(())
self.afd_group.release_unused_afd();
n
}
fn _alloc_sock_for_rawsocket(
&self,
raw_socket: RawSocket,
) -> io::Result<Arc<Mutex<SockState>>> {
Ok(Arc::new(Mutex::new(SockState::new(
raw_socket,
self._acquire_afd()?,
)?)))
}
fn add_socket_to_update_queue<S: SocketState>(&self, socket: &S) {
let sock_state = socket.get_sock_state().unwrap();
let mut update_queue = self.update_queue.lock().unwrap();
update_queue.push_back(sock_state);
let afd = self.afd_group.acquire()?;
Ok(Arc::new(Mutex::new(SockState::new(raw_socket, afd)?)))
}
}
@ -570,8 +651,8 @@ fn interests_to_afd_flags(interests: Interests) -> u32 {
let mut flags = 0;
if interests.is_readable() {
flags |=
AFD_POLL_RECEIVE | AFD_POLL_RECEIVE_EXPEDITED | AFD_POLL_ACCEPT | AFD_POLL_DISCONNECT;
// AFD_POLL_DISCONNECT for is_read_hup()
flags |= AFD_POLL_RECEIVE | AFD_POLL_ACCEPT | AFD_POLL_DISCONNECT;
}
if interests.is_writable() {

View File

@ -1,50 +1,25 @@
use std::cmp::PartialEq;
use std::fmt;
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
use std::mem::size_of_val;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64.
use std::sync::{Arc, Mutex, RwLock};
use winapi::ctypes::c_int;
use winapi::shared::ws2def::SOCKADDR;
use winapi::um::winsock2::{
bind, closesocket, connect, ioctlsocket, listen, socket, FIONBIO, INVALID_SOCKET, PF_INET,
PF_INET6, SOCKET, SOCKET_ERROR, SOCK_STREAM,
};
use super::selector::SockState;
use super::InternalState;
use super::{inaddr_any, new_socket, socket_addr};
use crate::poll;
use crate::sys::windows::init;
use crate::{event, Interests, Registry, Token};
use super::selector::{SelectorInner, SockState};
struct InternalState {
selector: Arc<SelectorInner>,
token: Token,
interests: Interests,
sock_state: Option<Arc<Mutex<SockState>>>,
}
impl InternalState {
fn new(selector: Arc<SelectorInner>, token: Token, interests: Interests) -> InternalState {
InternalState {
selector,
token,
interests,
sock_state: None,
}
}
}
use std::fmt;
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
use std::net::{self, SocketAddr};
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64.
use std::sync::{Arc, Mutex};
use winapi::um::winsock2::{bind, closesocket, connect, listen, SOCKET_ERROR, SOCK_STREAM};
pub struct TcpStream {
internal: Arc<RwLock<Option<InternalState>>>,
internal: Arc<Mutex<Option<InternalState>>>,
inner: net::TcpStream,
}
pub struct TcpListener {
internal: Arc<RwLock<Option<InternalState>>>,
internal: Arc<Mutex<Option<InternalState>>>,
inner: net::TcpListener,
}
@ -53,12 +28,16 @@ macro_rules! wouldblock {
let result = (&$self.inner).$method();
if let Err(ref e) = result {
if e.kind() == io::ErrorKind::WouldBlock {
let internal = $self.internal.read().unwrap();
if let Some(internal) = &*internal {
internal.selector.reregister(
let internal = $self.internal.lock().unwrap();
if internal.is_some() {
let selector = internal.as_ref().unwrap().selector.clone();
let token = internal.as_ref().unwrap().token;
let interests = internal.as_ref().unwrap().interests;
drop(internal);
selector.reregister(
$self,
internal.token,
internal.interests,
token,
interests,
)?;
}
}
@ -69,12 +48,16 @@ macro_rules! wouldblock {
let result = (&$self.inner).$method($($args),*);
if let Err(ref e) = result {
if e.kind() == io::ErrorKind::WouldBlock {
let internal = $self.internal.read().unwrap();
if let Some(internal) = &*internal {
internal.selector.reregister(
let internal = $self.internal.lock().unwrap();
if internal.is_some() {
let selector = internal.as_ref().unwrap().selector.clone();
let token = internal.as_ref().unwrap().token;
let interests = internal.as_ref().unwrap().interests;
drop(internal);
selector.reregister(
$self,
internal.token,
internal.interests,
token,
interests,
)?;
}
}
@ -86,7 +69,7 @@ macro_rules! wouldblock {
impl TcpStream {
pub fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
init();
new_socket(addr)
new_socket(addr, SOCK_STREAM)
.and_then(|socket| {
// Required for a future `connect_overlapped` operation to be
// executed successfully.
@ -118,7 +101,7 @@ impl TcpStream {
})
})
.map(|socket| TcpStream {
internal: Arc::new(RwLock::new(None)),
internal: Arc::new(Mutex::new(None)),
inner: unsafe { net::TcpStream::from_raw_socket(socket as StdSocket) },
})
}
@ -133,7 +116,7 @@ impl TcpStream {
pub fn try_clone(&self) -> io::Result<TcpStream> {
self.inner.try_clone().map(|s| TcpStream {
internal: Arc::new(RwLock::new(None)),
internal: Arc::new(Mutex::new(None)),
inner: s,
})
}
@ -167,24 +150,9 @@ impl TcpStream {
}
}
fn inaddr_any(other: SocketAddr) -> SocketAddr {
match other {
SocketAddr::V4(..) => {
let any = Ipv4Addr::new(0, 0, 0, 0);
let addr = SocketAddrV4::new(any, 0);
SocketAddr::V4(addr)
}
SocketAddr::V6(..) => {
let any = Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0);
let addr = SocketAddrV6::new(any, 0, 0, 0);
SocketAddr::V6(addr)
}
}
}
impl super::SocketState for TcpStream {
fn get_sock_state(&self) -> Option<Arc<Mutex<SockState>>> {
let internal = self.internal.read().unwrap();
let internal = self.internal.lock().unwrap();
match &*internal {
Some(internal) => match &internal.sock_state {
Some(arc) => Some(arc.clone()),
@ -194,7 +162,7 @@ impl super::SocketState for TcpStream {
}
}
fn set_sock_state(&self, sock_state: Option<Arc<Mutex<SockState>>>) {
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
match &mut *internal {
Some(internal) => {
internal.sock_state = sock_state;
@ -206,7 +174,7 @@ impl super::SocketState for TcpStream {
impl<'a> super::SocketState for &'a TcpStream {
fn get_sock_state(&self) -> Option<Arc<Mutex<SockState>>> {
let internal = self.internal.read().unwrap();
let internal = self.internal.lock().unwrap();
match &*internal {
Some(internal) => match &internal.sock_state {
Some(arc) => Some(arc.clone()),
@ -216,7 +184,7 @@ impl<'a> super::SocketState for &'a TcpStream {
}
}
fn set_sock_state(&self, sock_state: Option<Arc<Mutex<SockState>>>) {
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
match &mut *internal {
Some(internal) => {
internal.sock_state = sock_state;
@ -226,19 +194,6 @@ impl<'a> super::SocketState for &'a TcpStream {
}
}
impl Drop for TcpStream {
fn drop(&mut self) {
let internal = self.internal.read().unwrap();
if let Some(internal) = internal.as_ref() {
if let Some(sock_state) = internal.sock_state.as_ref() {
internal
.selector
.mark_delete_socket(&mut sock_state.lock().unwrap());
}
}
}
}
impl Read for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
wouldblock!(self, read, buf)
@ -290,7 +245,7 @@ impl<'a> Write for &'a TcpStream {
impl event::Source for TcpStream {
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
{
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
if internal.is_none() {
*internal = Some(InternalState::new(
poll::selector(registry).clone_inner(),
@ -303,7 +258,7 @@ impl event::Source for TcpStream {
match result {
Ok(_) => {}
Err(_) => {
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
*internal = None;
}
}
@ -319,7 +274,7 @@ impl event::Source for TcpStream {
let result = poll::selector(registry).reregister(self, token, interests);
match result {
Ok(_) => {
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
internal.as_mut().unwrap().token = token;
internal.as_mut().unwrap().interests = interests;
}
@ -332,7 +287,7 @@ impl event::Source for TcpStream {
let result = poll::selector(registry).deregister(self);
match result {
Ok(_) => {
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
*internal = None;
}
Err(_) => {}
@ -350,7 +305,7 @@ impl fmt::Debug for TcpStream {
impl FromRawSocket for TcpStream {
unsafe fn from_raw_socket(rawsocket: RawSocket) -> TcpStream {
TcpStream {
internal: Arc::new(RwLock::new(None)),
internal: Arc::new(Mutex::new(None)),
inner: net::TcpStream::from_raw_socket(rawsocket),
}
}
@ -371,7 +326,7 @@ impl AsRawSocket for TcpStream {
impl TcpListener {
pub fn bind(addr: SocketAddr) -> io::Result<TcpListener> {
init();
new_socket(addr).and_then(|socket| {
new_socket(addr, SOCK_STREAM).and_then(|socket| {
let (raw_addr, raw_addr_length) = socket_addr(&addr);
syscall!(
bind(socket, raw_addr, raw_addr_length,),
@ -386,7 +341,7 @@ impl TcpListener {
err
})
.map(|_| TcpListener {
internal: Arc::new(RwLock::new(None)),
internal: Arc::new(Mutex::new(None)),
inner: unsafe { net::TcpListener::from_raw_socket(socket as StdSocket) },
})
})
@ -398,7 +353,7 @@ impl TcpListener {
pub fn try_clone(&self) -> io::Result<TcpListener> {
self.inner.try_clone().map(|s| TcpListener {
internal: Arc::new(RwLock::new(None)),
internal: Arc::new(Mutex::new(None)),
inner: s,
})
}
@ -407,7 +362,7 @@ impl TcpListener {
wouldblock!(self, accept).map(|(inner, addr)| {
(
TcpStream {
internal: Arc::new(RwLock::new(None)),
internal: Arc::new(Mutex::new(None)),
inner,
},
addr,
@ -428,22 +383,9 @@ impl TcpListener {
}
}
impl Drop for TcpListener {
fn drop(&mut self) {
let internal = self.internal.read().unwrap();
if let Some(internal) = internal.as_ref() {
if let Some(sock_state) = internal.sock_state.as_ref() {
internal
.selector
.mark_delete_socket(&mut sock_state.lock().unwrap());
}
}
}
}
impl super::SocketState for TcpListener {
fn get_sock_state(&self) -> Option<Arc<Mutex<SockState>>> {
let internal = self.internal.read().unwrap();
let internal = self.internal.lock().unwrap();
match &*internal {
Some(internal) => match &internal.sock_state {
Some(arc) => Some(arc.clone()),
@ -453,7 +395,7 @@ impl super::SocketState for TcpListener {
}
}
fn set_sock_state(&self, sock_state: Option<Arc<Mutex<SockState>>>) {
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
match &mut *internal {
Some(internal) => {
internal.sock_state = sock_state;
@ -466,7 +408,7 @@ impl super::SocketState for TcpListener {
impl event::Source for TcpListener {
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
{
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
if internal.is_none() {
*internal = Some(InternalState::new(
poll::selector(registry).clone_inner(),
@ -479,7 +421,7 @@ impl event::Source for TcpListener {
match result {
Ok(_) => {}
Err(_) => {
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
*internal = None;
}
}
@ -495,7 +437,7 @@ impl event::Source for TcpListener {
let result = poll::selector(registry).reregister(self, token, interests);
match result {
Ok(_) => {
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
internal.as_mut().unwrap().token = token;
internal.as_mut().unwrap().interests = interests;
}
@ -508,7 +450,7 @@ impl event::Source for TcpListener {
let result = poll::selector(registry).deregister(self);
match result {
Ok(_) => {
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
*internal = None;
}
Err(_) => {}
@ -526,7 +468,7 @@ impl fmt::Debug for TcpListener {
impl FromRawSocket for TcpListener {
unsafe fn from_raw_socket(rawsocket: RawSocket) -> TcpListener {
TcpListener {
internal: Arc::new(RwLock::new(None)),
internal: Arc::new(Mutex::new(None)),
inner: net::TcpListener::from_raw_socket(rawsocket),
}
}
@ -543,33 +485,3 @@ impl AsRawSocket for TcpListener {
self.inner.as_raw_socket()
}
}
/// Create a new non-blocking socket.
fn new_socket(addr: SocketAddr) -> io::Result<SOCKET> {
let domain = match addr {
SocketAddr::V4(..) => PF_INET,
SocketAddr::V6(..) => PF_INET6,
};
syscall!(
socket(domain, SOCK_STREAM, 0),
PartialEq::eq,
INVALID_SOCKET
)
.and_then(|socket| {
syscall!(ioctlsocket(socket, FIONBIO, &mut 1), PartialEq::ne, 0).map(|_| socket as SOCKET)
})
}
fn socket_addr(addr: &SocketAddr) -> (*const SOCKADDR, c_int) {
match addr {
SocketAddr::V4(ref addr) => (
addr as *const _ as *const SOCKADDR,
size_of_val(addr) as c_int,
),
SocketAddr::V6(ref addr) => (
addr as *const _ as *const SOCKADDR,
size_of_val(addr) as c_int,
),
}
}

View File

@ -1,34 +1,19 @@
use super::selector::SockState;
use super::InternalState;
use super::{new_socket, socket_addr};
use crate::poll;
use crate::sys::windows::init;
use crate::{event, Interests, Registry, Token};
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
use std::sync::{Arc, Mutex, RwLock};
use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64.
use std::sync::{Arc, Mutex};
use std::{fmt, io};
use crate::sys::windows::init;
use crate::sys::windows::selector::{SelectorInner, SockState};
struct InternalState {
selector: Arc<SelectorInner>,
token: Token,
interests: Interests,
sock_state: Option<Arc<Mutex<SockState>>>,
}
impl InternalState {
fn new(selector: Arc<SelectorInner>, token: Token, interests: Interests) -> InternalState {
InternalState {
selector,
token,
interests,
sock_state: None,
}
}
}
use winapi::um::winsock2::{bind, closesocket, SOCKET_ERROR, SOCK_DGRAM};
pub struct UdpSocket {
internal: Arc<RwLock<Option<InternalState>>>,
internal: Arc<Mutex<Option<InternalState>>>,
io: net::UdpSocket,
}
@ -37,12 +22,16 @@ macro_rules! wouldblock {
let result = $self.io.$method($($args),*);
if let Err(ref e) = result {
if e.kind() == io::ErrorKind::WouldBlock {
let internal = $self.internal.read().unwrap();
if let Some(internal) = &*internal {
internal.selector.reregister(
let internal = $self.internal.lock().unwrap();
if internal.is_some() {
let selector = internal.as_ref().unwrap().selector.clone();
let token = internal.as_ref().unwrap().token;
let interests = internal.as_ref().unwrap().interests;
drop(internal);
selector.reregister(
$self,
internal.token,
internal.interests,
token,
interests,
)?;
}
}
@ -54,10 +43,22 @@ macro_rules! wouldblock {
impl UdpSocket {
pub fn bind(addr: SocketAddr) -> io::Result<UdpSocket> {
init();
net::UdpSocket::bind(addr).and_then(|io| {
io.set_nonblocking(true).map(|()| UdpSocket {
internal: Arc::new(RwLock::new(None)),
io,
new_socket(addr, SOCK_DGRAM).and_then(|socket| {
let (raw_addr, raw_addr_length) = socket_addr(&addr);
syscall!(
bind(socket, raw_addr, raw_addr_length,),
PartialEq::eq,
SOCKET_ERROR
)
.map_err(|err| {
// Close the socket if we hit an error, ignoring the error
// from closing since we can't pass back two errors.
let _ = unsafe { closesocket(socket) };
err
})
.map(|_| UdpSocket {
internal: Arc::new(Mutex::new(None)),
io: unsafe { net::UdpSocket::from_raw_socket(socket as StdSocket) },
})
})
}
@ -68,21 +69,21 @@ impl UdpSocket {
pub fn try_clone(&self) -> io::Result<UdpSocket> {
self.io.try_clone().map(|io| UdpSocket {
internal: Arc::new(RwLock::new(None)),
internal: Arc::new(Mutex::new(None)),
io,
})
}
pub fn send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
self.io.send_to(buf, target)
wouldblock!(self, send_to, buf, target)
}
pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.io.recv_from(buf)
wouldblock!(self, recv_from, buf)
}
pub fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.io.peek_from(buf)
wouldblock!(self, peek_from, buf)
}
pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
@ -98,7 +99,7 @@ impl UdpSocket {
}
pub fn connect(&self, addr: SocketAddr) -> io::Result<()> {
wouldblock!(self, connect, addr)
self.io.connect(addr)
}
pub fn broadcast(&self) -> io::Result<bool> {
@ -164,7 +165,7 @@ impl UdpSocket {
impl super::SocketState for UdpSocket {
fn get_sock_state(&self) -> Option<Arc<Mutex<SockState>>> {
let internal = self.internal.read().unwrap();
let internal = self.internal.lock().unwrap();
match &*internal {
Some(internal) => match &internal.sock_state {
Some(arc) => Some(arc.clone()),
@ -174,7 +175,7 @@ impl super::SocketState for UdpSocket {
}
}
fn set_sock_state(&self, sock_state: Option<Arc<Mutex<SockState>>>) {
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
match &mut *internal {
Some(internal) => {
internal.sock_state = sock_state;
@ -187,7 +188,7 @@ impl super::SocketState for UdpSocket {
impl event::Source for UdpSocket {
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
{
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
if internal.is_none() {
*internal = Some(InternalState::new(
poll::selector(registry).clone_inner(),
@ -200,7 +201,7 @@ impl event::Source for UdpSocket {
match result {
Ok(_) => {}
Err(_) => {
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
*internal = None;
}
}
@ -216,7 +217,7 @@ impl event::Source for UdpSocket {
let result = poll::selector(registry).reregister(self, token, interests);
match result {
Ok(_) => {
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
internal.as_mut().unwrap().token = token;
internal.as_mut().unwrap().interests = interests;
}
@ -229,7 +230,7 @@ impl event::Source for UdpSocket {
let result = poll::selector(registry).deregister(self);
match result {
Ok(_) => {
let mut internal = self.internal.write().unwrap();
let mut internal = self.internal.lock().unwrap();
*internal = None;
}
Err(_) => {}
@ -238,19 +239,6 @@ impl event::Source for UdpSocket {
}
}
impl Drop for UdpSocket {
fn drop(&mut self) {
let internal = self.internal.read().unwrap();
if let Some(internal) = internal.as_ref() {
if let Some(sock_state) = internal.sock_state.as_ref() {
internal
.selector
.mark_delete_socket(&mut sock_state.lock().unwrap());
}
}
}
}
impl fmt::Debug for UdpSocket {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.io, f)
@ -260,7 +248,7 @@ impl fmt::Debug for UdpSocket {
impl FromRawSocket for UdpSocket {
unsafe fn from_raw_socket(rawsocket: RawSocket) -> UdpSocket {
UdpSocket {
internal: Arc::new(RwLock::new(None)),
internal: Arc::new(Mutex::new(None)),
io: net::UdpSocket::from_raw_socket(rawsocket),
}
}

View File

@ -1,7 +1,7 @@
use std::io;
use crate::{poll, sys, Registry, Token};
use std::io;
/// Waker allows cross-thread waking of [`Poll`].
///
/// When created it will cause events with [`readable`] readiness and the

View File

@ -1,7 +1,8 @@
use mio::net::{TcpListener, TcpStream};
use mio::*;
use std::net;
use std::sync::{Arc, Barrier};
use std::thread;
use std::thread::{self, sleep};
use std::time::Duration;
mod util;
@ -42,6 +43,41 @@ fn add_then_drop() {
.unwrap();
}
#[test]
fn zero_duration_polls_events() {
init();
let mut poll = Poll::new().unwrap();
let mut events = Events::with_capacity(16);
let listener = net::TcpListener::bind(any_local_address()).unwrap();
let addr = listener.local_addr().unwrap();
let streams: Vec<TcpStream> = (0..3)
.map(|n| {
let stream = TcpStream::connect(addr).unwrap();
poll.registry()
.register(&stream, Token(n), Interests::WRITABLE)
.unwrap();
stream
})
.collect();
// Ensure the TcpStreams have some time to connection and for the events to
// show up.
sleep(Duration::from_millis(10));
// Even when passing a zero duration timeout we still want do the system
// call.
poll.poll(&mut events, Some(Duration::from_nanos(0)))
.unwrap();
assert!(!events.is_empty());
// Both need to live until here.
drop(streams);
drop(listener);
}
#[test]
fn test_poll_closes_fd() {
init();
@ -114,7 +150,6 @@ fn test_drop_cancels_interest_and_shuts_down() {
}
#[test]
#[cfg_attr(windows, ignore = "can't concurrently poll and register on Windows")]
fn test_registry_behind_arc() {
// `Registry` should work behind an `Arc`, being `Sync` and `Send`.
init();
@ -146,7 +181,7 @@ fn test_registry_behind_arc() {
barrier3.wait();
});
poll.poll(&mut events, Some(Duration::from_millis(100)))
poll.poll(&mut events, Some(Duration::from_millis(1000)))
.unwrap();
assert!(events.iter().count() >= 1);

13
tests/size.rs Normal file
View File

@ -0,0 +1,13 @@
#[test]
#[cfg(unix)]
#[cfg(not(debug_assertions))]
fn assert_size() {
use mio::net::*;
use std::mem::size_of;
// Without debug assertions enabled `TcpListener`, `TcpStream` and `UdpSocket` should have the
// same size as the system specific socket, i.e. just a file descriptor on Unix platforms.
assert_eq!(size_of::<TcpListener>(), size_of::<std::net::TcpListener>());
assert_eq!(size_of::<TcpStream>(), size_of::<std::net::TcpStream>());
assert_eq!(size_of::<UdpSocket>(), size_of::<std::net::UdpSocket>());
}