
akka设计模式
首先就是设计一个管理者,来管理控制线程的并发
taskTotal是并发执行的总任务数
taskExecute是已经执行过的任务数
根据传入的并发数量来控制我们一开始要创建多少个工作
通过.actorOf来创建一个worker的ActorRef,注意名字不能重复,并且将自己的ActorRef传过去,用来接收回馈的信息
每创建完一个ActorRef,我们从任务队列中获取一个任务,让它把任务消息发给workerActor去执行任务
worker执行完之后反馈执行情况,将任务执行数量和总任务数对比,不相等则进一步判断任务对列是否还有任务
有则将任务发送过去继续执行,没有则关闭对应的worker,等待其他线程执行完成
所以线程执行完成,则关闭整个system
class TaskMaster extends Actor{
val taskTotal=29
var taskExecute=0
var queue:mutable.Queue[CaseModel.TableModel]=_
override def receive: Receive = {
case CaseModel.Table(parallel)=>{
1 to parallel foreach { i =>
val worker = context.actorOf(Props(new TaskWorker(self)),s"Worker-$i")
worker ! CaseModel.Task(queue.dequeue())
}
}
case CaseModel.TaskResult(rf)=>{
taskExecute =taskExecute+1
if (taskExecute==taskTotal){
context.system.shutdown()
}else{
if (queue.nonEmpty){
rf ! CaseModel.Task(queue.dequeue())
}else{
context.stop(rf)
}
}
}
}
override def preStart(): Unit = {
queue = Demo.getTableModel
}
}
设计一个工作者WorkerActor来完成具体的每一项任务
从Master中将传一个任务过来,获取到任务信息,执行任务
执行完之后将任务执行情况返回给Master,并将自己的ActorRef传过去,接收反馈信息
class TaskWorker(mT:ActorRef) extends Actor{
def doWorker(tableModel: CaseModel.TableModel) = {
Thread.sleep(tableModel.time)
}
override def receive: Receive = {
case CaseModel.Task(tableModel)=>{
println("开始执行"+tableModel.tablename+"任务")
doWorker(tableModel)
println(tableModel.tablename+"任务执行完成")
mT ! CaseModel.TaskResult(self)
// context.stop(self)
}
}
}
消息传递样例
object CaseModel {
case class TableModel(tablename: String,time:Int,n:Int)
case class Table(i: Int)
case class Task(tableModel:TableModel)
case class TaskResult(rf:ActorRef)
}
一个简单的Demo
object Demo{
val queue0 = new mutable.Queue[CaseModel.TableModel]
def main(args: Array[String]): Unit = {
val t1 = CaseModel.TableModel("a", 4000, 0)
val t2 = CaseModel.TableModel("b", 4000, 1)
val t10 =CaseModel.TableModel("m", 4000, 9)
val t3 = CaseModel.TableModel("c", 4000, 2)
val t4 = CaseModel.TableModel("d", 4000, 3)
val t5 = CaseModel.TableModel("e", 4000, 4)
val t6 = CaseModel.TableModel("f", 4000, 5)
val t7 = CaseModel.TableModel("g", 4000, 6)
val t8 = CaseModel.TableModel("h", 3000, 7)
val t9 = CaseModel.TableModel("i", 2000, 8)
val t0 = CaseModel.TableModel("j", 1000, 9)
queue0.enqueue(t0,t2,t3,t4,t6,t7,t8,t9,t10,t0,t1,t2,t3,t4,t6,t7,t8,t0,t0,t0,t0,t0,t0,t0,t0,t0,t0,t0,t0)
//并行度
val number=10
val system = ActorSystem.create("task")
val mActor = system.actorOf(Props(classOf[TaskMaster]), "mActor")
mActor ! CaseModel.Table(number)
}
def getTableModel: mutable.Queue[CaseModel.TableModel] ={
queue0
}
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)