diff --git a/src/bin/main.rs b/src/bin/main.rs index cca9b38..4df7945 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -10,24 +10,31 @@ use std::thread; use std::time::Duration; fn main() { - let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); - let counter = Arc::new(Mutex::new(0usize)); + let addr = "127.0.0.1:7878"; + let listener = TcpListener::bind(&addr).unwrap(); + println!("Listening on http://{}", &addr); + let counter = Arc::new(Mutex::new(0usize)); let tasks = ThreadPool::new(100); - for stream in listener.incoming() { - let stream = stream.unwrap(); - let counter = Arc::clone(&counter); + for stream in listener.incoming()/*.take(2)*/ { + println!("accepted new connection"); - tasks.run(move || { - { - // in a subscope to release borrow automatically - let mut count = counter.lock().unwrap(); - *count += 1; - } - handle_connection(counter, stream); - }); + if let Ok(stream) = stream { + let counter = Arc::clone(&counter); + + tasks.run(move || { + { + // in a subscope to release borrow automatically + let mut count = counter.lock().unwrap(); + *count += 1; + } + handle_connection(counter, stream); + }); + } } + + println!("Shutting down..."); } fn handle_connection(counter: Arc>, mut stream: TcpStream) { diff --git a/src/lib.rs b/src/lib.rs index afa3087..7c973c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,29 +16,65 @@ type Job = Box; struct Worker { id: usize, - thread: thread::JoinHandle<()>, + thread: Option>, +} + +enum Message { + NewJob(Job), + Terminate, } impl Worker { - pub fn new(id: usize, receiver: Arc>>) -> Worker { + pub fn new(id: usize, receiver: Arc>>) -> Worker { Worker { id, - thread: thread::spawn(move || { + thread: Some(thread::spawn(move || { loop { - let job = receiver.lock().expect("unrecoverable poisened panicked thread state").recv().unwrap(); - - println!("such busy, so worker #{}!", id); - - job.call_box(); + if let Ok(job) = receiver.lock().expect("unrecoverable poisened panicked thread state").recv() { + match job { + Message::NewJob(job) => { + println!("such busy, so worker #{}!", id); + job.call_box(); + } + Message::Terminate => { + println!("such sad, so dead #{}!", id); + break; + } + } + } else { + // TODO respawn? (unless in terminate mode) + // or why would the recv fail? already terminated? + // thread panic not handled? + break; + } } - }), + })), } } } pub struct ThreadPool { workers: Vec, - sender: mpsc::Sender, + sender: mpsc::Sender, +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + // Each worker will only receive Terminate once + for _ in &mut self.workers { + self.sender.send(Message::Terminate).unwrap(); + } + + // Each worker will join once complete + for worker in &mut self.workers { + println!("Shutting down worker {} ...", worker.id); + + if let Some(thread) = worker.thread.take() { + thread.join().unwrap(); + println!("Killed {}!", worker.id); + } + } + } } impl ThreadPool { @@ -64,6 +100,6 @@ impl ThreadPool { pub fn run(&self, task: F) { let task = Box::new(task); - self.sender.send(task).unwrap(); + self.sender.send(Message::NewJob(task)).unwrap(); } }