Java 多线程设计模式 -- 流水线模式(Pipeline)

2019-04-14 20:06发布

十一、流水线模式(Pipeline)
1、核心思想
将一个任务处理分解为若干个处理阶段,其中每个处理阶段的输出作为下一个处理阶段的输入,并且各个处理阶段都有相应的工作者线程去执行相应的计算。
2、评价:
充分利用CPU,提高其计算效率。
允许子任务间存在依赖关系的条件下实现并行计算。
非常便于采用单线程模型实现对子任务的处理。
具备统一的错误处理机制:各处理阶段抛出的 PipeException 统一交由 PipeContext 的 handleError 方法处理。
3、适用场景
a、适合于处理规模较大的任务,否则可能得不偿失。各个处理阶段所使用的工作者线程或者线程池、输入输出对象的创建和转移都有自身的时间和空间消耗。

/**
 * Abstraction of a single processing stage.
 * A stage processes its input and hands the result to the next stage as that stage's input.
 *
 * @author huzhiqiang
 *
 * @param <IN>  type of the elements this stage consumes
 * @param <OUT> type of the elements this stage produces
 */
public interface Pipe<IN, OUT> {
    /**
     * Sets the stage that follows this one.
     *
     * @param nextPipe the stage that receives this stage's output
     */
    void setNextPipe(Pipe<?, ?> nextPipe);

    /**
     * Processes one input element and forwards the result to the next stage.
     *
     * @param input the element to process
     * @throws InterruptedException if the calling thread is interrupted while handing the element over
     */
    void process(IN input) throws InterruptedException;

    /**
     * Initializes the stage with the context used for error reporting.
     *
     * @param pipeCtx the context that receives {@link PipeException}s
     */
    void init(PipeContext pipeCtx);

    /**
     * Shuts the stage down.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     */
    void shutdown(long timeout, TimeUnit unit);
}

/**
 * Abstraction of a composite stage: one PipeLine may contain several {@link Pipe} instances.
 *
 * @author huzhiqiang
 *
 * @param <IN>  type of the elements fed into the pipeline
 * @param <OUT> type of the elements the pipeline produces
 */
public interface PipeLine<IN, OUT> extends Pipe<IN, OUT> {
    /**
     * Appends a stage to the pipeline.
     *
     * @param pipe the stage to append
     */
    void addPipe(Pipe<?, ?> pipe);
}

/**
 * Skeleton implementation of {@link Pipe}: runs {@link #doProcess(Object)} and forwards a
 * non-null result to the next stage; {@link PipeException}s go to the {@link PipeContext}.
 *
 * @param <IN>  type of the elements this stage consumes
 * @param <OUT> type of the elements this stage produces
 */
public abstract class AbsractPipe<IN, OUT> implements Pipe<IN, OUT> {
    protected volatile Pipe<?, ?> nextPipe = null;
    protected volatile PipeContext PipeCtx = null;

    @Override
    public void setNextPipe(Pipe<?, ?> nextPipe) {
        this.nextPipe = nextPipe;
    }

    /**
     * Processes {@code input} and passes a non-null result downstream.
     * An interruption raised while forwarding is not rethrown; the interrupt flag is restored instead.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void process(IN input) throws InterruptedException {
        try {
            OUT out = doProcess(input);
            // Read the volatile field once so the null check and the call see the same stage.
            Pipe<?, ?> next = nextPipe;
            if (null != next && null != out) {
                ((Pipe<OUT, ?>) next).process(out);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (PipeException e) {
            PipeContext ctx = PipeCtx;
            if (null != ctx) {
                ctx.handleError(e);
            } else {
                // init(...) has not set a context yet; report the failure instead of
                // masking it with a NullPointerException.
                e.printStackTrace();
            }
        }
    }

    @Override
    public void init(PipeContext pipeCtx) {
        this.PipeCtx = pipeCtx;
    }

    @Override
    public void shutdown(long timeout, TimeUnit unit) {
        // Nothing to do.
    }

    /**
     * Implemented by subclasses to carry out the actual processing of one element.
     *
     * @param input the element to process
     * @return the result to forward; {@code null} means nothing is forwarded
     * @throws PipeException if processing fails
     */
    public abstract OUT doProcess(IN input) throws PipeException;
}

/**
 * A stage that splits each input into sub-tasks, runs them on an executor and combines
 * their results into the stage output.
 *
 * @param <IN>  type of the elements this stage consumes
 * @param <OUT> type of the elements this stage produces
 * @param <V>   result type of a single sub-task
 */
public abstract class AbstractParallePipe<IN, OUT, V> extends AbsractPipe<IN, OUT> {
    private final ExecutorService executorService;

    /**
     * @param queue           not used by this class; kept so existing callers still compile
     * @param executorService executor that runs the sub-tasks
     */
    public AbstractParallePipe(BlockingQueue<IN> queue, ExecutorService executorService) {
        super();
        this.executorService = executorService;
    }

    /**
     * Implemented by subclasses to build the sub-tasks for the given input element.
     *
     * @param input the element to split
     * @return the sub-tasks to run
     * @throws Exception if the tasks cannot be built
     */
    protected abstract List<Callable<V>> buildTasks(IN input) throws Exception;

    /**
     * Implemented by subclasses to merge the sub-task results into the output for one input element.
     *
     * @param subTaskResults the futures returned for the sub-tasks
     * @return the combined output
     * @throws Exception if the results cannot be combined
     */
    protected abstract OUT combineResults(List<Future<V>> subTaskResults) throws Exception;

    /**
     * Runs a group of sub-tasks on the executor via {@code invokeAll}.
     *
     * @param tasks the sub-tasks to run
     * @return one future per sub-task
     * @throws Exception if the invocation fails or is interrupted
     */
    protected List<Future<V>> invokeParallel(List<Callable<V>> tasks) throws Exception {
        return executorService.invokeAll(tasks);
    }

    @Override
    public OUT doProcess(IN input) throws PipeException {
        try {
            return combineResults(invokeParallel(buildTasks(input)));
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller; wrapping alone would lose it.
            Thread.currentThread().interrupt();
            throw new PipeException(this, input, "Task failed", e);
        } catch (Exception e) {
            throw new PipeException(this, input, "Task failed", e);
        }
    }
}

/**
 * A {@link PipeLine} that chains its stages in the order they were added and feeds every
 * input to the first stage.
 *
 * @param <IN>  type of the elements fed into the pipeline
 * @param <OUT> type of the elements the pipeline produces
 */
public class SimplePipeline<IN, OUT> extends AbsractPipe<IN, OUT> implements PipeLine<IN, OUT> {
    private final Queue<Pipe<?, ?>> pipes = new LinkedList<Pipe<?, ?>>();
    private final ExecutorService helperService;

    /** Creates a pipeline whose helper executor is a single daemon thread. */
    public SimplePipeline() {
        // Single worker thread operating off an unbounded queue.
        this(Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "SimplePpeLine-Helper");
                t.setDaemon(true);
                return t;
            }
        }));
    }

    /**
     * @param helperService executor used to initialize the stages and to report errors
     */
    public SimplePipeline(final ExecutorService helperService) {
        super();
        this.helperService = helperService;
    }

    /** Shuts down every stage in order (removing it from the pipeline), then the helper executor. */
    @Override
    public void shutdown(long timeout, TimeUnit unit) {
        Pipe<?, ?> pipe;
        while (null != (pipe = pipes.poll())) {
            pipe.shutdown(timeout, unit);
        }
        helperService.shutdown();
    }

    @Override
    public void addPipe(Pipe<?, ?> pipe) {
        pipes.add(pipe);
    }

    /** Not used: {@link #process(Object)} delegates straight to the first stage. */
    @Override
    public OUT doProcess(IN input) throws PipeException {
        return null;
    }

    /**
     * Hands {@code input} to the first stage.
     *
     * @throws IllegalStateException if the pipeline has no stages (none added, or already shut down)
     */
    @Override
    public void process(IN input) throws InterruptedException {
        @SuppressWarnings("unchecked")
        Pipe<IN, ?> firstPipe = (Pipe<IN, ?>) pipes.peek();
        if (null == firstPipe) {
            throw new IllegalStateException("Pipeline has no pipes to process the input");
        }
        firstPipe.process(input);
    }

    /**
     * Links the stages in insertion order, then initializes each of them on the helper executor.
     * Initialization is asynchronous: this method may return before the stages are initialized.
     */
    @Override
    public void init(final PipeContext pipeCtx) {
        // Establish the processing order.
        Pipe<?, ?> prevPipe = this;
        for (Pipe<?, ?> pipe : pipes) {
            prevPipe.setNextPipe(pipe);
            prevPipe = pipe;
        }

        // Iterate a private copy on the helper thread: the live queue is a plain LinkedList
        // that shutdown(...) may drain concurrently.
        final List<Pipe<?, ?>> snapshot = new LinkedList<Pipe<?, ?>>(pipes);
        helperService.submit(new Runnable() {
            @Override
            public void run() {
                for (Pipe<?, ?> pipe : snapshot) {
                    pipe.init(pipeCtx);
                }
            }
        });
    }

    /**
     * Appends {@code delegate} wrapped in a {@link WorkThreadPipeDecorator}.
     *
     * @param delegate  the stage to wrap
     * @param workCount number of worker threads for the stage
     */
    public <INPUT, OUTPUT> void addAsWorkerThreadBasedPipe(Pipe<INPUT, OUTPUT> delegate, int workCount) {
        addPipe(new WorkThreadPipeDecorator<INPUT, OUTPUT>(delegate, workCount));
    }

    /**
     * Appends {@code delegate} wrapped in a {@link ThreadPoolPipeDecorator}.
     *
     * @param delegate        the stage to wrap
     * @param executorService executor that runs the stage's tasks
     */
    public <INPUT, OUTPUT> void addAsThreadBasedPipe(Pipe<INPUT, OUTPUT> delegate,
            ExecutorService executorService) {
        addPipe(new ThreadPoolPipeDecorator<INPUT, OUTPUT>(delegate, executorService));
    }

    /**
     * Creates a context that prints the stack trace of each reported exception on the helper executor.
     *
     * @return a new default context
     */
    public PipeContext newDefaultPipeContext() {
        return new PipeContext() {
            @Override
            public void handleError(final PipeException exp) {
                try {
                    helperService.submit(new Runnable() {
                        @Override
                        public void run() {
                            exp.printStackTrace();
                        }
                    });
                } catch (java.util.concurrent.RejectedExecutionException rejected) {
                    // The helper executor no longer accepts tasks; report on the calling
                    // thread rather than lose the error.
                    exp.printStackTrace();
                }
            }
        };
    }
}

/**
 * A {@link Pipe} decorator that runs each call of the delegate's {@code process} as a task
 * on an {@link ExecutorService}.
 *
 * @param <IN>  type of the elements this stage consumes
 * @param <OUT> type of the elements this stage produces
 */
public class ThreadPoolPipeDecorator<IN, OUT> implements Pipe<IN, OUT> {
    private final Pipe<IN, OUT> delegate;
    private final TerminationToken terminationToken;
    private final ExecutorService executorService;

    /**
     * @param delegate        the stage whose work is run on the executor
     * @param executorService executor that runs the tasks
     */
    public ThreadPoolPipeDecorator(Pipe<IN, OUT> delegate, ExecutorService executorService) {
        super();
        this.delegate = delegate;
        this.executorService = executorService;
        terminationToken = TerminationToken.newInstance(executorService);
    }

    @Override
    public void setNextPipe(Pipe<?, ?> nextPipe) {
        delegate.setNextPipe(nextPipe);
    }

    /** Submits a task that runs the delegate on {@code input}. */
    @Override
    public void process(final IN input) throws InterruptedException {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    delegate.process(input);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // In finally so the count is released even when the delegate throws.
                    releaseReservation();
                }
            }
        };

        // Reserve BEFORE submitting: the task may finish (or, with a caller-runs rejection
        // policy, run inline) before submit returns, and must never observe a count that
        // does not yet include itself.
        terminationToken.reservations.incrementAndGet();
        try {
            executorService.submit(task);
        } catch (RuntimeException e) {
            // The task was not accepted and will never run; give the reservation back.
            releaseReservation();
            throw e;
        }
    }

    /** Releases one reservation and signals completion if it was the last one during shutdown. */
    private void releaseReservation() {
        int remainingReservations = terminationToken.reservations.decrementAndGet();
        if (terminationToken.isToShutDown() && 0 == remainingReservations) {
            // The last pending task has finished.
            terminationToken.allTasksDoneLatch.countDown();
        }
    }

    @Override
    public void init(PipeContext pipeCtx) {
        delegate.init(pipeCtx);
    }

    /**
     * Waits (up to the timeout) until the tasks counted by the token have finished, then
     * shuts the delegate down.
     */
    @Override
    public void shutdown(long timeout, TimeUnit unit) {
        terminationToken.setIsToShutdown();
        if (terminationToken.reservations.get() > 0) {
            try {
                // Make sure the pending tasks are done before delegate.shutdown.
                terminationToken.allTasksDoneLatch.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        delegate.shutdown(timeout, unit);
    }

    /**
     * Token shared by all decorators that use the same executor.
     * The completion latch lives here, next to the shared counter: whichever stage runs the
     * last task releases every decorator waiting in {@code shutdown}.
     * NOTE(review): tokens are never removed from INSTANCE_MAP, so a token (and its shutdown
     * state) outlives the pipeline that used the executor — confirm this is acceptable.
     */
    private static class TerminationToken extends com.threadDesign.twoPhase.TerminationToken {
        private final static ConcurrentHashMap<ExecutorService, TerminationToken> INSTANCE_MAP =
                new ConcurrentHashMap<ExecutorService, TerminationToken>();

        final CountDownLatch allTasksDoneLatch = new CountDownLatch(1);

        private TerminationToken() {
        }

        void setIsToShutdown() {
            this.toShutDown = true;
        }

        /** Returns the token associated with {@code executorService}, creating it on first use. */
        static TerminationToken newInstance(ExecutorService executorService) {
            TerminationToken token = INSTANCE_MAP.get(executorService);
            if (null == token) {
                token = new TerminationToken();
                TerminationToken existingToken = INSTANCE_MAP.putIfAbsent(executorService, token);
                if (null != existingToken) {
                    token = existingToken;
                }
            }
            return token;
        }
    }
}

/**
 * A {@link Pipe} implementation based on worker threads.
 * Elements submitted to this stage are processed by a fixed number of worker threads.
 *
 * @author huzhiqiang
 *
 * @param <IN>  type of the elements this stage consumes
 * @param <OUT> type of the elements this stage produces
 */
public class WorkThreadPipeDecorator<IN, OUT> implements Pipe<IN, OUT> {
    protected final BlockingQueue<IN> workQueue;
    protected final Set<AbstractTerminatableThread> workerThreads = new HashSet<AbstractTerminatableThread>();
    protected final TerminationToken terminationToken = new TerminationToken();
    private final Pipe<IN, OUT> delegate;

    /**
     * Creates the stage with a {@link SynchronousQueue} as hand-off queue.
     *
     * @param delegate    the stage that does the processing
     * @param workerCount number of worker threads; must be positive
     */
    public WorkThreadPipeDecorator(Pipe<IN, OUT> delegate, int workerCount) {
        this(new SynchronousQueue<IN>(), delegate, workerCount);
    }

    /**
     * @param workQueue   queue through which elements reach the workers
     * @param delegate    the stage that does the processing
     * @param workerCount number of worker threads; must be positive
     * @throws IllegalArgumentException if {@code workerCount} is not positive
     */
    public WorkThreadPipeDecorator(BlockingQueue<IN> workQueue, Pipe<IN, OUT> delegate, int workerCount) {
        if (workerCount <= 0) {
            throw new IllegalArgumentException("workerCount should be positive!");
        }
        this.workQueue = workQueue;
        this.delegate = delegate;

        // NOTE(review): the loop header was lost in the published listing and is reconstructed
        // here. The threads are created with the no-arg constructor as shown; presumably they
        // should share this.terminationToken so terminate() sees pending work — confirm
        // against AbstractTerminatableThread.
        for (int i = 0; i < workerCount; i++) {
            workerThreads.add(new AbstractTerminatableThread() {
                @Override
                protected void doRun() throws Exception {
                    try {
                        dispatch();
                    } finally {
                        terminationToken.reservations.decrementAndGet();
                    }
                }
            });
        }
    }

    /** Takes one element from the queue and lets the delegate process it. */
    private void dispatch() throws InterruptedException {
        IN input = workQueue.take();
        delegate.process(input);
    }

    @Override
    public void setNextPipe(Pipe<?, ?> nextPipe) {
        delegate.setNextPipe(nextPipe);
    }

    /** Queues {@code input} for the workers, blocking until the queue accepts it. */
    @Override
    public void process(IN input) throws InterruptedException {
        // Reserve BEFORE the hand-off: a worker may take the element and release its
        // reservation before put(...) returns.
        terminationToken.reservations.incrementAndGet();
        try {
            workQueue.put(input);
        } catch (InterruptedException e) {
            // The element never reached a worker; give the reservation back.
            terminationToken.reservations.decrementAndGet();
            throw e;
        }
    }

    /** Initializes the delegate and starts the worker threads. */
    @Override
    public void init(PipeContext pipeCtx) {
        delegate.init(pipeCtx);
        for (AbstractTerminatableThread thread : workerThreads) {
            thread.start();
        }
    }

    /** Terminates every worker, waiting up to the timeout for each, then shuts the delegate down. */
    @Override
    public void shutdown(long timeout, TimeUnit unit) {
        boolean interrupted = false;
        for (AbstractTerminatableThread thread : workerThreads) {
            thread.terminate();
            try {
                thread.join(TimeUnit.MILLISECONDS.convert(timeout, unit));
            } catch (InterruptedException e) {
                // Keep terminating the remaining workers; restore the flag afterwards.
                interrupted = true;
            }
        }
        delegate.shutdown(timeout, unit);
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Exception raised by a stage; carries the stage and the input that failed.
 */
public class PipeException extends Exception {
    private static final long serialVersionUID = 8647786507719222800L;

    /** The Pipe instance that threw the exception. */
    public final Pipe<?, ?> sourcePipe;

    /** The input element being processed when the exception was thrown. */
    public final Object input;

    public PipeException(Pipe<?, ?> sourcePipe, Object input, String message) {
        super(message);
        this.sourcePipe = sourcePipe;
        this.input = input;
    }

    public PipeException(Pipe<?, ?> sourcePipe, Object input, String message, Throwable cause) {
        super(message, cause);
        this.sourcePipe = sourcePipe;
        this.input = input;
    }
}

/**
 * Abstraction of the environment the stages run in; mainly used for error handling.
 *
 * @author huzhiqiang
 */
public interface PipeContext {
    /**
     * Handles an exception raised by a stage.
     *
     * @param exp the exception to handle
     */
    void handleError(PipeException exp);
}

/**
 * Test code: a three-stage pipeline whose stages all run on one thread pool.
 *
 * @author huzhiqiang
 */
public class ThreadPoolBasedPipeExample {
    public static void main(String[] args) {
        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                1,
                Runtime.getRuntime().availableProcessors() * 2,
                60, TimeUnit.MINUTES,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());

        final SimplePipeline<String, String> pipeLine = new SimplePipeline<String, String>();

        Pipe<String, String> pipe = new AbsractPipe<String, String>() {
            @Override
            public String doProcess(String input) throws PipeException {
                String result = input + "->[pipe1, " + Thread.currentThread().getName() + "]";
                System.out.println(result);
                return result;
            }
        };
        pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor);

        pipe = new AbsractPipe<String, String>() {
            @Override
            public String doProcess(String input) throws PipeException {
                String result = input + "->[pipe2, " + Thread.currentThread().getName() + "]";
                System.out.println(result);
                try {
                    Thread.sleep(new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return result;
            }
        };
        pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor);

        pipe = new AbsractPipe<String, String>() {
            @Override
            public String doProcess(String input) throws PipeException {
                String result = input + "->[pipe3, " + Thread.currentThread().getName() + "]";
                System.out.println(result);
                try {
                    Thread.sleep(new Random().nextInt(200));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return result;
            }

            /** The last stage owns the pool: stop it and wait for it to terminate. */
            @Override
            public void shutdown(long timeout, TimeUnit unit) {
                threadPoolExecutor.shutdown();
                try {
                    threadPoolExecutor.awaitTermination(timeout, unit);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor);

        pipeLine.init(pipeLine.newDefaultPipeContext());

        int N = 10;
        try {
            // The loop header was lost in the published listing and is reconstructed here.
            for (int i = 0; i < N; i++) {
                pipeLine.process("Task-" + i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        pipeLine.shutdown(10, TimeUnit.SECONDS);
    }
}