十一、流水线模式(Pipeline)
1、核心思想
将一个任务处理分解为若干个处理阶段,其中每个处理阶段的输出作为下一个处理阶段的输入,并且各个处理阶段都有相应的工作者线程去执行相应的计算。
2、评价:
充分利用CPU,提高其计算效率。
在子任务间存在依赖关系的条件下,仍然能够实现并行计算。
非常便于采用单线程模型实现对子任务的处理。
通过 PipeContext 提供统一的错误处理机制:各处理阶段抛出的 PipeException 交由 PipeContext.handleError 处理。
3、适用场景
a、适合于处理规模较大的任务,否则可能得不偿失。各个处理阶段所使用的工作者线程或者线程池、输入输出对象的创建和转移都有自身的时间和空间消耗。
public interface Pipe {
public void setNextPipe(Pipe,?> nextPipe);
public void process(IN input) throws InterruptedException;
public void init(PipeContext pipeCtx);
public void shutdown(long timeout, TimeUnit unit);
}
public interface PipeLine extends Pipe {
void addPipe(Pipe,?> pipe);
}
public abstract class AbsractPipe implements Pipe {
protected volatile Pipe, ?> nextPipe = null;
protected volatile PipeContext PipeCtx = null;
@Override
public void setNextPipe(Pipe, ?> nextPipe) {
this.nextPipe = nextPipe;
}
@SuppressWarnings("unchecked")
@Override
public void process(IN input) throws InterruptedException {
try {
OUT out = doProcess(input);
if(null != nextPipe){
if(null != out){
((Pipe) nextPipe).process(out);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (PipeException e) {
PipeCtx.handleError(e);
}
}
@Override
public void init(PipeContext pipeCtx) {
this.PipeCtx = pipeCtx;
}
@Override
public void shutdown(long timeout, TimeUnit unit) {
}
public abstract OUT doProcess(IN input) throws PipeException;
}
public abstract class AbstractParallePipe extends AbsractPipe {
private final ExecutorService executorService;
public AbstractParallePipe(BlockingQueue queue, ExecutorService executorService) {
super();
this.executorService = executorService;
}
protected abstract List> buildTasks(IN input) throws Exception;
protected abstract OUT combineResults(List> subTaskResults) throws Exception;
protected List> invokeParallel(List> tasks) throws Exception{
return executorService.invokeAll(tasks);
}
@Override
public OUT doProcess(IN input) throws PipeException {
OUT out = null;
try {
out = combineResults(invokeParallel(buildTasks(input)));
} catch (Exception e) {
throw new PipeException(this, input, "Task failed", e);
}
return out;
}
}
public class SimplePipeline extends AbsractPipe implements PipeLine {
private final Queue> pipes = new LinkedList>();
private final ExecutorService helperService;
public SimplePipeline() {
this(Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "SimplePpeLine-Helper");
t.setDaemon(true);
return t;
}
}));
}
public SimplePipeline(final ExecutorService helperService) {
super();
this.helperService = helperService;
}
@Override
public void shutdown(long timeout, TimeUnit unit) {
Pipe,?> pipe;
while(null != (pipe = pipes.poll())){
pipe.shutdown(timeout, unit);
}
helperService.shutdown();
}
@Override
public void addPipe(Pipe, ?> pipe) {
pipes.add(pipe);
}
@Override
public OUT doProcess(IN input) throws PipeException {
return null;
}
@Override
public void process(IN input) throws InterruptedException {
@SuppressWarnings("unchecked")
Pipe firstPipe = (Pipe) pipes.peek();
firstPipe.process(input);
}
@Override
public void init(PipeContext pipeCtx) {
LinkedList> pipesList = (LinkedList>) pipes;
Pipe, ?> prevPipe = this;
for(Pipe, ?> pipe: pipesList){
prevPipe.setNextPipe(pipe);
prevPipe = pipe;
}
Runnable task = new Runnable() {
@Override
public void run() {
for(Pipe, ?> pipe: pipes){
pipe.init(pipeCtx);
}
}
};
helperService.submit(task);
}
public void addAsWorkerThreadBasedPipe(Pipe delegate, int workCount){
addPipe(new WorkThreadPipeDecorator(delegate, workCount));
}
public void addAsThreadBasedPipe(Pipe delegate, ExecutorService executorService){
addPipe(new ThreadPoolPipeDecorator(delegate, executorService));
}
public PipeContext newDefaultPipeContext(){
return new PipeContext() {
@Override
public void handleError(PipeException exp) {
helperService.submit(new Runnable() {
@Override
public void run() {
exp.printStackTrace();
}
});
}
};
}
}
public class ThreadPoolPipeDecorator implements Pipe {
private final Pipe delegate;
private final TerminationToken terminationToken;
private final ExecutorService executorService;
private final CountDownLatch stageProcessDoneLatch = new CountDownLatch(1);
public ThreadPoolPipeDecorator(Pipe delegate, ExecutorService executorService) {
super();
this.delegate = delegate;
this.executorService = executorService;
terminationToken = TerminationToken.newInstance(executorService);
}
@Override
public void setNextPipe(Pipe, ?> nextPipe) {
delegate.setNextPipe(nextPipe);
}
@Override
public void process(IN input) throws InterruptedException {
Runnable task = new Runnable() {
@Override
public void run() {
int remainingReservations = -1;
try {
delegate.process(input);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
remainingReservations = terminationToken.reservations.decrementAndGet();
}
if(terminationToken.isToShutDown() && 0 == remainingReservations){
stageProcessDoneLatch.countDown();
}
}
};
executorService.submit(task);
terminationToken.reservations.incrementAndGet();
}
@Override
public void init(PipeContext pipeCtx) {
delegate.init(pipeCtx);
}
@Override
public void shutdown(long timeout, TimeUnit unit) {
terminationToken.setIsToShutdown();
if(terminationToken.reservations.get() > 0){
try {
if(stageProcessDoneLatch.getCount() > 0){
stageProcessDoneLatch.await(timeout, unit);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
delegate.shutdown(timeout, unit);
}
private static class TerminationToken extends com.threadDesign.twoPhase.TerminationToken{
private final static ConcurrentHashMap
INSTANCE_MAP = new ConcurrentHashMap();
private TerminationToken(){
}
void setIsToShutdown(){
this.toShutDown = true;
}
static TerminationToken newInstance(ExecutorService executorService){
TerminationToken token = INSTANCE_MAP.get(executorService);
if(null == token){
token = new TerminationToken();
TerminationToken existingToken = INSTANCE_MAP.putIfAbsent(executorService, token);
if(null != existingToken){
token = existingToken;
}
}
return token;
}
}
}
public class WorkThreadPipeDecorator implements Pipe {
protected final BlockingQueue workQueue;
protected final Set workerThreads = new HashSet();
protected final TerminationToken terminationToken = new TerminationToken();
private final Pipe delegate;
public WorkThreadPipeDecorator(Pipe delegate, int workerCount){
this(new SynchronousQueue(), delegate, workerCount);
}
public WorkThreadPipeDecorator(BlockingQueue workQueue, Pipe delegate, int workerCount) {
if(workerCount <= 0){
throw new IllegalArgumentException("workerCount should be positive!");
}
this.workQueue = workQueue;
this.delegate = delegate;
for(int i=0; inew AbstractTerminatableThread() {
@Override
protected void doRun() throws Exception {
try {
dispatch();
}finally {
terminationToken.reservations.decrementAndGet();
}
}
});
}
}
private void dispatch() throws InterruptedException {
IN input = workQueue.take();
delegate.process(input);
}
@Override
public void setNextPipe(Pipe, ?> nextPipe) {
delegate.setNextPipe(nextPipe);
}
@Override
public void process(IN input) throws InterruptedException {
workQueue.put(input);
terminationToken.reservations.incrementAndGet();
}
@Override
public void init(PipeContext pipeCtx) {
delegate.init(pipeCtx);
for(AbstractTerminatableThread thread : workerThreads){
thread.start();
}
}
@Override
public void shutdown(long timeout, TimeUnit unit) {
for(AbstractTerminatableThread thread : workerThreads){
thread.terminate();
try {
thread.join(TimeUnit.MILLISECONDS.convert(timeout, unit));
} catch (InterruptedException e) {
}
}
delegate.shutdown(timeout, unit);
}
}
public class PipeException extends Exception {
private static final long serialVersionUID = 8647786507719222800L;
public final Pipe, ?> sourcePipe;
public final Object input;
public PipeException(Pipe, ?> sourcePipe, Object input, String message) {
super(message);
this.sourcePipe = sourcePipe;
this.input = input;
}
public PipeException(Pipe, ?> sourcePipe, Object input, String message, Throwable cause) {
super(message, cause);
this.sourcePipe = sourcePipe;
this.input = input;
}
}
/**
 * Context handed to every pipe at initialization; pipes report processing failures through it.
 */
public interface PipeContext {
    /**
     * Handles a failure raised by a pipe.
     *
     * @param exp the exception describing the failure
     */
    void handleError(PipeException exp);
}
public class ThreadPoolBasedPipeExample {
public static void main(String[] args) {
final ThreadPoolExecutor threadPoolExecutor;
threadPoolExecutor = new ThreadPoolExecutor(1, Runtime.getRuntime().availableProcessors()*2, 60, TimeUnit.MINUTES,
new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy());
final SimplePipeline pipeLine = new SimplePipeline();
Pipe pipe = new AbsractPipe() {
@Override
public String doProcess(String input) throws PipeException {
String result = input + "->[pipe1, " + Thread.currentThread().getName() + "]";
System.out.println(result);
return result;
}
};
pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor);
pipe = new AbsractPipe() {
@Override
public String doProcess(String input) throws PipeException {
String result = input + "->[pipe2, " + Thread.currentThread().getName() + "]";
System.out.println(result);
try {
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
};
pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor);
pipe = new AbsractPipe() {
@Override
public String doProcess(String input) throws PipeException {
String result = input + "->[pipe3, " + Thread.currentThread().getName() + "]";
System.out.println(result);
try {
Thread.sleep(new Random().nextInt(200));
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
@Override
public void shutdown(long timeout, TimeUnit unit) {
threadPoolExecutor.shutdown();
try {
threadPoolExecutor.awaitTermination(timeout, unit);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
pipeLine.addAsThreadBasedPipe(pipe, threadPoolExecutor);
pipeLine.init(pipeLine.newDefaultPipeContext());
int N = 10;
try {
for(int i=0; i"Task-" + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
pipeLine.shutdown(10, TimeUnit.SECONDS);
}
}