停止基于服务的线程
应用程序通常会创建拥有服务的线程, 比如线程池. 这些服务的存在时间通常要比创建他们的方法存在的时间更长, 如果应用程序优雅的退出了,这些服务的线程也需要结束.因为没有退出线程惯用的优先方法, 他们需要自行结束.
明智的封装实践指出,你不应该操控某个线程一一中断它,改变他的优先级,等等... 除非是这个.线程的拥有者, 线程API没有关于线程所属权正规的概念. 线程通过一个Thread对象表示,和其他对象一样可以被自由的共享. 但是, 认为线程有一个拥有者是有道理的. 这个拥有者就是创建这个线程的类. 所有线程池拥有它的工作线程, 如果需要中断这些线程. 那么应该由线程池来负责
例如: ExecutorService 提供的 shutdown 和 shutdownNow 方法.
对于线程持有的服务, 只要服务的存在时间大于创建线程的方法存在时间,那么就应该提供生命周期方法(lifecycle method)
实例: 日志服务
public class LogService { private final BlockingQueuequeue; private final LoggerThread loggerThread; private final PrintWriter writer; private boolean isShutdown; private int reservations; public LogService(Writer writer) { this.queue = new LinkedBlockingQueue (); this.loggerThread = new LoggerThread(); this.writer = new PrintWriter(writer); } public void start() { loggerThread.start(); } public void stop() { synchronized (this) { isShutdown = true; } loggerThread.interrupt(); } public void log(String msg) throws InterruptedException { synchronized (this) { if (isShutdown) throw new IllegalStateException(/* ... */); ++reservations; } queue.put(msg); } private class LoggerThread extends Thread { public void run() { try { while (true) { try { synchronized (LogService.this) { if (isShutdown && reservations == 0) break; } String msg = queue.take(); synchronized (LogService.this) { --reservations; } writer.println(msg); } catch (InterruptedException e) { /* retry */ } } } finally { writer.close(); } } }}
或者使用 ExecutorService
public class LogService2 { private final ExecutorService executorService = Executors .newSingleThreadExecutor(); private final PrintWriter writer; public LogService2(PrintWriter writer) { super(); this.writer = writer; } public void start() { } public void close() { try { this.executorService.shutdown(); this.executorService.awaitTermination(TIME_OUT, UNIT); } finally { if (writer != null) writer.close(); } } public void log(String str) { this.executorService.execute(new Writer(str)); }}
实例: 致命药丸
public class IndexingService { private static final int CAPACITY = 1000; private static final File POISON = new File(""); private final IndexerThread consumer = new IndexerThread(); private final CrawlerThread producer = new CrawlerThread(); private final BlockingQueuequeue; private final FileFilter fileFilter; private final File root; public IndexingService(File root, final FileFilter fileFilter) { this.root = root; this.queue = new LinkedBlockingQueue (CAPACITY); this.fileFilter = new FileFilter() { public boolean accept(File f) { return f.isDirectory() || fileFilter.accept(f); } }; } private boolean alreadyIndexed(File f) { return false; } class CrawlerThread extends Thread { public void run() { try { crawl(root); } catch (InterruptedException e) { /* fall through */ } finally { while (true) { try { queue.put(POISON); break; } catch (InterruptedException e1) { /* retry */ } } } } private void crawl(File root) throws InterruptedException { File[] entries = root.listFiles(fileFilter); if (entries != null) { for (File entry : entries) { if (entry.isDirectory()) crawl(entry); else if (!alreadyIndexed(entry)) queue.put(entry); } } } } class IndexerThread extends Thread { public void run() { try { while (true) { File file = queue.take(); if (file == POISON) break; else indexFile(file); } } catch (InterruptedException consumed) { } } public void indexFile(File file) { /* ... */ }; } public void start() { producer.start(); consumer.start(); } public void stop() { producer.interrupt(); } public void awaitTermination() throws InterruptedException { consumer.join(); } public static void main(String[] args) { IndexingService is =new IndexingService(new File("e://"), new FileFilter() { public boolean accept(File pathname) { System.out.println(pathname); return true; } }); is.start(); is.stop(); try { is.awaitTermination(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("the end "); }}
致命药丸只有在生产线程与消费线程已知的情况下才能使用. IndexingService中的解决方案也可以被扩展到多生产者, 只要让每一个生产线程都往队列里放入一个药丸, 并且消费线程收到第 N(生产者的数量)个药丸时停止.
实例: 只执行一次的服务
public class CheckForMail { public boolean checkMail(Sethosts, long timeout, TimeUnit unit) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final AtomicBoolean hasNewMail = new AtomicBoolean(false); try { for (final String host : hosts) exec.execute(new Runnable() { public void run() { if (checkMail(host)) hasNewMail.set(true); } }); } finally { exec.shutdown(); exec.awaitTermination(timeout, unit); } return hasNewMail.get(); } private boolean checkMail(String host) { // Check for mail return false; }}