[计算机]实战javaconcurrent

申明敬告: 本站不保证该用户上传的文档完整性,不预览、不比对内容而直接下载产生的反悔问题本站不予受理。

文档介绍

[计算机]实战javaconcurrent

实战javaConcurrent时间:2009-09-2408:09:55来源:网络 作者:未知 点击:1686次编写多线程的程序一直都是一件比较麻烦的事情,要考虑很多事情,处理不好还会出很多意想不到的麻烦。加上现在很多开发者接触到的项目都是打着企业级旗号的B/S项目,大多数人都很少涉及多线程,这又为本文的主角增加了一份神编写多线程的程序一直都是一件比较麻烦的事情,要考虑很多事情,处理不好还会出很多意想不到的麻烦。加上现在很多开发者接触到的项目都是打着企业级旗号的B/S项目,大多数人都很少涉及多线程,这又为本文的主角增加了一份神秘感。讲到Java多线程,大多数人脑海中跳出来的是Thread、Runnable、synchronized……这些是最基本的东西,虽然已经足够强大,但想要用好还真不容易。从JDK1.5开始,增加了java.util.concurrent包,它的引入大大简化了多线程程序的开发(要感谢一下大牛DougLee)。java.util.concurrent包分成了三个部分,分别是java.util.concurrent、java.util.concurrent.atomic和java.util.concurrent.lock。内容涵盖了并发集合类、线程池机制、同步互斥机制、线程安全的变量更新工具类、锁等等常用工具。为了便于理解,本文使用一个例子来做说明,交代一下它的场景:假设要对一套10个节点组成的环境进行检查,这个环境有两个入口点,通过节点间的依赖关系可以遍历到整个环境。依赖关系可以构成一张有向图,可能存在环。为了提高检查的效率,考虑使用多线程。1、Executors通过这个类能够获得多种线程池的实例,例如可以调用newSingleThreadExecutor()获得单线程的ExecutorService,调用newFixedThreadPool()获得固定大小线程池的ExecutorService。拿到ExecutorService可以做的事情就比较多了,最简单的是用它来执行Runnable对象,也可以执行一些实现了Callable的对象。用Thread的start()方法没有返回值,如果该线程执行的方法有返回值那用ExecutorService就再好不过了,可以选择submit()、invokeAll()或者invokeAny(),根据具体情况选择合适的方法即可。Java代码  package service;      import java.util.ArrayList;   import java.util.List;   import java.util.concurrent.ExecutionException;   import java.util.concurrent.ExecutorService;   import java.util.concurrent.Executors;   import java.util.concurrent.Future;   import \njava.util.concurrent.TimeUnit;      /**    *线程池服务类    *     *@authorDigitalSonic    */    public  class ThreadPoolService{      /**       *默认线程池大小       */       public  static  final  int  DEFAULT_POOL_SIZE   = 5;         /**       *默认一个任务的超时时间,单位为毫秒       */       public  static  final  long DEFAULT_TASK_TIMEOUT= 1000;         private  int              poolSize            =DEFAULT_POOL_SIZE;      private ExecutorService executorService;         /**       *根据给定大小创建线程池       */       public ThreadPoolService(int poolSize){          setPoolSize(poolSize);      }         /**       *使用线程池中的线程来执行任务       */       public  void execute(Runnabletask){          executorService.execute(task);      }         /**       *在线程池中执行所有给定的任务并取回运行结果,使用默认超时时间       *        *@see#invokeAll(List,long)       */       public ListinvokeAll(Listtasks){          return invokeAll(tasks,DEFAULT_TASK_TIMEOUT*\ntasks.size());      }         /**       *在线程池中执行所有给定的任务并取回运行结果       *        *@paramtimeout以毫秒为单位的超时时间,小于0表示不设定超时       *@seejava.util.concurrent.ExecutorService#invokeAll(java.util.Collection)       */       public ListinvokeAll(Listtasks, long timeout){          Listnodes=new ArrayList(tasks.size());          try {              List>futures=null;              if (timeout< 0){                  futures=executorService.invokeAll(tasks);              }else {                  futures=executorService.invokeAll(tasks,timeout,TimeUnit.MILLISECONDS);              }              for (Futurefuture:futures){                  try {                      nodes.add(future.get());                  }catch (ExecutionExceptione){                      e.printStackTrace();                  }              }          }catch (InterruptedExceptione){              e.printStackTrace();          }          return nodes;      }         /**       *关闭当前ExecutorService       *        *@paramtimeout以毫秒为单位的超时时间       */       public  void destoryExecutorService(long timeout)\n{          if (executorService!= null &&!executorService.isShutdown()){              try {                  executorService.awaitTermination(timeout,TimeUnit.MILLISECONDS);              }catch (InterruptedExceptione){                  e.printStackTrace();              }              executorService.shutdown();          }      }         /**       *关闭当前ExecutorService,随后根据poolSize创建新的ExecutorService       */       public  void createExecutorService(){          destoryExecutorService(1000);          executorService=Executors.newFixedThreadPool(poolSize);      }         /**       *调整线程池大小       *@see#createExecutorService()       */       public  void setPoolSize(int poolSize){          this.poolSize=poolSize;          createExecutorService();      }   }   packageservice;importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.ExecutionException;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.Future;importjava.util.concurrent.TimeUnit;/** *线程池服务类 *  *@author\nDigitalSonic */publicclassThreadPoolService{   /**    *默认线程池大小    */   publicstaticfinalint DEFAULT_POOL_SIZE   =5;   /**    *默认一个任务的超时时间,单位为毫秒    */   publicstaticfinallongDEFAULT_TASK_TIMEOUT=1000;   privateint             poolSize            =DEFAULT_POOL_SIZE;   privateExecutorService executorService;   /**    *根据给定大小创建线程池    */   publicThreadPoolService(intpoolSize){       setPoolSize(poolSize);   }   /**    *使用线程池中的线程来执行任务    */   publicvoidexecute(Runnabletask){       executorService.execute(task);   }   /**    *在线程池中执行所有给定的任务并取回运行结果,使用默认超时时间    *     *@see#invokeAll(List,long)    */   publicListinvokeAll(Listtasks){       returninvokeAll(tasks,DEFAULT_TASK_TIMEOUT*tasks.size());   }   /**    *在线程池中执行所有给定的任务并取回运行结果    *     *@paramtimeout以毫秒为单位的超时时间,小于0表示不设定超时    *@seejava.util.concurrent.ExecutorService#invokeAll(java.util.Collection)    */   publicListinvokeAll(Listtasks,longtimeout)\n{       Listnodes=newArrayList(tasks.size());       try{           List>futures=null;           if(timeout<0){               futures=executorService.invokeAll(tasks);           }else{               futures=executorService.invokeAll(tasks,timeout,TimeUnit.MILLISECONDS);           }           for(Futurefuture:futures){               try{                   nodes.add(future.get());               }catch(ExecutionExceptione){                   e.printStackTrace();               }           }       }catch(InterruptedExceptione){           e.printStackTrace();       }       returnnodes;   }/**    *关闭当前ExecutorService    *     *@paramtimeout以毫秒为单位的超时时间    */   publicvoiddestoryExecutorService(longtimeout){       if(executorService!=null&&!executorService.isShutdown()){           try{               executorService.awaitTermination(timeout,TimeUnit.MILLISECONDS);           }catch(InterruptedExceptione){               e.printStackTrace();           }           executorService.shutdown();       }   } /**    *关闭当前ExecutorService,随后根据poolSize创建新的ExecutorService    \n*/   publicvoidcreateExecutorService(){       destoryExecutorService(1000);       executorService=Executors.newFixedThreadPool(poolSize);   }   /**    *调整线程池大小    *@see#createExecutorService()    */   publicvoidsetPoolSize(intpoolSize){       this.poolSize=poolSize;       createExecutorService();   }}  这里要额外说明一下invokeAll()和invokeAny()方法。前者会执行给定的所有Callable对象,等所有任务完成后返回一个包含了执行结果的List>,每个Future.isDone()都是true,可以用Future.get()拿到结果;后者只要完成了列表中的任意一个任务就立刻返回,返回值就是执行结果。还有一个比较诡异的地方 本代码是在JDK1.6下编译测试的,如果在JDK1.5下测试,很可能在invokeAll和invokeAny的地方出错。明明ValidationTask实现了Callable,可是它死活不认,类型不匹配,这时可以将参数声明由List改为List>。造成这个问题的主要原因是两个版本中invokeAll和invokeAny的方法签名不同,1.6里是invokeAll(Collection>tasks),而1.5里是invokeAll(Collection>tasks)。网上也有人遇到类似的问题(invokeAll()isnotwillingtoaceptaCollection>)。和其他资源一样,线程池在使用完毕后也需要释放,用shutdown()方法可以关闭线程池,如果当时池里还有没有被执行的任务,它会等待任务执行完毕,在等待期间试图进入线程池的任务将被拒绝。也可以用shutdownNow()来关闭线程池,它会立刻关闭线程池,没有执行的任务作为返回值返回。2、Lock多线程编程中常常要锁定某个对象,之前会用synchronized来实现,现在又多了另一种选择,那就是java.util.concurrent.locks。通过Lock能够实现更灵活的锁定机制,它还提供了很多synchronized所没有的功能,例如尝试获得锁(tryLock())。使用Lock时需要自己获得锁并在使用后手动释放,这一点与synchronized有所不同,所以通常Lock的使用方式是这样的:\nJava代码  Lockl=...;    l.lock();   try {      //执行操作    }finally {      l.unlock();   }   Lockl=...; l.lock();try{ //执行操作}finally{ l.unlock();}java.util.concurrent.locks中提供了几个Lock接口的实现类,比较常用的应该是ReentrantLock。以下范例中使用了ReentrantLock进行节点锁定:Java代码  package service;      import java.util.concurrent.locks.Lock;   import java.util.concurrent.locks.ReentrantLock;      /**    *节点类    *     *@authorDigitalSonic    */    public  class Node{      private Stringname;      private Stringwsdl;      private Stringresult= "PASS";      private String[]dependencies= new String[]{};      private Locklock= new ReentrantLock();      /**       *默认构造方法       */    \n   public Node(){      }             /**       *构造节点对象,设置名称及WSDL       */       public Node(Stringname,Stringwsdl){          this.name=name;          this.wsdl=wsdl;      }         /**       *返回包含节点名称、WSDL以及验证结果的字符串       */       @Override       public StringtoString(){          StringtoString="Node:" +name+ "WSDL:" +wsdl+ "Result:" +result;          return toString;      }             //Getter&Setter       public StringgetName(){          return name;      }         public  void setName(Stringname){          this.name=name;      }         public StringgetWsdl(){          return wsdl;      }         public  void setWsdl(Stringwsdl){          this.wsdl=wsdl;      }   \n      public StringgetResult(){          return result;      }         public  void setResult(Stringresult){          this.result=result;      }         public String[]getDependencies(){          return dependencies;      }         public  void setDependencies(String[]dependencies){          this.dependencies=dependencies;      }         public LockgetLock(){          return lock;      }      }   packageservice;importjava.util.concurrent.locks.Lock;importjava.util.concurrent.locks.ReentrantLock;/** *节点类 *  *@authorDigitalSonic */publicclassNode{ privateStringname; privateStringwsdl; privateStringresult="PASS"; privateString[]dependencies=newString[]{}; privateLocklock=newReentrantLock(); /** *默认构造方法 \n*/ publicNode(){ }  /** *构造节点对象,设置名称及WSDL */ publicNode(Stringname,Stringwsdl){  this.name=name;  this.wsdl=wsdl; } /** *返回包含节点名称、WSDL以及验证结果的字符串 */ @Override publicStringtoString(){  StringtoString="Node:"+name+"WSDL:"+wsdl+"Result:"+result;  returntoString; }  //Getter&Setter publicStringgetName(){  returnname; } publicvoidsetName(Stringname){  this.name=name; } publicStringgetWsdl(){  returnwsdl; } publicvoidsetWsdl(Stringwsdl){  this.wsdl=wsdl; } publicStringgetResult(){  returnresult; } publicvoidsetResult(Stringresult){  this.result=result; } publicString[]getDependencies(){  returndependencies; }\n publicvoidsetDependencies(String[]dependencies){  this.dependencies=dependencies; } publicLockgetLock(){  returnlock; }}Java代码  package service;      import java.util.concurrent.Callable;   import java.util.concurrent.locks.Lock;   import java.util.logging.Logger;      import service.mock.MockNodeValidator;      /**    *执行验证的任务类    *     *@authorDigitalSonic    */    public  class ValidationTask implements Callable{      private  static Loggerlogger=Logger.getLogger("ValidationTask");         private String       wsdl;         /**       *构造方法,传入节点的WSDL       */       public ValidationTask(Stringwsdl){          this.wsdl=wsdl;      }         /**       *执行针对某个节点的验证
       *如果正有别的线程在执行同一节点的验证则等待其结果,不重复执行验证       */       @Override       public Nodecall() throws Exception{          Nodenode=ValidationService.NODE_MAP.get(wsdl);          Locklock=null;          logger.info("开始验证节点:" +\nwsdl);          if (node!= null){              lock=node.getLock();              if (lock.tryLock()){                  //当前没有其他线程验证该节点                   logger.info("当前没有其他线程验证节点" +node.getName()+ "[" +wsdl+ "]");                  try {                      Noderesult=MockNodeValidator.validateNode(wsdl);                      mergeNode(result,node);                  }finally {                      lock.unlock();                  }              }else {                  //当前有别的线程正在验证该节点,等待结果                   logger.info("当前有别的线程正在验证节点" +node.getName()+ "[" +wsdl+ "],等待结果");                  lock.lock();                  lock.unlock();              }          }else {              //从未进行过验证,这种情况应该只出现在系统启动初期               //这时是在做初始化,不应该有冲突发生               logger.info("首次验证节点:" +wsdl);              node=MockNodeValidator.validateNode(wsdl);              ValidationService.NODE_MAP.put(wsdl,node);          }          logger.info("节点" +node.getName()+ "[" +wsdl+ "]验证结束,验证结果:" +node.getResult());          return node;      }         /**       *将src的内容合并进dest节点中,不进行深度拷贝       */       private NodemergeNode(Nodesrc,Nodedest){          dest.setName(src.getName());          dest.setWsdl(src.getWsdl());          dest.setDependencies(src.getDependencies());          \ndest.setResult(src.getResult());          return dest;      }   }   packageservice;importjava.util.concurrent.Callable;importjava.util.concurrent.locks.Lock;importjava.util.logging.Logger;importservice.mock.MockNodeValidator;/** *执行验证的任务类 *  *@authorDigitalSonic */publicclassValidationTaskimplementsCallable{   privatestaticLoggerlogger=Logger.getLogger("ValidationTask");   privateString       wsdl;   /**    *构造方法,传入节点的WSDL    */   publicValidationTask(Stringwsdl){       this.wsdl=wsdl;   }   /**    *执行针对某个节点的验证
    *如果正有别的线程在执行同一节点的验证则等待其结果,不重复执行验证    */   @Override   publicNodecall()throwsException{       Nodenode=ValidationService.NODE_MAP.get(wsdl);       Locklock=null;       logger.info("开始验证节点:"+wsdl);       if(node!=null){           lock=node.getLock();           if(lock.tryLock()){               //当前没有其他线程验证该节点               logger.info("当前没有其他线程验证节点"+node.getName()+"["+wsdl+"]");               try{                   Noderesult=MockNodeValidator.validateNode(wsdl);                   mergeNode(result,node);               }finally\n{                   lock.unlock();               }           }else{               //当前有别的线程正在验证该节点,等待结果               logger.info("当前有别的线程正在验证节点"+node.getName()+"["+wsdl+"],等待结果");               lock.lock();               lock.unlock();           }       }else{           //从未进行过验证,这种情况应该只出现在系统启动初期           //这时是在做初始化,不应该有冲突发生           logger.info("首次验证节点:"+wsdl);           node=MockNodeValidator.validateNode(wsdl);           ValidationService.NODE_MAP.put(wsdl,node);       }       logger.info("节点"+node.getName()+"["+wsdl+"]验证结束,验证结果:"+node.getResult());       returnnode;   }   /**    *将src的内容合并进dest节点中,不进行深度拷贝    */   privateNodemergeNode(Nodesrc,Nodedest){       dest.setName(src.getName());       dest.setWsdl(src.getWsdl());       dest.setDependencies(src.getDependencies());       dest.setResult(src.getResult());       returndest;   }}    请注意ValidationTask的call()方法,这里会先检查节点是否被锁定,如果被锁定则表示当前有另一个线程正在验证该节点,那就不用重复进行验证。第50行和第51行,那到锁后立即释放,这里只是为了等待验证结束。讲到Lock,就不能不讲Conditon,前者代替了synchronized,而后者则代替了Object对象上的wait()、notify()和notifyAll()方法(Condition中提供了await()、signal()和signalAll()方法),当满足运行条件前挂起线程。Condition是与Lock结合使用的,通过Lock.newCondition()方法能够创建与Lock绑定的Condition实例。JDK的JavaDoc中有一个例子能够很好地说明Condition的用途及用法:\nJava代码  class BoundedBuffer{    final Locklock= new ReentrantLock();    final ConditionnotFull =lock.newCondition();     final ConditionnotEmpty=lock.newCondition();        final Object[]items= new Object[100];    int putptr,takeptr,count;       public  void put(Objectx) throws InterruptedException{      lock.lock();      try {        while (count==items.length)           notFull.await();        items[putptr]=x;         if (++putptr==items.length)putptr= 0;        ++count;        notEmpty.signal();      }finally {        lock.unlock();      }    }       public Objecttake() throws InterruptedException{      lock.lock();      try {        while (count== 0)           notEmpty.await();        Objectx=items[takeptr];         if (++takeptr==items.length)takeptr= 0;        --count;        notFull.signal();        return x;      }finally {        lock.unlock();      }    }    }    classBoundedBuffer{  finalLocklock=newReentrantLock();  finalConditionnotFull =lock.newCondition();   finalConditionnotEmpty=lock.newCondition();\n  finalObject[]items=newObject[100];  intputptr,takeptr,count;  publicvoidput(Objectx)throwsInterruptedException{    lock.lock();    try{      while(count==items.length)         notFull.await();      items[putptr]=x;       if(++putptr==items.length)putptr=0;      ++count;      notEmpty.signal();    }finally{      lock.unlock();    }  }  publicObjecttake()throwsInterruptedException{    lock.lock();    try{      while(count==0)         notEmpty.await();      Objectx=items[takeptr];       if(++takeptr==items.length)takeptr=0;      --count;      notFull.signal();      returnx;    }finally{      lock.unlock();    }  }  }说到这里,让我解释一下之前的例子里为什么没有选择Condition来等待验证结束。await()方法在调用时当前线程先要获得对应的锁,既然我都拿到锁了,那也就是说验证已经结束了。。。3、并发集合类集合类是大家编程时经常要使用的东西,ArrayList、HashMap什么的,java.util包中的集合类有的是线程安全的,有的则不是,在编写多线程的程序时使用线程安全的类能省去很多麻烦,但这些类的性能如何呢?java.util.concurrent包中提供了几个并发结合类,例如ConcurrentHashMap、ConcurrentLinkedQueue和CopyOnWriteArrayList等等,根据不同的使用场景,开发者可以用它们替换java.util包中的相应集合类。\nCopyOnWriteArrayList是ArrayList的一个变体,比较适合用在读取比较频繁、修改较少的情况下,因为每次修改都要复制整个底层数组。ConcurrentHashMap中为Map接口增加了一些方法(例如putIfAbsenct()),同时做了些优化,总之灰常之好用,下面的代码中使用ConcurrentHashMap来作为全局节点表,完全无需考虑并发问题。ValidationService中只是声明(第17行),具体的使用是在上面的ValidationTask中。Java代码  package service;      import java.util.ArrayList;   import java.util.List;   import java.util.Map;   import java.util.concurrent.ConcurrentHashMap;      /**    *执行验证的服务类    *     *@authorDigitalSonic    */    public  class ValidationService{      /**       *全局节点表       */       public  static  final MapNODE_MAP= new ConcurrentHashMap();         private ThreadPoolServicethreadPoolService;             public ValidationService(ThreadPoolServicethreadPoolService){          this.threadPoolService=threadPoolService;      }         /**       *给出一个入口节点的WSDL,通过广度遍历的方式验证与其相关的各个节点       *        *@paramwsdl入口节点WSDL       */       public  void validate(Listwsdl){          ListvisitedNodes=new ArrayList();          ListnextRoundNodes=new ArrayList();      \n       nextRoundNodes.addAll(wsdl);          while (nextRoundNodes.size()> 0){              Listtasks=getTasks(nextRoundNodes);              Listnodes=threadPoolService.invokeAll(tasks);                 visitedNodes.addAll(nextRoundNodes);              nextRoundNodes.clear();              getNextRoundNodes(nodes,visitedNodes,nextRoundNodes);          }      }         private ListgetNextRoundNodes(Listnodes,              ListvisitedNodes,ListnextRoundNodes){          for (Nodenode:nodes){              for (Stringwsdl:node.getDependencies()){                  if (!visitedNodes.contains(wsdl)){                      nextRoundNodes.add(wsdl);                  }              }          }          return nextRoundNodes;      }         private ListgetTasks(Listnodes){          Listtasks=new ArrayList(nodes.size());          for (Stringwsdl:nodes){              tasks.add(new ValidationTask(wsdl));          }          return tasks;      }   }   packageservice;importjava.util.ArrayList;importjava.util.List;importjava.util.Map;importjava.util.concurrent.ConcurrentHashMap;\n/** *执行验证的服务类 *  *@authorDigitalSonic */publicclassValidationService{ /** *全局节点表 */ publicstaticfinalMapNODE_MAP=newConcurrentHashMap(); privateThreadPoolServicethreadPoolService;  publicValidationService(ThreadPoolServicethreadPoolService){  this.threadPoolService=threadPoolService; } /** *给出一个入口节点的WSDL,通过广度遍历的方式验证与其相关的各个节点 *  *@paramwsdl入口节点WSDL */ publicvoidvalidate(Listwsdl){  ListvisitedNodes=newArrayList();  ListnextRoundNodes=newArrayList();  nextRoundNodes.addAll(wsdl);  while(nextRoundNodes.size()>0){   Listtasks=getTasks(nextRoundNodes);   Listnodes=threadPoolService.invokeAll(tasks);   visitedNodes.addAll(nextRoundNodes);   nextRoundNodes.clear();   getNextRoundNodes(nodes,visitedNodes,nextRoundNodes);  } } privateListgetNextRoundNodes(Listnodes,   ListvisitedNodes,ListnextRoundNodes){  for(Nodenode:nodes){   for(Stringwsdl:node.getDependencies()){    if(!visitedNodes.contains(wsdl)){     nextRoundNodes.add(wsdl);    }   }  }  returnnextRoundNodes; }\n privateListgetTasks(Listnodes){  Listtasks=newArrayList(nodes.size());  for(Stringwsdl:nodes){   tasks.add(newValidationTask(wsdl));  }  returntasks; }}4、AtomicInteger对变量的读写操作都是原子操作(除了long或者double的变量),但像数值类型的++--操作不是原子操作,像i++中包含了获得i的原始值、加1、写回i、返回原始值,在进行类似i++这样的操作时如果不进行同步问题就大了。好在java.util.concurrent.atomic为我们提供了很多工具类,可以以原子方式更新变量。以AtomicInteger为例,提供了代替++--的getAndIncrement()、incrementAndGet()、getAndDecrement()和decrementAndGet()方法,还有加减给定值的方法、当前值等于预期值时更新的compareAndSet()方法。下面的例子中用AtomicInteger保存全局验证次数(第69行做了自增的操作),因为validateNode()方法会同时被多个线程调用,所以直接用int不同步是不行的,但用AtomicInteger在这种场合下就很合适。Java代码  package service.mock;      import java.util.ArrayList;   import java.util.HashMap;   import java.util.List;   import java.util.Map;   import java.util.concurrent.atomic.AtomicInteger;   import java.util.logging.Logger;      import service.Node;      /**    *模拟执行节点验证的Mock类    *     *@authorDigitalSonic    */    public  class MockNodeValidator{      public  static  final List        ENTRIES = new ArrayList();      private  static  final MapNODE_MAP= new HashMap();         private  static AtomicInteger          count   = new AtomicInteger(0);      private  static Logger                 logger  =Logger.getLogger("MockNodeValidator");         /*       *构造模拟数据       */       static {          Nodenode0=new Node("NODE0", "http://node0/check?wsdl");//入口0           Nodenode1=new Node("NODE1", "http://node1/check?wsdl");          Nodenode2=new Node("NODE2", "http://node2/check?wsdl");          Nodenode3=new Node("NODE3", "http://node3/check?wsdl");          Nodenode4=new Node("NODE4", "http://node4/check?wsdl");          Nodenode5=new Node("NODE5", "http://node5/check?wsdl");          Nodenode6=new Node("NODE6", "http://node6/check?wsdl");//入口1           Nodenode7=new Node("NODE7", "http://node7/check?wsdl");          Nodenode8=new Node("NODE8", "http://node8/check?wsdl");          Nodenode9=new Node("NODE9", "http://node9/check?wsdl");             node0.setDependencies(new String[]{node1.getWsdl(),node2.getWsdl()});          node1.setDependencies(new String[]{node3.getWsdl(),node4.getWsdl()});          node2.setDependencies(new String[]{node5.getWsdl()});          node6.setDependencies(new String[]{node7.getWsdl(),node8.getWsdl()});          node7.setDependencies(new String[]{node5.getWsdl(),node9.getWsdl()});          node8.setDependencies(new String[]{node3.getWsdl(),node4.getWsdl()});             node2.setResult("FAILED"\n);             NODE_MAP.put(node0.getWsdl(),node0);          NODE_MAP.put(node1.getWsdl(),node1);          NODE_MAP.put(node2.getWsdl(),node2);          NODE_MAP.put(node3.getWsdl(),node3);          NODE_MAP.put(node4.getWsdl(),node4);          NODE_MAP.put(node5.getWsdl(),node5);          NODE_MAP.put(node6.getWsdl(),node6);          NODE_MAP.put(node7.getWsdl(),node7);          NODE_MAP.put(node8.getWsdl(),node8);          NODE_MAP.put(node9.getWsdl(),node9);             ENTRIES.add(node0);          ENTRIES.add(node6);      }         /**       *模拟执行远程验证返回节点,每次调用等待500ms       */       public  static NodevalidateNode(Stringwsdl){          Nodenode=cloneNode(NODE_MAP.get(wsdl));          logger.info("验证节点" +node.getName()+ "[" +node.getWsdl()+ "]");          count.getAndIncrement();          try {              Thread.sleep(500);          }catch (InterruptedExceptione){              e.printStackTrace();          }          return node;      }         /**       *获得计数器的值       */       public  static  int getCount(){          return count.intValue();      }         /**       *克隆一个新的Node对象(未执行深度克隆)       */       public  static NodecloneNode(NodeoriginalNode){          NodenewNode=new \nNode();             newNode.setName(originalNode.getName());          newNode.setWsdl(originalNode.getWsdl());          newNode.setResult(originalNode.getResult());          newNode.setDependencies(originalNode.getDependencies());             return newNode;      }   }   packageservice.mock;importjava.util.ArrayList;importjava.util.HashMap;importjava.util.List;importjava.util.Map;importjava.util.concurrent.atomic.AtomicInteger;importjava.util.logging.Logger;importservice.Node;/** *模拟执行节点验证的Mock类 *  *@authorDigitalSonic */publicclassMockNodeValidator{   publicstaticfinalList        ENTRIES =newArrayList();   privatestaticfinalMapNODE_MAP=newHashMap();   privatestaticAtomicInteger          count   =newAtomicInteger(0);   privatestaticLogger                 logger  =Logger.getLogger("MockNodeValidator");   /*    *构造模拟数据    */   static{       Nodenode0=newNode("NODE0","http://node0/check?wsdl");//入口0       Nodenode1=newNode("NODE1","http://node1/check?wsdl");       Nodenode2=newNode("NODE2","http://node2/check?wsdl");       Nodenode3=newNode("NODE3","http://node3/check?wsdl");       Nodenode4=newNode("NODE4","http://node4/check?wsdl");       Nodenode5=newNode("NODE5",\n"http://node5/check?wsdl");       Nodenode6=newNode("NODE6","http://node6/check?wsdl");//入口1       Nodenode7=newNode("NODE7","http://node7/check?wsdl");       Nodenode8=newNode("NODE8","http://node8/check?wsdl");       Nodenode9=newNode("NODE9","http://node9/check?wsdl");       node0.setDependencies(newString[]{node1.getWsdl(),node2.getWsdl()});       node1.setDependencies(newString[]{node3.getWsdl(),node4.getWsdl()});       node2.setDependencies(newString[]{node5.getWsdl()});       node6.setDependencies(newString[]{node7.getWsdl(),node8.getWsdl()});       node7.setDependencies(newString[]{node5.getWsdl(),node9.getWsdl()});       node8.setDependencies(newString[]{node3.getWsdl(),node4.getWsdl()});       node2.setResult("FAILED");       NODE_MAP.put(node0.getWsdl(),node0);       NODE_MAP.put(node1.getWsdl(),node1);       NODE_MAP.put(node2.getWsdl(),node2);       NODE_MAP.put(node3.getWsdl(),node3);       NODE_MAP.put(node4.getWsdl(),node4);       NODE_MAP.put(node5.getWsdl(),node5);       NODE_MAP.put(node6.getWsdl(),node6);       NODE_MAP.put(node7.getWsdl(),node7);       NODE_MAP.put(node8.getWsdl(),node8);       NODE_MAP.put(node9.getWsdl(),node9);       ENTRIES.add(node0);       ENTRIES.add(node6);   }   /**    *模拟执行远程验证返回节点,每次调用等待500ms    */   publicstaticNodevalidateNode(Stringwsdl){       Nodenode=cloneNode(NODE_MAP.get(wsdl));       logger.info("验证节点"+node.getName()+"["+node.getWsdl()+"]");       count.getAndIncrement();       try{           Thread.sleep(500);       }catch(InterruptedExceptione){           e.printStackTrace();       \n}       returnnode;   } /**    *获得计数器的值    */   publicstaticintgetCount(){       returncount.intValue();   }   /**    *克隆一个新的Node对象(未执行深度克隆)    */   publicstaticNodecloneNode(NodeoriginalNode){       NodenewNode=newNode();       newNode.setName(originalNode.getName());       newNode.setWsdl(originalNode.getWsdl());       newNode.setResult(originalNode.getResult());       newNode.setDependencies(originalNode.getDependencies());       returnnewNode;   }}上述代码还有另一个功能,就是构造测试用的节点数据,一共10个节点,有2个入口点,通过这两个点能够遍历整个系统。每次调用会模拟远程访问,等待500ms。环境间节点依赖如下:环境依赖 Node0[Node1,Node2]Node1[Node3,Node4]Node2[Node5]Node6[Node7,Node8]Node7[Node5,Node9]Node8[Node3,Node4]5、CountDownLatchCountDownLatch是一个一次性的同步辅助工具,允许一个或多个线程一直等待,直到计数器值变为0。它有一个构造方法,设定计数器初始值,即在await()结束等待前需要调用多少次countDown()方法。CountDownLatch的计数器不能重置,所以说它是“一次性”的,如果需要重置计数器,可以使用CyclicBarrier。在运行环境检查的主类中,使用了CountDownLatch来等待所有验证结束,在各个并发验证的线程完成任务结束前都会调用countDown(),因为有3个并发的验证,所以将计数器设置为3。\n最后将所有这些类整合起来,运行环境检查的主类如下。它会创建线程池服务和验证服务,先做一次验证(相当于是对系统做次初始化),随后并发3个验证请求。系统运行完毕会显示实际执行的节点验证次数和执行时间。如果是顺序执行,验证次数应该是13*4=52,但实际的验证次数会少于这个数字(我这里最近一次执行了33次验证),因为如果同时有两个线程要验证同一节点时只会做一次验证。关于时间,如果是顺序执行,52次验证每次等待500ms,那么验证所耗费的时间应该是26000ms,使用了多线程后的实际耗时远小于该数字(最近一次执行耗时4031ms)。Java代码  package service.mock;      import java.util.ArrayList;   import java.util.List;   import java.util.concurrent.CountDownLatch;      import service.Node;   import service.ThreadPoolService;   import service.ValidationService;      /**    *模拟执行这个环境的验证    *     *@authorDigitalSonic    */    public  class ValidationStarter implements Runnable{      private List     entries;      private ValidationServicevalidationService;      private CountDownLatch   signal;         public ValidationStarter(Listentries,ValidationServicevalidationService,              CountDownLatchsignal){          this.entries=entries;          this.validationService=validationService;          this.signal=signal;      }         /**   \n    *线程池大小为10,初始化执行一次,随后并发三个验证       */       public  static  void main(String[]args){          ThreadPoolServicethreadPoolService=new ThreadPoolService(10);          ValidationServicevalidationService=new ValidationService(threadPoolService);          Listentries=new ArrayList();          CountDownLatchsignal=new CountDownLatch(3);          long start;          long stop;             for (Nodenode:MockNodeValidator.ENTRIES){              entries.add(node.getWsdl());          }             start=System.currentTimeMillis();             validationService.validate(entries);          threadPoolService.execute(new ValidationStarter(entries,validationService,signal));          threadPoolService.execute(new ValidationStarter(entries,validationService,signal));          threadPoolService.execute(new ValidationStarter(entries,validationService,signal));             try {              signal.await();          }catch (InterruptedExceptione){              e.printStackTrace();          }             stop=System.currentTimeMillis();          threadPoolService.destoryExecutorService(1000);          System.out.println("实际执行验证次数:" +MockNodeValidator.getCount());          System.out.println("实际执行时间:" +(stop-start)+ "ms");      }   \n      @Override       public  void run(){          validationService.validate(entries);          signal.countDown();      }      }   packageservice.mock;importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.CountDownLatch;importservice.Node;importservice.ThreadPoolService;importservice.ValidationService;/** *模拟执行这个环境的验证 *  *@authorDigitalSonic */publicclassValidationStarterimplementsRunnable{   privateList     entries;   privateValidationServicevalidationService;   privateCountDownLatch   signal;   publicValidationStarter(Listentries,ValidationServicevalidationService,           CountDownLatchsignal){       this.entries=entries;       this.validationService=validationService;       this.signal=signal;   }   /**    *线程池大小为10,初始化执行一次,随后并发三个验证    */   publicstaticvoidmain(String[]args){       ThreadPoolServicethreadPoolService=newThreadPoolService(10);       ValidationServicevalidationService=new\nValidationService(threadPoolService);       Listentries=newArrayList();       CountDownLatchsignal=newCountDownLatch(3);       longstart;       longstop;       for(Nodenode:MockNodeValidator.ENTRIES){           entries.add(node.getWsdl());       }       start=System.currentTimeMillis();       validationService.validate(entries);       threadPoolService.execute(newValidationStarter(entries,validationService,signal));       threadPoolService.execute(newValidationStarter(entries,validationService,signal));       threadPoolService.execute(newValidationStarter(entries,validationService,signal));       try{           signal.await();       }catch(InterruptedExceptione){           e.printStackTrace();       }       stop=System.currentTimeMillis();       threadPoolService.destoryExecutorService(1000);       System.out.println("实际执行验证次数:"+MockNodeValidator.getCount());       System.out.println("实际执行时间:"+(stop-start)+"ms");   }   @Override   publicvoidrun(){       validationService.validate(entries);       signal.countDown();   }}    =================================我是分割线==============================\n本文没有覆盖java.util.concurrent中的所有内容,只是挑选一些比较常用的东西,想要获得更多详细信息请阅读JavaDoc。自打有了“轮子”理论,重复造大轮子的情况的确少了,但还是有人会做些小轮子,例如编写多线程程序时用到的小工具(线程池、锁等等),如果可以,请让自己再“懒惰”一点吧~
查看更多

相关文章

您可能关注的文档