如何判断线程池处理完任务

cy729215495 2008-11-27 12:24:21
用的是ThreadPoolExecutor,做法是提交一个任务后不管,可以提交多个任务.最后要等任务全部完成了在去调用一个方法,我原来的做法是:


public boolean isEndTask() {
while (true) {
if (this.executor.getActiveCount() == 0) {
return true;

}

}
}


public void synBackup(String fn, IProgress pro) {

if (isEndTask()) //如果线程池处理完毕,就干别的事情...,这个方法一定会返回true的.
{
....

}
}

也就是调用synBackup(String fn, IProgress pro)这个方法的时候,一定使线程池里面的任务完成了我才可以干其他的事情.
本来不用什么线程池,也就是单线程来处理这个任务只需要17秒的,可是我用线程池后居然需要18秒,郁闷,没有体现速度.
不知道是我这样判断任务处理完毕的方法不对,还是构造线程池的时候参数设置的不是很合理,请高手指教指教!!

...全文
1052 15 打赏 收藏 转发到动态 举报
写回复
用AI写文章
15 条回复
切换为时间正序
请发表友善的回复…
发表回复
云上飞翔 2008-12-03
  • 打赏
  • 举报
回复
cy729215495 兄弟:
因私信只能<=200字节.代码发不全,只好帖在此处了.

答:方案是完全可行的啊.
问题发生在this.getActiveCount() == 0上.当最后一个线程执行完给定的任务后, 再调用afterExecute时,此时getActiveCount() 应为1.
另外afterExecute中最后部分要加上synchronized. 修改后的afterExecute()方法如下:

protected void afterExecute(Runnable r, Throwable t)
{ // TODO Auto-generated method stub
super.afterExecute(r, t);
synchronized(this){
System.out.println("自动调用了....afterEx count:"+this.getActiveCount());
if(this.getActiveCount() == 1) {
this.hasFinish=true; this.notify();
}//if
}// synchronized }

我的测试代码,运行正常.测试代码如下:(在你的原始代码上简化而来)

import java.io.*;
import java.util.concurrent.*;
import java.util.*;

class MyThreadPoolExecutor extends ThreadPoolExecutor{
private boolean hasFinish = false;

public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
// TODO Auto-generated constructor stub

}

public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, handler);
// TODO Auto-generated constructor stub

}


public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
// TODO Auto-generated constructor stub

}



public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
// TODO Auto-generated constructor stub

}

/* (non-Javadoc)
* @see java.util.concurrent.ThreadPoolExecutor#afterExecute(java.lang.Runnable, java.lang.Throwable)
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
// TODO Auto-generated method stub
super.afterExecute(r, t);
synchronized(this){
System.out.println("自动调用了....afterEx 此时getActiveCount()值:"+this.getActiveCount());
if(this.getActiveCount() == 1)//已执行完任务之后的最后一个线程
{
this.hasFinish=true;
this.notify();
}//if
}// synchronized
}

public void isEndTask() {
synchronized(this){
while (this.hasFinish==false) {
System.out.println("等待线程池所有任务结束: wait...");
try {
this.wait();
}
catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

}

public class BackupDefault {

private MyThreadPoolExecutor executor;

public BackupDefault() {


this.executor = new MyThreadPoolExecutor(5, 20,0, TimeUnit.SECONDS, new ArrayBlockingQueue(10),
new ThreadPoolExecutor.DiscardOldestPolicy());

}


public void submit() {

executor.execute(new Runnable() {
public void run() {

System.out.println("Current Tread:"+Thread.currentThread().getName());
try {
Thread.sleep(1000);//休眠1秒,模拟该任务要1秒的时间才能完成.
} catch (InterruptedException e) {
// TODO Auto-generated catch block
//e.printStackTrace();
}
}

});

}




public void synBackup()
{

this.executor.isEndTask(); //等待线程池处理完毕,就创建xml配置文件

System.out.println("=======线程池处理完毕!===\n此时线程池中活动任务数是:(正确数值应是0--用于验证isEndTask()的正确性)"+this.executor.getActiveCount());


}
public static void main(String[] args) {

BackupDefault bd = new BackupDefault();
for(int i=1;i<=10;i++)
{
bd.submit();//线程池中创建10个线程
}
bd.synBackup(); //等待所有任务完成




}

}

程序运行结果:
等待线程池所有任务结束: wait...
Current Tread:pool-1-thread-1
Current Tread:pool-1-thread-2
Current Tread:pool-1-thread-3
Current Tread:pool-1-thread-4
Current Tread:pool-1-thread-5
自动调用了....afterEx 此时getActiveCount()值:5
Current Tread:pool-1-thread-1
自动调用了....afterEx 此时getActiveCount()值:5
Current Tread:pool-1-thread-2
自动调用了....afterEx 此时getActiveCount()值:5
Current Tread:pool-1-thread-3
自动调用了....afterEx 此时getActiveCount()值:5
Current Tread:pool-1-thread-4
自动调用了....afterEx 此时getActiveCount()值:5
Current Tread:pool-1-thread-5
自动调用了....afterEx 此时getActiveCount()值:5
自动调用了....afterEx 此时getActiveCount()值:4
自动调用了....afterEx 此时getActiveCount()值:3
自动调用了....afterEx 此时getActiveCount()值:2
自动调用了....afterEx 此时getActiveCount()值:1
=======线程池处理完毕!===
此时线程池中活动任务数是:(正确数值应是0--用于验证isEndTask()的正确性)0

leo_bogard 2008-12-02
  • 打赏
  • 举报
回复
up
cy729215495 2008-12-02
  • 打赏
  • 举报
回复
12楼的,你为什么要顶,不要人云亦云.说说你自己的看法.不要只知道顶别人的
landyshouguo 2008-12-01
  • 打赏
  • 举报
回复
[Quote=引用 3 楼 yanhan0615 的回复:]
我个人认为这个方法是有问题的:
Java code
public boolean isEndTask() {
while (true) {
if (this.executor.getActiveCount() == 0) {
return true;

}

}
}



建议楼主试着用在while里面增加sleep(),哪怕是10ms都比没有强,否则的话这个该方法独占cpu资源太多了...
[/Quote]顶
西瓜 2008-11-30
  • 打赏
  • 举报
回复
没玩过
landyshouguo 2008-11-29
  • 打赏
  • 举报
回复
[Quote=引用 7 楼 jiangnaisong 的回复:]
引用楼主 cy729215495 的帖子:
用的是ThreadPoolExecutor,做法是提交一个任务后不管,可以提交多个任务.最后要等任务全部完成了在去调用一个方法,我原来的做法是:


Java code
public boolean isEndTask() {
while (true) {
if (this.executor.getActiveCount() == 0) {
return true;

}

}
}


public void synBackup(String fn, IProgress pro) {

if (isEndTa…
[/Quote]ding
tiyuzhongxin789 2008-11-28
  • 打赏
  • 举报
回复
建议楼主试着用在while里面增加sleep(),哪怕是10ms都比没有强,否则的话这个该方法独占cpu资源太多了...
Andy__Huang 2008-11-28
  • 打赏
  • 举报
回复
看你这个例子都没有什么代码,我看过一个生产者和消费者之间的关系
里面至少wait()和notify()的方法
cy729215495 2008-11-27
  • 打赏
  • 举报
回复
可以:

public class BackupDefault extends BackupAbstract {

public static final int byteSize = 1024;

public static final String TEMP_DIR = "temp";

public static final String FILE_NAME = "fname";

public static final String FILE_DATE = "fdate";

public static final String FILE_SIZE = "fsize";

public static final String FILE_SEPARATOR = File.separator;

private ThreadPoolExecutor executor;

private List itemList;

public BackupDefault() {
this.executor = new ThreadPoolExecutor(5, 20,0, TimeUnit.SECONDS, new ArrayBlockingQueue(10),
new ThreadPoolExecutor.DiscardOldestPolicy());
itemList = new ArrayList();
}

/**
* 把要备份的文件,放在临时目录下面
* @param item 文件备份项
* @throws IOException 如果发生io错误
*/
public synchronized void backupFile(BackupItem item) throws IOException {
InputStream fin = null;
OutputStream fout = null;
ZipOutputStream zipout = null;
ZipEntry entry = null;
String curDir = null;
try {
fin = item.getContent();
FileFunc.createDir(new File("").getAbsolutePath() + this.FILE_SEPARATOR + this.TEMP_DIR + this.FILE_SEPARATOR);//创建临时目录,名字就是temp

String name = createDir(item.getTargetFile());//得到绝对物理保存文件路径

fout = new FileOutputStream(name);//当前目录+相对目录
zipout = new ZipOutputStream(fout);
zipout.setEncoding("GBK"); // 解决压缩表内的文件名乱码问题
byte[] b = new byte[byteSize];
int size = 0;
entry = new ZipEntry(item.getPpt("fname"));
zipout.putNextEntry(entry);
while ((size = fin.read(b)) != -1) {
zipout.write(b, 0, size);

}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
try {
zipout.close();
}
catch (IOException e) {
e.printStackTrace();
}
try {
fout.close();
}
catch (IOException e) {
e.printStackTrace();
}
try {
fin.close();
}
catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

fin = null;
fout = null;
curDir = null;
zipout = null;
entry = null;
}

}

/**
* 在临时目录下面创建目录
* @param nonAbsPath 路径,这里指的是相对路径
* @return
* @throws Exception 如果指定的是目录名称而不是文件
*/
private String createDir(String nonAbsPath) throws Exception {
String dirPath = "";
File file = new File(new File("").getAbsoluteFile() + BackupDefault.FILE_SEPARATOR + BackupDefault.TEMP_DIR
+ BackupDefault.FILE_SEPARATOR + nonAbsPath);
boolean flag = FileFunc.createDirsOfFile(file);//创建文件的目录
return file.getAbsolutePath();

}

/**
* 提交一个备份项
* @param item 一个备份项
*
*
*/
public void submit(final BackupItem item) {
// itemList.add(item);
// try {
// backupFile(item);
// }
// catch (IOException e) {
// e.printStackTrace();
// }

executor.execute(new Runnable() {
public void run() {
try {
itemList.add(item);
backupFile(item);
}
catch (IOException e) {
e.printStackTrace();
}
}

});

}

/**
* 往压缩流里面写入数据
* @param filename 文件名称
* @param zipout 压缩流
* @param fn
* @throws IOException 如果发生io错误
*/

private void writedToZipStream(String filename, ZipOutputStream zipout, String fn) throws IOException {
FileInputStream fin = null;
ZipEntry entry = null;
File file = new File(filename);
File filestr[] = file.listFiles();
for (int i = 0; i < filestr.length; i++) {
if (filestr[i].isDirectory()) {
writedToZipStream(filestr[i].getAbsolutePath(), zipout, fn);
}
else {

try {
fin = new FileInputStream(filestr[i]);

byte[] b = new byte[byteSize];
int length = 0;
entry = new ZipEntry(filestr[i].getAbsolutePath().replace(
new File("").getAbsolutePath() + this.FILE_SEPARATOR + this.TEMP_DIR + this.FILE_SEPARATOR, ""));
zipout.putNextEntry(entry);
while ((length = fin.read(b)) != -1) {
zipout.write(b, 0, length);
}
}

finally {

fin.close();
fin = null;

file = null;
}

}
}
}

/**
* 进行备份操作,把临时存放在系统目录下面的文件拷贝到指定的目录下面
* @param fn 绝对路径名称,保存时候所输入的名字
*
* @param pro 进度条
* @exception 如果发生io错误或者指定目录不存在
*/
public void synBackup(String fn, IProgress pro) throws Exception{

if (isEndTask()) //如果线程池处理完毕,就创建xml配置文件
{
this.createXMLConfFile();

String finname = new File("").getAbsolutePath() + this.FILE_SEPARATOR + this.TEMP_DIR + this.FILE_SEPARATOR;
ZipOutputStream zipout = null;
try {
String tofn = fn + this.FILE_SEPARATOR + System.currentTimeMillis() + ".zip";
zipout = new ZipOutputStream(new FileOutputStream(tofn));
writedToZipStream(finname, zipout, fn);
// FileFunc.removeDir(new File(finname));//删除临时文件

}
catch (FileNotFoundException e) {
e.printStackTrace();
}
catch (IOException e) {
e.printStackTrace();
}
finally {
try {
zipout.close();
}
catch (Exception e) {
e.printStackTrace();
}
zipout = null;
}
}

}

/**
* 判断任务是否处理完成,如果完成,返回为true
* @return
*/
public boolean isEndTask() {
while (true) {
if (this.executor.getActiveCount() == 0) {
return true;

}

}
}

/**
* 创建xml配置文件
*/
public void createXMLConfFile() {
StringBuffer sb = new StringBuffer("<?xml version='1.0' encoding='gbk' ?>\n");
StringMap smap = new StringMap("");
sb.append("<items>\n");
for (int i = 0; i < itemList.size(); i++) {
BackupItem item = (BackupItem) itemList.get(i);
sb.append("<item type=\"" + item.getType() + "\">\n");

sb.append("<description>");
sb.append(item.getDescription());
sb.append("</description>\n");

sb.append("<targetfile>");
sb.append(item.getTargetFile());
sb.append("</targetfile>\n");

sb.append("<ppts>");
for (int j = 0; j < item.getPpt().length; j++) {

sb.append(item.getPpt()[j] + ":");
sb.append(item.getPpt(item.getPpt()[j]));
if (j != item.getPpt().length - 1)
sb.append("#");
}
sb.append("</ppts>\n");

sb.append("</item>\n");

}
sb.append("</items>");

FileOutputStream fout = null;
try {
fout = new FileOutputStream(new File("").getAbsolutePath() + this.FILE_SEPARATOR + this.TEMP_DIR
+ this.FILE_SEPARATOR + "config.xml");
fout.write(sb.toString().getBytes());
}
catch (FileNotFoundException e) {
e.printStackTrace();
}
catch (IOException e) {
e.printStackTrace();
}
finally {
try {
fout.close();
}
catch (IOException e) {
e.printStackTrace();
}
fout = null;
sb = null;
}
}

}

bzwm 2008-11-27
  • 打赏
  • 举报
回复
能多贴点代码吗
云上飞翔 2008-11-27
  • 打赏
  • 举报
回复
[Quote=引用楼主 cy729215495 的帖子:]
用的是ThreadPoolExecutor,做法是提交一个任务后不管,可以提交多个任务.最后要等任务全部完成了在去调用一个方法,我原来的做法是:


Java code
public boolean isEndTask() {
while (true) {
if (this.executor.getActiveCount() == 0) {
return true;

}

}
}


public void synBackup(String fn, IProgress pro) {

if (isEndTask()) //如果线程池处理完毕,就干别的事…
[/Quote]

答:isEndTask()用while(true){..}来判别所有线程已执行结束,否则程序就不许往下执行.while(true){..}在多线程中是很浪费CPU的,从而使得线程池中各个线程得到很少的CPU机会去执行自己各自的任务。因此影响了线程池的优势的发挥。
那如何改进代码呢?
我的建议是:

1)从ThreadPoolExecutor继承,定制它的回调方法:
protected void afterExecute(Runnable r, Throwable t),在该方法的代码中,判getActiveCount() 是不是 0,若是0,则置boolean 型变量hasFinished=true;并发出notifyAll()通知,通知synBackup()方法所在的线程,hasFinished已为true,它可以开始运行了[主要原因是:synBackup()方法调用了下边的waitForEndTask() 方法,而该方法是用wait()等待线程池所有线程运行结束的。]。

2)isEndTask()方法的代码不能是while(true);改为:若没有完成,就wait(),放弃CPU,让CPU宝贵的资源留给线程池中的线程。因此方法名改为waitForEndTask()。代码如下:
public void waitForEndTask() {
synchronized(变量hasFinished所在的对象){
while (hasFinished==false) {
try{变量hasFinished所在的对象.wait();}
catch(InterruptedException e){}
}
}
3)这样设计的目的是:当线程池中线程没有全部运行结束时,synBackup()方法[内部调用了waitForEndTask() ]所有的线程是处于wait()下,不占用宝贵的CPU资源,让CPU资源全部留给了线程池中线程。当线程池中所有的线程全运行结束,才会通过notifyAll()来唤醒synBackup()方法所有的线程继续向下运行。

以上仅供你参考
myjava_024 2008-11-27
  • 打赏
  • 举报
回复
up
guoxyj 2008-11-27
  • 打赏
  • 举报
回复
up
mdog26 2008-11-27
  • 打赏
  • 举报
回复
楼上说的有道理
yanhan0615 2008-11-27
  • 打赏
  • 举报
回复
我个人认为这个方法是有问题的:

public boolean isEndTask() {
while (true) {
if (this.executor.getActiveCount() == 0) {
return true;

}

}
}

建议楼主试着用在while里面增加sleep(),哪怕是10ms都比没有强,否则的话这个该方法独占cpu资源太多了...

62,614

社区成员

发帖
与我相关
我的任务
社区描述
Java 2 Standard Edition
社区管理员
  • Java SE
加入社区
  • 近7日
  • 近30日
  • 至今
社区公告
暂无公告

试试用AI创作助手写篇文章吧