/**
Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

A simple, featureless actor framework. It allows you to write an async
application using the actor pattern. It is suitable for applications that need
async:ness but not the highest achievable performance.

It is modelled after [C++ Actor Framework](https://github.com/actor-framework/actor-framework)
which I have used and am very happy with how my applications turned out. Credit
to Dominik Charousset, author of CAF.

Most of the code is copied from Phobos std.concurrency.

A thread executes one actor at a time. The actor ID of the current actor is
accessible via `thisAid()`.
*/
module my.actor;

import core.atomic : cas, atomicStore, MemoryOrder, atomicLoad, atomicOp;
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import core.time : dur, Duration;
import logger = std.experimental.logger;
import std.array : appender, empty, array;
import std.datetime : SysTime, Clock;
import std.exception : collectException;
import std.format : formattedWrite;
import std.range : isOutputRange;
import std.stdio;
import std.traits;
import std.typecons : Tuple;
import std.variant : Variant;

import my.actor.mbox : MessageBox, Message, DelayedMessageBox, DelayedMessage;
import my.actor.typed;
import my.gc.refc;

/// Exception thrown by the actor framework, e.g. when an actor can not be found.
class ActorException : Exception {
    this(string msg) @safe pure nothrow @nogc {
        super(msg);
    }
}

alias RcActorInfo = RefCounted!ActorInfo;

/** Registry of the actors that have been spawned.
 *
 * All access to the registry is serialized via an internal mutex.
 */
struct ActorSystem {
    /// Statistics about the actor system that is only updated in debug build.
    static struct Stat {
        // Method calls.
        long putCnt;
        long removeCnt;
        long spawnCnt;
    }

    private {
        Mutex lock_;
        ulong nextId_;
        shared Stat stat_;

        RcActorInfo[ulong] actors_;
        RcActorInfo[] toBeRemoved_;
    }

    this(this) @disable;

    /// Spawn a scoped actor that can be used by the local thread/context to send/receive messages.
    Aid scopedActor() @trusted {
        return nextAid();
    }

    /** Spawn a new actor with the behavior `Behavior`.
     *
     * Params:
     *  TActor = the messages that the spawned actor must implement behaviors for.
     *  Behavior = behavior for each message.
     *
     * Returns:
     *  An `Aid` representing the new actor.
     */
    Aid spawn(TActor, Behavior...)() if (isTypedActor!TActor) {
        debug atomicOp!"+="(stat_.spawnCnt, 1);

        auto aid = nextAid();
        _spawnDetached(this, aid, false, makeTypedActor!(TActor, Behavior)(aid));
        return aid;
    }

    /** Spawn a new stateful actor with the behavior `Behavior` and state `State`.
     *
     * Params:
     *  TActor = the messages that the spawned actor must implement behaviors for.
     *  StateT = type of the state that is passed to the behaviors.
     *  Behavior = behavior for each message.
     *  state = initial state of the actor.
     *
     * Returns:
     *  An `Aid` representing the new actor.
     */
    Aid spawn(TActor, StateT, Behavior...)(StateT state = StateT.init)
            if (isTypedActor!TActor) {
        debug atomicOp!"+="(stat_.spawnCnt, 1);

        auto aid = nextAid();
        _spawnDetached(this, aid, false, makeStatefulTypedActor!(TActor,
                StateT, Behavior)(aid, state));
        return aid;
    }

    /// Returns: number of actors that are currently registered in the system.
    size_t length() @safe pure const @nogc {
        synchronized (lock_) {
            return actors_.length;
        }
    }

    /** Statistics about the actor system.
     *
     * Only updated in debug build.
     */
    Stat systemStat() @safe pure nothrow const @nogc {
        return stat_;
    }

    /// Allocate the next unique actor ID while holding the system lock.
    private Aid nextAid() @trusted {
        synchronized (lock_) {
            return Aid(++nextId_);
        }
    }

    /** Look up the registered actor with ID `aid`.
     *
     * Throws: `ActorException` if no such actor is registered.
     */
    private RcActorInfo actor(Aid aid) @safe {
        synchronized (lock_) {
            if (auto v = aid.id in actors_) {
                return *v;
            }
        }
        throw new ActorException("No actor with the requested ID is registered in the actor system");
    }

    // TODO: change to safe
    /// Register `actor` in the system under its own ID.
    private void put(RcActorInfo actor) @safe {
        debug atomicOp!"+="(stat_.putCnt, 1);

        const id = actor.ident.id;
        synchronized (lock_) {
            actors_[id] = actor;
        }
    }

    /** Unregister `actor`.
     *
     * The registered info is moved to the list of actors to be cleaned up by
     * `cleanup`.
     */
    private void remove(RcActorInfo actor) @safe {
        debug atomicOp!"+="(stat_.removeCnt, 1);

        const id = actor.ident.id;
        synchronized (lock_) {
            if (auto v = id in actors_) {
                toBeRemoved_ ~= *v;
                (*v).release;
                actors_.remove(id);
            }
        }
    }

    /** Periodic cleanup of stopped actors.
     *
     * Runs `ActorInfo.cleanup` on every removed actor and drops the system's
     * references to them.
     */
    private void cleanup() @trusted {
        synchronized (lock_) {
            foreach (ref a; toBeRemoved_) {
                a.cleanup;
                a.release;
            }
            toBeRemoved_ = null;
        }
    }
}

/// Create a new, reference counted actor system.
RefCounted!ActorSystem makeActorSystem() {
    return ActorSystem(new Mutex).refCounted;
}

@("shall spawn a typed actor, process messages and cleanup")
unittest {
    alias MyActor = TypedActor!(Tuple!(immutable(int)*));

    int cnt;
    static void incr(immutable(int)* cnt) {
        (*(cast(int*) cnt)) = 1;
        thisAid.stop;
    }

    auto sys = makeActorSystem;
    {
        auto a = sys.spawn!(MyActor, incr);
        auto self = sys.scopedActor;
        send(self, a, cast(immutable(int*))&cnt);
        delayedSend(self, a, 50.dur!"msecs", cast(immutable(int*))&cnt);
        delayedSend(self, a, Clock.currTime + 50.dur!"msecs", cast(immutable(int*))&cnt);

        Thread.sleep(200.dur!"msecs");

        assert(sys.systemStat.spawnCnt == 1);
        assert(sys.systemStat.putCnt == 1);
        assert(sys.systemStat.removeCnt == 1);
        assert(cnt == 1);
    }

    assert(sys.length == 0);
    assert(sys.toBeRemoved_.length == 1);
    auto x = sys.toBeRemoved_[0];
    assert(x.refCount > 0);
    sys.cleanup;
    assert(sys.toBeRemoved_.length == 0);
    assert(x.refCount == 1);
}

@("shall spawn a stateful typed actor and cleanup")
unittest {
    alias MyActor = TypedActor!(Tuple!(int));

    struct State {
        int cnt;
    }

    static void incr(ref RefCounted!State state, int value) {
        state.cnt += value;
        thisAid.stop;
    }

    auto sys = makeActorSystem;
    {
        auto st = State(8).refCounted;
        auto a = sys.spawn!(MyActor, typeof(st), incr)(st);
        auto self = sys.scopedActor;
        send(self, a, 2);
        delayedSend(self, a, 50.dur!"msecs", 2);
        delayedSend(self, a, Clock.currTime + 50.dur!"msecs", 2);

        Thread.sleep(200.dur!"msecs");

        assert(sys.systemStat.spawnCnt == 1);
        assert(sys.systemStat.putCnt == 1);
        assert(sys.systemStat.removeCnt == 1);
        assert(st.cnt == 10);
    }

    assert(sys.length == 0);
    assert(sys.toBeRemoved_.length == 1);
    auto x = sys.toBeRemoved_[0];
    assert(x.refCount > 0);
    sys.cleanup;
    assert(sys.toBeRemoved_.length == 0);
    assert(x.refCount == 1);
}

@("shall process messages in a stateful typed actor as fast as possible")
unittest {
    import std.datetime : Clock, SysTime;

    alias MyActor = TypedActor!(Tuple!(int));

    enum maxCnt = 1000;

    struct State {
        int cnt;
        SysTime stopAt;
    }

    static void incr(ref RefCounted!State state, int value) {
        state.cnt += value;
        if (state.cnt == maxCnt) {
            state.stopAt = Clock.currTime;
            thisAid.stop;
        }
    }

    auto sys = makeActorSystem;
    {
        const startAt = Clock.currTime;
        auto st = State(0).refCounted;
        auto a = sys.spawn!(MyActor, typeof(st), incr)(st);

        auto self = sys.scopedActor;
        foreach (_; 0 .. maxCnt) {
            send(self, a, 1);
        }

        while (a.isAlive) {
            Thread.sleep(1.dur!"msecs");
        }

        assert(st.cnt == maxCnt);
        const diff = (st.stopAt - startAt);
        debug writefln!"%s:%s %s msg in %s (%s msg/s)"(__FUNCTION__, __LINE__, st.cnt,
                diff, cast(double) st.cnt / (cast(double) diff.total!"nsecs" / 1000000000.0));
    }

    sys.cleanup;
}

/** Actor ID.
 *
 * A reference counted handle to the mailboxes and the state of an actor. A
 * default initialized `Aid` refers to no actor; `isAlive` reports `false` for
 * it and the state transitions (`running`, `stop`, `terminated`) are no-ops.
 */
struct Aid {
    private static struct Value {
        shared ulong id;
        Mutex lock;
        MessageBox normal;
        MessageBox priority;
        DelayedMessageBox delayed;
        MessageBox system;
        shared ActorState state;

        this(this) @disable;

        /// Close all mailboxes that are still open.
        ~this() @trusted {
            // a default initialized Value has nothing to close.
            if (lock is null)
                return;

            synchronized (lock) {
                if (normal !is null && !normal.isClosed)
                    normal.close;
                if (priority !is null && !priority.isClosed)
                    priority.close;
                if (delayed !is null && !delayed.isClosed)
                    delayed.close;
                if (system !is null && !system.isClosed)
                    system.close;
            }
        }
    }

    private RefCounted!Value value_;

    /// Create the handle for actor `id` with fresh, empty mailboxes.
    this(ulong id) {
        this.value_ = Value(id, new Mutex, new MessageBox, new MessageBox,
                new DelayedMessageBox, new MessageBox);
    }

    /// Copy constructor
    this(ref return scope typeof(this) rhs) @safe pure nothrow @nogc {
        this.value_ = rhs.value_;
    }

    string toString() @safe pure const {
        auto buf = appender!string;
        toString(buf);
        return buf.data;
    }

    void toString(Writer)(ref Writer w) const if (isOutputRange!(Writer, char)) {
        if (value_.refCount == 0) {
            formattedWrite(w, "%s(<uninitialized>)", typeof(this).stringof);
        } else {
            formattedWrite(w, "%s(refCount:%s, id:%s, state:%s)", typeof(this)
                    .stringof, value_.refCount, id, state);
        }
    }

    /// Returns: true if the actor is waiting to start or is running.
    bool isAlive() @safe pure const @nogc {
        import std.algorithm : among;

        if (value_.refCount == 0)
            return false;

        synchronized (value_.lock) {
            return state().among(ActorState.waiting, ActorState.running) != 0;
        }
    }

    /// Mark an actor as running if it were previously waiting.
    void running() @safe pure @nogc {
        // an uninitialized Aid has no lock or state to operate on.
        if (value_.refCount == 0)
            return;

        synchronized (value_.lock) {
            // the state is written atomically because `state()` reads it
            // without taking the lock.
            if (state == ActorState.waiting) {
                setState(ActorState.running);
            }
        }
    }

    /** Stop an actor.
     *
     * The actor will be marked as stopping which will mean that it will at
     * most process one more message and then be terminated.
     */
    void stop() @safe pure @nogc {
        // an uninitialized Aid has no lock or state to operate on.
        if (value_.refCount == 0)
            return;

        synchronized (value_.lock) {
            final switch (state) with (ActorState) {
            case waiting:
                goto case;
            case running:
                setState(ActorState.stopping);
                break;
            case stopping:
                goto case;
            case terminated:
                break;
            }
        }
    }

    /** Move the actor to the terminated state only if it were in the stopping state previously.
     */
    package(my.actor) void terminated() @safe pure @nogc {
        // an uninitialized Aid has no lock or state to operate on.
        if (value_.refCount == 0)
            return;

        synchronized (value_.lock) {
            if (state == ActorState.stopping) {
                setState(ActorState.terminated);
            }
        }
    }

    /// Release the reference counted values of the actor id.
    package(my.actor) void release() @trusted {
        if (value_.refCount == 0)
            return;

        // keep a reference to the mutex; `value_` is not usable after the release.
        auto lock = value_.lock;
        synchronized (lock) {
            value_.release;
        }
    }

    private ulong id() @safe pure nothrow @nogc const {
        return atomicLoad(value_.get.id);
    }

    package(my.actor) MessageBox normalMbox() @safe pure @nogc {
        return value_.normal;
    }

    package(my.actor) MessageBox priorityMbox() @safe pure @nogc {
        return value_.priority;
    }

    package(my.actor) DelayedMessageBox delayedMbox() @safe pure @nogc {
        return value_.delayed;
    }

    package(my.actor) MessageBox systemMbox() @safe pure @nogc {
        return value_.system;
    }

    /// Atomically read the state of the actor.
    private ActorState state() @safe pure nothrow @nogc const {
        return atomicLoad(value_.state);
    }

    /// Atomically, and unconditionally, overwrite the state of the actor.
    private void setState(ActorState st) @safe pure nothrow @nogc {
        atomicStore(value_.state, st);
    }
}

enum ActorState : ubyte {
    /// the actor has spawned and is waiting to start executing.
    waiting,
    /// the actor is running/active
    running,
    /// the actor is signaled to stop. It will process at most one more message.
    stopping,
    /// the actor has terminated and thus all its resources can be freed.
    terminated,
}

/// Configure how an actor, when spawned, will be executed.
enum SpawnMode : ubyte {
    /// the spawned actor is executed in the worker pool
    pool,
    /// executing in its own thread
    detached
}

/** Send a message made up of `params` to the normal mailbox of actor `to`.
 *
 * Params:
 *  from = the sender (currently unused).
 *  to = the receiving actor.
 *  params = the content of the message.
 */
void send(T...)(Aid from, Aid to, T params) {
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    auto msg = Message(params);
    if (!to.normalMbox.put(msg)) {
        // TODO: add error handling when it fails.
    }
}

/** Send a delayed message made up of `params` to actor `to`.
 *
 * Params:
 *  from = the sender (currently unused).
 *  to = the receiving actor.
 *  delay = how much to delay the message with, counted from now.
 *  params = the content of the message.
 */
void delayedSend(T...)(Aid from, Aid to, Duration delay, T params) {
    import std.datetime : Clock;

    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    auto msg = DelayedMessage(Clock.currTime + delay, Message(params));
    if (!to.delayedMbox.put(msg)) {
        // TODO: add error handling when it fails.
    }
}

/** Delay the message being processed until `delay`.
 *
 * Params:
 *  from = the sender (currently unused).
 *  to = the receiving actor.
 *  delay = the point in time the message is delayed until.
 *  params = the content of the message.
 */
void delayedSend(T...)(Aid from, Aid to, SysTime delay, T params) {
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    auto msg = DelayedMessage(delay, Message(params));
    if (!to.delayedMbox.put(msg)) {
        // TODO: add error handling when it fails.
    }
}

/** Currently active actor.
 *
 * Returns: the ID of the actor executing in this thread or `Aid.init` if the
 * thread is not executing an actor.
 */
Aid thisAid() @safe {
    auto info = thisInfo;
    if (info.refCount != 0) {
        return info.ident;
    }
    return Aid.init;
}

/** The actor system that the currently active actor belongs to.
 *
 * Throws: `ActorException` if the thread is not executing an actor.
 */
ref ActorSystem thisActorSystem() @safe {
    auto info = thisInfo;
    if (info.refCount != 0 && info.system !is null) {
        return *info.system;
    }
    throw new ActorException("The current thread is not executing an actor that belongs to an actor system");
}

/** Encapsulates all implementation-level data needed for scheduling.
 *
 * When defining a Scheduler, an instance of this struct must be associated
 * with each logical thread. It contains all implementation-level information
 * needed by the internal API.
 */
struct ActorInfo {
    /// The system the actor belongs to.
    ActorSystem* system;
    /// ID of the actor.
    Aid ident;
    /// ID of the actor that spawned this actor.
    Aid owner;
    /// Actors spawned by this actor; the value is whether they are linked.
    bool[Aid] links;

    /** Cleans up this ActorInfo.
     *
     * This must be called when a scheduled thread terminates. It sends a
     * `SystemMsgType.linkDead` message to every spawned actor and to the
     * owner, if they are alive, and releases the references to `ident` and
     * `owner`.
     */
    void cleanup() {
        Aid discard;
        foreach (aid; links.byKey) {
            if (aid.isAlive) {
                send(discard, aid, SystemMsgType.linkDead, ident);
            }
        }
        if (owner.isAlive) {
            send(discard, owner, SystemMsgType.linkDead, ident);
        }

        ident.release;
        owner.release;
    }
}

enum SystemMsgType {
    linkDead,
}

private:

static ~this() {
    thisInfo.release;
}

/// The thread-local info of the actor that the thread is executing.
ref RefCounted!ActorInfo thisInfo() @safe nothrow @nogc {
    static RefCounted!ActorInfo self;
    return self;
}

/* Spawn `actor` in a new thread.
 *
 * The new thread registers the actor in `system`, executes it until it is
 * stopped and then unregisters it.
 *
 * Params:
 *  system = the system the actor is registered in. It must outlive the actor.
 *  newAid = ID of the actor to spawn.
 *  linked = stored as the link flag for `newAid` in the info of the spawning actor.
 *  actor = the runtime that process the messages of the actor.
 */
void _spawnDetached(ref ActorSystem system, ref Aid newAid, bool linked, ActorRuntime actor) {
    auto ownerAid = thisAid;

    // The thread body must not touch the `ref` parameters. `newAid` refers to
    // a local variable of the caller which may be gone when the thread
    // executes, thus the thread uses a copy of the ID and a pointer to the
    // system.
    auto spawnedAid = newAid;
    auto sys = &system;

    void exec() {
        auto info = ActorInfo().refCounted;

        info.system = sys;
        info.ident = spawnedAid;
        // only waiting -> running. A stop requested before the thread started
        // must not be overwritten or the loop below would never terminate.
        info.ident.running;
        info.owner = ownerAid;
        sys.put(info);
        thisInfo() = info;

        scope (exit)
            () { info.ident.stop; sys.remove(info); }();

        while (info.ident.state != ActorState.stopping) {
            try {
                actor.act(100.dur!"msecs");
            } catch (Exception e) {
                debug logger.warning(e.msg);
            }
        }
    }

    auto t = new Thread(&exec);
    t.start();

    auto info = thisInfo;
    if (info.refCount != 0) {
        thisInfo.links[newAid] = linked;
    }
}

/// Returns: true if any of `Types` has aliases to mutable thread-local data.
bool hasLocalAliasing(Types...)() {
    import std.typecons : Rebindable;

    // Works around "statement is not reachable"
    bool doesIt = false;
    static foreach (T; Types) {
        static if (is(T == Aid)) { /* Allowed */ } else static if (is(T : Rebindable!R, R))
            doesIt |= hasLocalAliasing!R;
        else static if (is(T == struct))
            doesIt |= hasLocalAliasing!(typeof(T.tupleof));
        else
            doesIt |= std.traits.hasUnsharedAliasing!(T);
    }
    return doesIt;
}

/** True if `F` is a callable that returns void, accepts arguments of the
 * types `T` and is either a function pointer or has no unshared aliasing.
 */
template isSpawnable(F, T...) {
    template isParamsImplicitlyConvertible(F1, F2, int i = 0) {
        alias param1 = Parameters!F1;
        alias param2 = Parameters!F2;
        static if (param1.length != param2.length)
            enum isParamsImplicitlyConvertible = false;
        else static if (param1.length == i)
            enum isParamsImplicitlyConvertible = true;
        else static if (isImplicitlyConvertible!(param2[i], param1[i]))
            enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1, F2, i + 1);
        else
            enum isParamsImplicitlyConvertible = false;
    }

    enum isSpawnable = isCallable!F && is(ReturnType!F == void) && isParamsImplicitlyConvertible!(F,
                void function(T)) && (isFunctionPointer!F || !hasUnsharedAliasing!F);
}