Cleanup of sys::unix (#1005)

* Remove pipe and set_nonblock unix export (not used).

* Move sys::unix::pipe to sys::Waker

Only place where it is used. And since the Waker based is only used on certain platforms directly use pipe2. All the platforms that use pipe based Waker have the pipe2 system call.

* Always use epoll_create1 on epoll platforms

epoll_create1 was introduces in Linux kernel version 2.6.27.

* Remove dlsym module (not used).

* Silence unused warning for set_cloexec (not used).

* Remove sys::unix::io::Io (not used).
This commit is contained in:
Thomas de Zeeuw 2019-06-26 13:52:30 +00:00 committed by Carl Lerche
parent 5ffb91a094
commit 479ae7c5e2
7 changed files with 55 additions and 219 deletions

View File

@ -133,7 +133,7 @@ pub use waker::Waker;
#[cfg(unix)]
pub mod unix {
//! Unix only extensions
//! Unix only extensions.
pub use crate::sys::EventedFd;
}

View File

@ -1,7 +1,7 @@
//! Module with system specific types.
//!
//! `SysEvent`: must be a type alias for the system specific event, e.g.
//! `kevent` or `epol_event`.
//! `kevent` or `epoll_event`.
//! `Event`: **must be** a `transparent` wrapper around `SysEvent`, i.e. the
//! type must have `#[repr(transparent)]` with only `SysEvent` as
//! field. This is safety requirement, see `Event::from_sys_event_ref`.
@ -10,8 +10,7 @@
#[cfg(unix)]
pub use self::unix::{
pipe, set_nonblock, Event, EventedFd, Events, Io, Selector, SysEvent, TcpListener, TcpStream,
UdpSocket, Waker,
Event, EventedFd, Events, Selector, SysEvent, TcpListener, TcpStream, UdpSocket, Waker,
};
#[cfg(unix)]

View File

@ -1,50 +0,0 @@
use std::marker;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use libc;
macro_rules! dlsym {
(fn $name:ident($($t:ty),*) -> $ret:ty) => (
#[allow(bad_style)]
static $name: crate::sys::unix::dlsym::DlSym<unsafe extern fn($($t),*) -> $ret> =
crate::sys::unix::dlsym::DlSym {
name: concat!(stringify!($name), "\0"),
addr: ::std::sync::atomic::AtomicUsize::new(0),
_marker: ::std::marker::PhantomData,
};
)
}
#[allow(dead_code)]
pub struct DlSym<F> {
pub name: &'static str,
pub addr: AtomicUsize,
pub _marker: marker::PhantomData<F>,
}
impl<F> DlSym<F> {
#[allow(dead_code)]
pub fn get(&self) -> Option<&F> {
assert_eq!(mem::size_of::<F>(), mem::size_of::<usize>());
unsafe {
if self.addr.load(Ordering::SeqCst) == 0 {
self.addr.store(fetch(self.name), Ordering::SeqCst);
}
if self.addr.load(Ordering::SeqCst) == 1 {
None
} else {
mem::transmute::<&AtomicUsize, Option<&F>>(&self.addr)
}
}
}
}
#[allow(dead_code)]
unsafe fn fetch(name: &str) -> usize {
assert_eq!(name.as_bytes()[name.len() - 1], 0);
match libc::dlsym(libc::RTLD_DEFAULT, name.as_ptr() as *const _) as usize {
0 => 1,
n => n,
}
}

View File

@ -1,8 +1,6 @@
use crate::sys::unix::cvt;
use crate::sys::unix::io::set_cloexec;
use crate::{Interests, Token};
use libc::{self, c_int};
use libc::{EPOLLET, EPOLLIN, EPOLLOUT};
use std::os::unix::io::AsRawFd;
use std::os::unix::io::RawFd;
@ -28,31 +26,23 @@ pub struct Selector {
impl Selector {
pub fn new() -> io::Result<Selector> {
let epfd = unsafe {
// Emulate `epoll_create` by using `epoll_create1` if it's available
// and otherwise falling back to `epoll_create` followed by a call to
// set the CLOEXEC flag.
dlsym!(fn epoll_create1(c_int) -> c_int);
match epoll_create1.get() {
Some(epoll_create1_fn) => cvt(epoll_create1_fn(libc::EPOLL_CLOEXEC))?,
None => {
let fd = cvt(libc::epoll_create(1024))?;
drop(set_cloexec(fd));
fd
}
}
};
// offset by 1 to avoid choosing 0 as the id of a selector
#[cfg(debug_assertions)]
let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
Ok(Selector {
// According to libuv `EPOLL_CLOEXEC` is not defined on Android API <
// 21. But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on all platforms,
// so we use that instead.
let epfd = unsafe { libc::epoll_create1(libc::O_CLOEXEC) };
if epfd == -1 {
Err(io::Error::last_os_error())
} else {
// offset by 1 to avoid choosing 0 as the id of a selector
#[cfg(debug_assertions)]
id,
epfd,
})
let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
Ok(Selector {
#[cfg(debug_assertions)]
id,
epfd,
})
}
}
#[cfg(debug_assertions)]
@ -264,3 +254,9 @@ pub fn millis(duration: Duration) -> u64 {
.saturating_mul(MILLIS_PER_SEC)
.saturating_add(millis as u64)
}
#[test]
fn assert_close_on_exec_flag() {
// This assertion need to be true for Selector::new.
assert_eq!(libc::O_CLOEXEC, libc::EPOLL_CLOEXEC);
}

View File

@ -1,12 +1,6 @@
use crate::event::Evented;
use crate::sys::unix::cvt;
use crate::unix::EventedFd;
use crate::{Interests, Registry, Token};
use std::io;
use libc;
use std::fs::File;
use std::io::{self, Read, Write};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::sys::unix::cvt;
pub fn set_nonblock(fd: libc::c_int) -> io::Result<()> {
unsafe {
@ -15,101 +9,18 @@ pub fn set_nonblock(fd: libc::c_int) -> io::Result<()> {
}
}
#[cfg(any(
target_os = "bitrig",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
))]
pub fn set_cloexec(fd: libc::c_int) -> io::Result<()> {
unsafe {
let flags = libc::fcntl(fd, libc::F_GETFD);
cvt(libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC)).map(|_| ())
}
}
/*
*
* ===== Basic IO type =====
*
*/
/// Manages a FD
#[derive(Debug)]
pub struct Io {
fd: File,
}
impl Io {
/// Try to clone the FD
pub fn try_clone(&self) -> io::Result<Io> {
Ok(Io {
fd: self.fd.try_clone()?,
})
}
}
impl FromRawFd for Io {
unsafe fn from_raw_fd(fd: RawFd) -> Io {
Io {
fd: File::from_raw_fd(fd),
}
}
}
impl IntoRawFd for Io {
fn into_raw_fd(self) -> RawFd {
self.fd.into_raw_fd()
}
}
impl AsRawFd for Io {
fn as_raw_fd(&self) -> RawFd {
self.fd.as_raw_fd()
}
}
impl Evented for Io {
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
EventedFd(&self.as_raw_fd()).register(registry, token, interests)
}
fn reregister(
&self,
registry: &Registry,
token: Token,
interests: Interests,
) -> io::Result<()> {
EventedFd(&self.as_raw_fd()).reregister(registry, token, interests)
}
fn deregister(&self, registry: &Registry) -> io::Result<()> {
EventedFd(&self.as_raw_fd()).deregister(registry)
}
}
impl Read for Io {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
(&self.fd).read(dst)
}
}
impl<'a> Read for &'a Io {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
(&self.fd).read(dst)
}
}
impl Write for Io {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
(&self.fd).write(src)
}
fn flush(&mut self) -> io::Result<()> {
(&self.fd).flush()
}
}
impl<'a> Write for &'a Io {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
(&self.fd).write(src)
}
fn flush(&mut self) -> io::Result<()> {
(&self.fd).flush()
}
}

View File

@ -1,8 +1,3 @@
use libc::{self, c_int};
#[macro_use]
pub mod dlsym;
#[cfg(any(target_os = "linux", target_os = "android", target_os = "solaris"))]
mod epoll;
@ -39,39 +34,12 @@ mod uio;
mod waker;
pub use self::eventedfd::EventedFd;
pub use self::io::{set_nonblock, Io};
pub use self::tcp::{TcpListener, TcpStream};
pub use self::udp::UdpSocket;
pub use self::waker::Waker;
pub use iovec::IoVec;
use std::os::unix::io::FromRawFd;
#[allow(dead_code)]
pub fn pipe() -> std::io::Result<(Io, Io)> {
// Use pipe2 for atomically setting O_CLOEXEC if we can, but otherwise
// just fall back to using `pipe`.
dlsym!(fn pipe2(*mut c_int, c_int) -> c_int);
let mut pipes = [0; 2];
let flags = libc::O_NONBLOCK | libc::O_CLOEXEC;
unsafe {
match pipe2.get() {
Some(pipe2_fn) => {
cvt(pipe2_fn(pipes.as_mut_ptr(), flags))?;
}
None => {
cvt(libc::pipe(pipes.as_mut_ptr()))?;
libc::fcntl(pipes[0], libc::F_SETFL, flags);
libc::fcntl(pipes[1], libc::F_SETFL, flags);
}
}
}
unsafe { Ok((Io::from_raw_fd(pipes[0]), Io::from_raw_fd(pipes[1]))) }
}
trait IsMinusOne {
fn is_minus_one(&self) -> bool;
}

View File

@ -108,10 +108,11 @@ pub use self::kqueue::Waker;
target_os = "solaris"
))]
mod pipe {
use std::fs::File;
use std::io::{self, Read, Write};
use std::os::unix::io::AsRawFd;
use std::os::unix::io::FromRawFd;
use crate::sys::{pipe, Io, Selector};
use crate::sys::unix::Selector;
use crate::{Interests, Token};
/// Waker backed by a unix pipe.
@ -120,15 +121,26 @@ mod pipe {
/// if writing to it (waking) fails.
#[derive(Debug)]
pub struct Waker {
sender: Io,
receiver: Io,
sender: File,
receiver: File,
}
impl Waker {
pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
let (sender, receiver) = pipe()?;
selector.register(receiver.as_raw_fd(), token, Interests::READABLE)?;
Ok(Waker { sender, receiver })
let mut fds = [-1; 2];
let flags = libc::O_NONBLOCK | libc::O_CLOEXEC;
if unsafe { libc::pipe2(fds.as_mut_ptr(), flags) } == -1 {
Err(io::Error::last_os_error())
} else {
selector.register(fds[0], token, Interests::READABLE)?;
unsafe {
Ok(Waker {
sender: File::from_raw_fd(fds[1]),
receiver: File::from_raw_fd(fds[0]),
})
}
}
}
pub fn wake(&self) -> io::Result<()> {