Actor并行化的wordcount怎么实现
Actor并行化的wordcount怎么实现
本篇内容介绍了“Actor并行化的wordcount怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
在scala中她能实现很强大的功能,他是基于并发机制的一个事件模型
我们现在学的scala2.10.x版本就是之前的Actor
同步:在主程序上排队执行的任务,只有前一个任务执行完毕后,才能执行下一个任务
异步:指不进入主程序,而进入"任务对列"的任务,只有等主程序任务执行完毕,"任务对列"开始请求主程序,请求任务执行,该任务会进入主程序
java
共享变量 -- 加锁
会出现锁死问题
scala
Actor不共享数据
没有锁的概念
Actor通信之间需要message(通信)
Aactor执行顺序
1.首先调用start()方法启动Actor
2.调用start()方法后act()方法会被执行
3.Actor之间进行发送消息
Actor发送消息的三种方式
! -> 发送异步消息,没有返回值
!? -> 发送同步消息,有返回值,会有线程等待
!! -> 发送异步消息,有返回值,返回值类型Future[Any](用来获取异步操作结果)
Actor并行执行
//注意,这两个actor会并行执行,当其中一个for循环结束后,actor结束
object ActorDemo01 {
def main(args: Array[String]): Unit = {
MyActor1.start()
MyActor2.start()
}
}
object MyActor1 extends Actor{
override def act(): Unit = {
for (i <- 1 to 10){
println(s"actor => $i")
Thread.sleep(2000)
}
}
object MyActor2 extends Actor{
override def act(): Unit = {
for (i <- 1 to 5){
println(s"actor2 => $i")
Thread.sleep(2000)
}
}
}
}
用Actor不断接受消息
执行第一种方式,异步
object ActorDemo02 {
def main(args: Array[String]): Unit = {
val actor: MyActor = new MyActor
actor.start()
//并行执行
actor ! "start" // !->异步
actor ! "stop"
println("发送完成")
}
}
class MyActor extends Actor{
override def act(): Unit = {
while (true){ //死循环
receive { //接收
case "start" => {
println("starting")
Thread.sleep(1000)
println("started")
}
case "stop" => {
println("stopping")
Thread.sleep(1000)
println("stopped")
}
}
}
}
}
第二种方式:利用react来代替receive,也就是说react线程可复用,比receive更高效
object ActorDemo03 {
def main(args: Array[String]): Unit = {
val actor: MyActor3 = new MyActor3
actor.start()
actor ! "start"
actor ! "stop"
println("成功了")
}
}
class MyActor3 extends Actor{
override def act(): Unit = {
loop {
react{
case "start" =>{
println("starting")
Thread.sleep(1000)
println("sarted")
}
case "stop" =>{
println("stoppting")
Thread.sleep(1000)
println("stopped")
}
}
}
}
}
结合样例类练习Actor发送消息
//创建样例类
case class AsyncMsg(id: Int, msg: String)
case class SyncMsg(id: Int, msg: String)
case class ReplyMsg(id: Int, msg: String)
object ActorDemo01 extends Actor {
override def act(): Unit = {
while (true) {
receive {
case "start" => println("starting...")
case AsyncMsg(id, msg) =>
{
println(s"id:$id,msg:$msg")
sender ! ReplyMsg(1,"sucess") //接收到消息后返回响应消息
}
case SyncMsg(id,msg) => {
println(s"id:$id,msg:$msg")
sender ! ReplyMsg(2,"sucess")
}
}
}
}
}
object ActorTest{
def main(args: Array[String]): Unit = {
val actor: Actor = ActorDemo01.start()
// //异步发送消息,没有返回值
// actor ! AsyncMsg(3,"heihei")
// println("异步消息发送完成,没有返回值")
// //同步发送消息,有返回值
// val text: Any = actor !? SyncMsg(4,"OK")
// println(text)
// println("同步消息发送成功")
//异步发送消息,有返回值,返回类型为Future[Any]
val reply: Future[Any] = actor !! SyncMsg(5,"OK is 不存在的")
Thread.sleep(2000)
if (reply.isSet){
val applyMsg: Any = reply.apply()
println(applyMsg)
}else{
println("Nothing")
}
}
}
Actor并行化的wordcount
class Task extends Actor {
override def act(): Unit = {
loop {
react {
case SubmitTask(fileName) => {
val contents = Source.fromFile(new File(fileName)).mkString
val arr = contents.split("\r\n")
val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length)
//val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
sender ! ResultTask(result)
}
case StopTask => {
exit()
}
}
}
}
}
object WorkCount {
def main(args: Array[String]) {
val files = Array("c://words.txt", "c://words.log")
val replaySet = new mutable.HashSet[Future[Any]]
val resultList = new mutable.ListBuffer[ResultTask]
for(f <- files) {
val t = new Task
val replay = t.start() !! SubmitTask(f)
replaySet += replay
}
while(replaySet.size > 0){
val toCumpute = replaySet.filter(_.isSet)
for(r <- toCumpute){
val result = r.apply()
resultList += result.asInstanceOf[ResultTask]
replaySet.remove(r)
}
Thread.sleep(100)
}
val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2))
println(finalResult)
}
}
case class SubmitTask(fileName: String)
case object StopTask
case class ResultTask(result: Map[String, Int])
“Actor并行化的wordcount怎么实现”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注恰卡编程网网站,小编将为大家输出更多高质量的实用文章!
推荐阅读
-
在Python中,将K添加到列元组列表中的最小元素
处理数据集涉及识别特定列中的最小值并通过添加常量值(K)来更新它。通过实施优化的解决方案,我们可以有效地执行此操作,这对于数据...
-
使用switch case语句编写的C程序,用于计算几何图形的面积
#includevoidmain(){intfig_code;floatside,base,length,...
-
如何使 C# 代码可重用?
要在C#中使代码可重用,请使用接口。接口定义属性、方法和事件,这些成员是接口的成员。接口只包含成员的声明。派生类负责定义成员。这通...
-
C# 中的覆盖和隐藏有什么区别?
方法隐藏在C#中也称为隐藏。父类的方法可供子类使用,无需在遮蔽中使用override关键字。子类有其自己版本的相同函数。在...
-
在Java中使用示例双倍longValue()函数
Java是一种强大的面向对象语言,可以对各种数据类型进行高度的控制和精确度。其中一种功能是doublelongValue(),...
-
如何在Java中定义JSON字段名称的命名约定?
TheFieldNamingPolicycanbeusedtodefineafewstandardnaming...
-
Servlet中的HttpSession接口
在JavaWeb开发领域,了解HttpSession接口是创建动态和响应式Web应用程序的关键。在本文中,我们将探讨...
-
使用while循环查找自然数之和的Java程序
自然数之和可以使用编程语言中的不同迭代语句来计算。迭代语句是执行一组特定代码行直到循环语句中的条件失败的语句。在本文中,我们将讨论...
-
我们可以将Java数组转换为列表吗?
我们可以使用Arrays.asList()方法轻松地将Java数组转换为List。语法publicstaticLi...
-
Java中如何在不使用任何外部库的情况下读取网页内容?
TheURLclassofthejava.netpackagerepresentsaUniformResour...