Java多线程消费者问题

有一个简单的场景:一个Queue,包含很多Task,现在的程序要去Queue中取Task来执行
取的时候是多线程去取的,但是一旦有个线程拿到Queue中的某个Task,其他线程就必须退出

拿到Task的线程根据Task里面的内容,将具体工作(因为一个Task中可能存在多个具体的工作)分发给最终的工作执行线程(ProductionLineA或者ProductionLineB)去执行,工作执行线程执行完一个具体的工作后就把信息更新到Task中去,拿到Task的线程会不断检测当前Task里面的具体工作是否都执行完毕,如果完毕,拿到Task的线程退出,Task也会从队列中销毁

通俗的来讲也就是多线程抢任务,抢到一个任务后,把具体的工作分给多个线程去执行,它还可以回来继续抢其他的任务

注意点
1、抢任务的时候可以有多个线程去抢,但是最终一个任务只能有一个线程拿到,也就是执行任务分发的那段代码只能由某个线程执行一次,否则到后面具体的工作可能被执行多次
所以对已经抢到的任务打标记要只能打一次,不能存在A线程判断的时候标记没有被打上,B线程判断也没有,A打标记,B也打标记,这个到后面肯定就乱了,用double check来实现?
2、工作执行线程需要把执行完的情况更新到Task中去,如果第1点中具体工作分正确了,这里应该能保证不会有多个线程执行同样的具体工作,所以把执行完的情况写回Task的时候不用同步保证也行

目前任务标记用的是Thread,这个是不是不合适?
检测是否执行完毕的代码是单线程的,见WatchDog类,目前还缺超时检测,多线程的检测有没有必要?
锁加的恰不恰当?

多线程的代码确实比较难编写和调试

目前我实现了一个代码,看似没有问题,通过测试,分析日志,发现都还正常

			Lock lock = new ReentrantLock();
			lock.tryLock();
			// critical area start
			if (null != task.getExclusive()
					&& task.getExclusive() != Thread.currentThread()) {
				// 如果有人占了,那么自动退出
				// 这里怎么实现比较好?
				continue;
			} else {
				if (null == task.getExclusive()) {
					task.setExclusive(Thread.currentThread());
				}
			}
			// critical area end
			lock.unlock();

关键代码中,目前这一段因该还有修改的空间

运行文件run.sh
日志分析LogAnalyzer.java,目前只能说是大概的分析有没有明显出错的地方,比如执行的数量对不对

日志:

queue has 60 wks
can only be executed 10 times...
can only be executed 10 times...
can only be executed 10 times...
can only be executed 10 times...
can only be executed 10 times...
can only be executed 10 times...
can only be executed 10 times...
can only be executed 10 times...
can only be executed 10 times...
shan$evol in task[2] is executing by ProductionLineB...
shan$love in task[8] is executing by ProductionLineA...
shan$love in task[3] is executing by ProductionLineA...
can only be executed 10 times...
shan$love in task[0] is executing by ProductionLineA...
shan$vole in task[3] is executing by ProductionLineB...
shan$elvo in task[8] is executing by ProductionLineB...
shan$ovel in task[8] is executing by ProductionLineB...
shan$vole in task[8] is executing by ProductionLineB...
shan$love in task[5] is executing by ProductionLineA...
shan$love in task[5] is executing by ProductionLineA...
shan$love in task[4] is executing by ProductionLineA...
shan$love in task[7] is executing by ProductionLineA...
shan$leov in task[8] is executing by ProductionLineB...
shan$elvo in task[4] is executing by ProductionLineB...
shan$ovel in task[4] is executing by ProductionLineB...
shan$vole in task[4] is executing by ProductionLineB...
shan$leov in task[4] is executing by ProductionLineB...
shan$ovel in task[5] is executing by ProductionLineB...
shan$evol in task[5] is executing by ProductionLineB...
shan$elvo in task[5] is executing by ProductionLineB...
shan$vole in task[1] is executing by ProductionLineB...
shan$ovel in task[1] is executing by ProductionLineB...
shan$leov in task[1] is executing by ProductionLineB...
shan$evol in task[1] is executing by ProductionLineB...
shan$vole in task[6] is executing by ProductionLineB...
shan$evol in task[3] is executing by ProductionLineB...
shan$elvo in task[7] is executing by ProductionLineB...
shan$ovel in task[7] is executing by ProductionLineB...
shan$leov in task[7] is executing by ProductionLineB...
shan$love in task[9] is executing by ProductionLineA...
shan$evol in task[7] is executing by ProductionLineB...
shan$ovel in task[6] is executing by ProductionLineB...
task0 is done...
shan$evol in task[6] is executing by ProductionLineB...
shan$vole in task[9] is executing by ProductionLineB...
shan$leov in task[6] is executing by ProductionLineB...
shan$evol in task[9] is executing by ProductionLineB...
shan$ovel in task[5] is executing by ProductionLineB...
shan$leov in task[4] is executing by ProductionLineB...
shan$leov in task[8] is executing by ProductionLineB...
shan$leov in task[1] is executing by ProductionLineB...
task2 is done...
shan$elvo in task[7] is executing by ProductionLineB...
shan$evol in task[5] is executing by ProductionLineB...
shan$ovel in task[7] is executing by ProductionLineB...
shan$elvo in task[5] is executing by ProductionLineB...
shan$evol in task[3] is executing by ProductionLineB...
shan$evol in task[6] is executing by ProductionLineB...
shan$vole in task[1] is executing by ProductionLineB...
shan$ovel in task[1] is executing by ProductionLineB...
shan$vole in task[9] is executing by ProductionLineB...
shan$evol in task[7] is executing by ProductionLineB...
shan$vole in task[6] is executing by ProductionLineB...
shan$evol in task[9] is executing by ProductionLineB...
shan$leov in task[8] is executing by ProductionLineB...
task4 is done...
shan$evol in task[5] is executing by ProductionLineB...
shan$vole in task[1] is executing by ProductionLineB...
shan$ovel in task[1] is executing by ProductionLineB...
task3 is done...
shan$evol in task[7] is executing by ProductionLineB...
task6 is done...
shan$leov in task[8] is executing by ProductionLineB...
task9 is done...
task5 is done...
shan$vole in task[1] is executing by ProductionLineB...
shan$evol in task[7] is executing by ProductionLineB...
task8 is done...
task1 is done...
task7 is done...

完整源码:
multiple-consumers-problem

Leave a Reply

Your email address will not be published. Required fields are marked *