/**
Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module my.actor.actor;

import std.stdio : writeln, writefln;

import core.thread : Thread;
import logger = std.experimental.logger;
import std.algorithm : schwartzSort, max, min, among;
import std.array : empty;
import std.datetime : SysTime, Clock, dur;
import std.exception : collectException;
import std.functional : toDelegate;
import std.meta : staticMap;
import std.traits : Parameters, Unqual, ReturnType, isFunctionPointer, isFunction;
import std.typecons : Tuple, tuple;
import std.variant : Variant;

import my.actor.common : ExitReason, SystemError, makeSignature;
import my.actor.mailbox;
import my.actor.msg;
import my.actor.system : System;
import my.actor.typed : isTypedAddress, isTypedActorImpl;
import my.gc.refc;
import sumtype;

/// Shared state of a `Promise`: where the reply is sent and which request it answers.
private struct PromiseData {
    /// Address that receives the reply.
    WeakAddress replyTo;
    /// Id of the request being answered. Zero is treated as "not assigned" by `Promise.empty`.
    ulong replyId;

    /// Copy constructor
    this(ref return scope typeof(this) rhs) @safe nothrow @nogc {
        replyTo = rhs.replyTo;
        replyId = rhs.replyId;
    }

    @disable this(this);
}

/** A reply of type `T` that is delivered at a later point in time.
 *
 * The reply destination is stored in reference counted `PromiseData`, thus
 * copies of a promise share the same destination.
 *
 * `deliver` can only be called one time; it releases the shared data when it
 * is done.
 */
struct Promise(T) {
    package {
        RefCounted!PromiseData data;
    }

    /// Deliver `reply` passed by value. Forwards to the `ref` overload.
    void deliver(T reply) {
        // `reply` is an lvalue here so this resolves to the `ref` overload; no
        // extra copy is needed.
        deliver(reply);
    }

    /** Deliver the message `reply`.
     *
     * A promise can only be delivered once.
     *
     * The reply is put in the mailbox of `replyTo` if that address can still
     * be locked, otherwise it is silently dropped. A reply that isn't already
     * a `Tuple` is wrapped in one before it is stored in the `Variant`.
     *
     * Params:
     *  reply = the value to send as the reply.
     */
    void deliver(ref T reply) @trusted
    in (!data.empty, "promise must be initialized") {
        // guard also when contracts are compiled out.
        if (data.empty)
            return;
        // the promise is consumed whether or not the reply could be sent.
        scope (exit)
            data.release;

        // TODO: should probably call delivering actor with an ErrorMsg if replyTo is closed.
        if (auto replyTo = data.get.replyTo.lock.get) {
            enum wrapInTuple = !is(T : Tuple!U, U);
            static if (wrapInTuple)
                replyTo.put(Reply(data.get.replyId, Variant(tuple(reply))));
            else
                replyTo.put(Reply(data.get.replyId, Variant(reply)));
        }
    }

    /// Share the reply destination of `rhs`.
    void opAssign(Promise!T rhs) {
        data = rhs.data;
    }

    /// True if the promise is not initialized, i.e. has no data or no reply id (zero).
    bool empty() {
        return data.empty || data.get.replyId == 0;
    }

    /// Clear the promise.
    void clear() {
        data.release;
    }
}

/// Returns: a promise with allocated, default initialized, shared data.
auto makePromise(T)() {
    return Promise!T(refCounted(PromiseData.init));
}

/** The result of a request handler.
 *
 * Holds exactly one of: a value of type `T`, an `ErrorMsg` or a `Promise!T`.
 */
struct RequestResult(T) {
    /// Construct from a value.
    this(T v) {
        value = typeof(value)(v);
    }

    /// Construct from an error.
    this(ErrorMsg v) {
        value = typeof(value)(v);
    }

    /// Construct from a promise.
    this(Promise!T v) {
        value = typeof(value)(v);
    }

    SumType!(T, ErrorMsg, Promise!T) value;
}

// Type erased handlers. `ctx` is the opaque, captured context of the handler.
private alias MsgHandler = void delegate(void* ctx, ref Variant msg) @safe;
private alias RequestHandler = void delegate(void* ctx, ref Variant msg,
        ulong replyId, WeakAddress replyTo) @safe;
private alias ReplyHandler = void delegate(void* ctx, ref Variant msg) @safe;

/// Called for a message or reply that has no matching handler.
alias DefaultHandler = void delegate(ref Actor self, ref Variant msg) @safe nothrow;

/** Actors send error messages to others by returning an error (see Errors)
 * from a message handler. Similar to exit messages, error messages usually
 * cause the receiving actor to terminate, unless a custom handler was
 * installed. The default handler is used as fallback if request is used
 * without error handler.
 */
alias ErrorHandler = void delegate(ref Actor self, ErrorMsg) @safe nothrow;

/** Bidirectional monitoring with a strong lifetime coupling is established by
 * calling a `LinkRequest` to an address. This will cause the runtime to send
 * an `ExitMsg` if either this or other dies. Per default, actors terminate
 * after receiving an `ExitMsg` unless the exit reason is exit_reason::normal.
 * This mechanism propagates failure states in an actor system. Linked actors
 * form a sub system in which an error causes all actors to fail collectively.
 *
 * NOTE(review): `defaultExitHandler` in this module force a shutdown for every
 * `ExitMsg` without inspecting the reason - confirm the "normal" exception
 * above.
 */
alias ExitHandler = void delegate(ref Actor self, ExitMsg msg) @safe nothrow;

/// An exception has been thrown while processing a message.
alias ExceptionHandler = void delegate(ref Actor self, Exception e) @safe nothrow;

/** Actors can monitor the lifetime of other actors by sending a `MonitorRequest`
 * to an address. This will cause the runtime system to send a `DownMsg` for
 * other if it dies.
 *
 * Actors drop down messages unless they provide a custom handler.
 */
alias DownHandler = void delegate(ref Actor self, DownMsg msg) @safe nothrow;

/// Silently drop the message.
void defaultHandler(ref Actor self, ref Variant msg) @safe nothrow {
}

/// Write the name of the actor and the message type to the console.
void logAndDropHandler(ref Actor self, ref Variant msg) @trusted nothrow {
    import std.stdio : writeln;

    try {
        writeln("UNKNOWN message sent to actor ", self.name);
        writeln(msg.toString);
    } catch (Exception e) {
        // best effort logging, the handler must not throw.
    }
}

/// Remember the reason of the error and start a clean shutdown of the actor.
void defaultErrorHandler(ref Actor self, ErrorMsg msg) @safe nothrow {
    self.lastError = msg.reason;
    self.shutdown;
}

/// Remember the reason of the exit and force a shutdown of the actor.
void defaultExitHandler(ref Actor self, ExitMsg msg) @safe nothrow {
    self.lastError = msg.reason;
    self.forceShutdown;
}

/// Mark the actor as failed with a runtime error and force a shutdown.
void defaultExceptionHandler(ref Actor self, Exception e) @safe nothrow {
    self.lastError = SystemError.runtimeError;
    // TODO: should log?
    self.forceShutdown;
}

// Write the name of the actor and the exception to stdout.
void logExceptionHandler(ref Actor self, Exception e) @safe nothrow {
    import std.stdio : writeln;

    self.lastError = SystemError.runtimeError;

    try {
        writeln("EXCEPTION thrown by actor ", self.name);
        writeln(e.msg);
        writeln("TERMINATING");
    } catch (Exception ignored) {
        // best effort logging, the handler must not throw. Named so it does
        // not shadow the parameter `e`.
    }

    self.forceShutdown;
}

/// Timeout for an outstanding request.
struct ReplyHandlerTimeout {
    /// Id of the awaited reply.
    ulong id;
    /// Point in time after which the request is considered timed out.
    SysTime timeout;
}

package enum ActorState {
    /// waiting to be started.
    waiting,
    /// active and processing messages.
    active,
    /// wait for all awaited responses to finish
    shutdown,
    /// discard also the awaited responses, just shutdown fast
    forceShutdown,
    /// in process of shutting down
    finishShutdown,
    /// stopped.
    stopped,
}

/// The handlers for one awaited reply.
private struct AwaitReponse {
    /// Called with the reply when it arrives.
    Closure!(ReplyHandler, void*) behavior;
    /// Called if the request times out.
    ErrorHandler onError;
}

/** An actor: an address (mailbox), the behaviors that react on messages and
 * the state machine (`ActorState`) that controls its lifetime.
 *
 * `process` drives the actor one step.
 */
struct Actor {
    import std.container.rbtree : RedBlackTree, redBlackTree;

    package StrongAddress addr;
    // visible in the package for logging purpose.
    package ActorState state_ = ActorState.stopped;

    private {
        // TODO: rename to behavior.
        Closure!(MsgHandler, void*)[ulong] incoming;
        Closure!(RequestHandler, void*)[ulong] reqBehavior;

        // callbacks for awaited responses keyed on their id.
        AwaitReponse[ulong] awaitedResponses;
        // kept sorted on the timeout, earliest first. See `register`.
        ReplyHandlerTimeout[] replyTimeouts;

        // important that it start at 1 because then zero is known to not be initialized.
        ulong nextReplyId = 1;

        /// Delayed messages ordered by their trigger time.
        RedBlackTree!(DelayedMsg*, "a.triggerAt < b.triggerAt", true) delayed;

        /// Used during shutdown to signal monitors and links why this actor is terminating.
        SystemError lastError;

        /// monitoring the actor lifetime.
        WeakAddress[size_t] monitors;

        /// strong, bidirectional link of the actors lifetime.
        WeakAddress[size_t] links;

        // Number of messages that has been processed.
        ulong messages_;

        /// System the actor belongs to.
        System* homeSystem_;

        string name_;

        ErrorHandler errorHandler_;

        /// callback when a link goes down.
        DownHandler downHandler_;

        ExitHandler exitHandler_;

        ExceptionHandler exceptionHandler_;

        DefaultHandler defaultHandler_;
    }

    invariant () {
        if (addr && !state_.among(ActorState.waiting, ActorState.shutdown)) {
            assert(errorHandler_);
            assert(exitHandler_);
            assert(exceptionHandler_);
            assert(defaultHandler_);
        }
    }

    /** Create an actor in the `waiting` state that owns the address `a`.
     *
     * The address is marked as open and the default handlers are installed.
     * No down handler is installed.
     */
    this(StrongAddress a) @trusted
    in (!a.empty, "address is empty") {
        state_ = ActorState.waiting;

        addr = a;
        addr.get.setOpen;
        delayed = new typeof(delayed);

        errorHandler_ = toDelegate(&defaultErrorHandler);
        downHandler_ = null;
        exitHandler_ = toDelegate(&defaultExitHandler);
        exceptionHandler_ = toDelegate(&defaultExceptionHandler);
        // the leading dot picks the module level function, not the setter.
        defaultHandler_ = toDelegate(&.defaultHandler);
    }

    /// Returns: a weak reference to the address of the actor.
    WeakAddress address() @safe {
        return addr.weakRef;
    }

    package ref StrongAddress addressRef() return @safe pure nothrow @nogc {
        return addr;
    }

    /// Returns: the system the actor belongs to. It must have been set with `setHomeSystem`.
    ref System homeSystem() @safe pure nothrow @nogc {
        return *homeSystem_;
    }

    /** Clean shutdown of the actor
     *
     * Stopping incoming messages from triggering new behavior and finish all
     * awaited responses.
     *
     * Only has an effect if the actor is waiting or active.
     */
    void shutdown() @safe nothrow {
        if (state_.among(ActorState.waiting, ActorState.active))
            state_ = ActorState.shutdown;
    }

    /** Force an immediate shutdown.
     *
     * Stopping incoming messages from triggering new behavior and discard the
     * awaited responses instead of waiting for them.
     *
     * Only has an effect if the actor is waiting, active or in a clean shutdown.
     */
    void forceShutdown() @safe nothrow {
        if (state_.among(ActorState.waiting, ActorState.active, ActorState.shutdown))
            state_ = ActorState.forceShutdown;
    }

    /// Returns: the id of the address of the actor.
    ulong id() @safe pure nothrow const @nogc {
        return addr.id;
    }

    /// Returns: the name of the actor.
    string name() @safe pure nothrow const @nogc {
        return name_;
    }

    // dfmt off

    /// Set name name of the actor.
    void name(string n) @safe pure nothrow @nogc {
        this.name_ = n;
    }

    /// Set the handler for `ErrorMsg`.
    void errorHandler(ErrorHandler v) @safe pure nothrow @nogc {
        errorHandler_ = v;
    }

    /// Set the handler for `DownMsg`.
    void downHandler(DownHandler v) @safe pure nothrow @nogc {
        downHandler_ = v;
    }

    /// Set the handler for `ExitMsg`.
    void exitHandler(ExitHandler v) @safe pure nothrow @nogc {
        exitHandler_ = v;
    }

    /// Set the handler for exceptions thrown while processing messages.
    void exceptionHandler(ExceptionHandler v) @safe pure nothrow @nogc {
        exceptionHandler_ = v;
    }

    /// Set the handler for messages without a matching behavior.
    void defaultHandler(DefaultHandler v) @safe pure nothrow @nogc {
        defaultHandler_ = v;
    }

    // dfmt on

package:
    /// True if the actor has an address and the address reports a message.
    bool hasMessage() @safe pure nothrow @nogc {
        return addr && addr.get.hasMessage;
    }

    /// How long until a delayed message or a timeout fires.
    Duration nextTimeout(const SysTime now, const Duration default_) @safe {
        return min(delayed.empty ? default_ : (delayed.front.triggerAt - now),
                replyTimeouts.empty ? default_ : (replyTimeouts[0].timeout - now));
    }

    /// True if at least one reply is awaited.
    bool waitingForReply() @safe pure nothrow const @nogc {
        return !awaitedResponses.empty;
    }

    /// Number of messages that has been processed.
    ulong messages() @safe pure nothrow const @nogc {
        return messages_;
    }

    void setHomeSystem(System* sys) @safe pure nothrow @nogc {
        homeSystem_ = sys;
    }

    /// Free the context of all message and request behaviors and remove them.
    void cleanupBehavior() @trusted nothrow {
        foreach (ref a; incoming.byValue) {
            try {
                a.free;
            } catch (Exception e) {
                // TODO: call exceptionHandler?
            }
        }
        incoming = null;
        foreach (ref a; reqBehavior.byValue) {
            try {
                a.free;
            } catch (Exception e) {
            }
        }
        reqBehavior = null;
    }

    /// Free the context of all awaited responses and remove them.
    void cleanupAwait() @trusted nothrow {
        foreach (ref a; awaitedResponses.byValue) {
            try {
                a.behavior.free;
            } catch (Exception e) {
            }
        }
        awaitedResponses = null;
    }

    /// Drop all delayed messages and destroy the container.
    void cleanupDelayed() @trusted nothrow {
        foreach (const _; 0 .. delayed.length) {
            try {
                delayed.front.msg = Msg.init;
                delayed.removeFront;
            } catch (Exception e) {
            }
        }
        .destroy(delayed);
    }

    /// True for every state except `stopped`.
    bool isAlive() @safe pure nothrow const @nogc {
        final switch (state_) {
        case ActorState.waiting:
            goto case;
        case ActorState.active:
            goto case;
        case ActorState.shutdown:
            goto case;
        case ActorState.forceShutdown:
            goto case;
        case ActorState.finishShutdown:
            return true;
        case ActorState.stopped:
            return false;
        }
    }

    /// Accepting messages.
    bool isAccepting() @safe pure nothrow const @nogc {
        final switch (state_) {
        case ActorState.waiting:
            goto case;
        case ActorState.active:
            goto case;
        case ActorState.shutdown:
            return true;
        case ActorState.forceShutdown:
            goto case;
        case ActorState.finishShutdown:
            goto case;
        case ActorState.stopped:
            return false;
        }
    }

    /// Returns: a new, unique for this actor, reply id. Never zero.
    ulong replyId() @safe {
        return nextReplyId++;
    }

    /** Drive the actor one step.
     *
     * What is done depend on the state of the actor. The counter returned by
     * `messages` is reset at the start of each call.
     *
     * Params:
     *  now = the time used to decide if timeouts and delayed messages have triggered.
     */
    void process(const SysTime now) @safe nothrow {
        import core.memory : GC;

        assert(!GC.inFinalizer);

        messages_ = 0;

        void tick() {
            // philosophy of the order is that a timeout should only trigger if it
            // is really required thus it is checked last. This order then mean
            // that a request may have triggered a timeout but because
            // `processReply` is called before `checkReplyTimeout` it is *ignored*.
            // Thus "better to accept even if it is timeout rather than fail".
            //
            // NOTE: the assumption that a message that has timed out should be
            // processed turned out to be... wrong. It is annoying that
            // sometimes a timeout message triggers even though it shouldn't,
            // because it is now too old to be useful!
            // Thus the order is changed to first check for timeout, then process.
            try {
                processSystemMsg();
                checkReplyTimeout(now);
                processDelayed(now);
                processIncoming();
                processReply();
            } catch (Exception e) {
                exceptionHandler_(this, e);
            }
        }

        assert(state_ == ActorState.stopped || addr, "no address");

        final switch (state_) {
        case ActorState.waiting:
            state_ = ActorState.active;
            tick;
            // the state can be changed before the actor have executed.
            break;
        case ActorState.active:
            tick;
            // self terminate if the actor has no behavior.
            if (incoming.empty && awaitedResponses.empty && reqBehavior.empty)
                state_ = ActorState.forceShutdown;
            break;
        case ActorState.shutdown:
            tick;
            if (awaitedResponses.empty)
                state_ = ActorState.finishShutdown;
            cleanupBehavior;
            break;
        case ActorState.forceShutdown:
            state_ = ActorState.finishShutdown;
            cleanupBehavior;
            addr.get.setClosed;
            break;
        case ActorState.finishShutdown:
            state_ = ActorState.stopped;

            sendToMonitors(DownMsg(addr.weakRef, lastError));

            sendToLinks(ExitMsg(addr.weakRef, lastError));

            replyTimeouts = null;
            cleanupDelayed;
            cleanupAwait;

            // must be last because sendToLinks and sendToMonitors uses addr.
            addr.get.shutdown();
            addr.release;
            break;
        case ActorState.stopped:
            break;
        }
    }

    /// Send `msg` to all monitors that can still be locked, then forget the monitors.
    void sendToMonitors(DownMsg msg) @safe nothrow {
        foreach (ref a; monitors.byValue) {
            try {
                auto tmp = a.lock;
                auto rc = tmp.get;
                if (rc)
                    rc.put(SystemMsg(msg));
                a.release;
            } catch (Exception e) {
            }
        }

        monitors = null;
    }

    /// Send `msg` to all links that can still be locked, then forget the links.
    void sendToLinks(ExitMsg msg) @safe nothrow {
        foreach (ref a; links.byValue) {
            try {
                auto tmp = a.lock;
                auto rc = tmp.get;
                if (rc)
                    rc.put(SystemMsg(msg));
                a.release;
            } catch (Exception e) {
            }
        }

        links = null;
    }

    /** Call the error handler of every awaited response that has timed out
     * and remove them.
     *
     * Relies on `replyTimeouts` being sorted, it stops at the first timeout
     * that has not passed.
     */
    void checkReplyTimeout(const SysTime now) @safe {
        if (replyTimeouts.empty)
            return;

        size_t removeTo;
        foreach (const i; 0 .. replyTimeouts.length) {
            if (now > replyTimeouts[i].timeout) {
                const id = replyTimeouts[i].id;
                if (auto v = id in awaitedResponses) {
                    messages_++;
                    v.onError(this, ErrorMsg(addr.weakRef, SystemError.requestTimeout));
                    try {
                        () @trusted { v.behavior.free; }();
                    } catch (Exception e) {
                    }
                    awaitedResponses.remove(id);
                }
                removeTo = i + 1;
            } else {
                break;
            }
        }

        if (removeTo >= replyTimeouts.length) {
            replyTimeouts = null;
        } else if (removeTo != 0) {
            replyTimeouts = replyTimeouts[removeTo .. $];
        }
    }

    /** Process at most one incoming message.
     *
     * The message is dispatched on its signature. If no behavior match it is
     * given to the default handler.
     */
    void processIncoming() @safe {
        if (addr.get.empty!Msg)
            return;
        messages_++;

        auto front = addr.get.pop!Msg;
        scope (exit)
            .destroy(front);

        void doSend(ref MsgOneShot msg) {
            if (auto v = front.get.signature in incoming) {
                (*v)(msg.data);
            } else {
                defaultHandler_(this, msg.data);
            }
        }

        void doRequest(ref MsgRequest msg) @trusted {
            if (auto v = front.get.signature in reqBehavior) {
                (*v)(msg.data, msg.replyId, msg.replyTo);
            } else {
                defaultHandler_(this, msg.data);
            }
        }

        front.get.type.match!((ref MsgOneShot a) { doSend(a); }, (ref MsgRequest a) {
            doRequest(a);
        });
    }

    /** All system messages are handled.
     *
     * Assuming:
     *  * they are not heavy to process
     *  * very important that if there are any they should be handled as soon as possible
     *  * ignoring the case when there is a "storm" of system messages which
     *    "could" overload the actor system and lead to a crash. I classify this,
     *    for now, as intentional, malicious coding by the developer themself.
     *    External inputs that could trigger such a behavior should be controlled
     *    and limited. Other types of input such as a developer trying to break
     *    the actor system is out of scope.
     */
    void processSystemMsg() @safe {
        //() @trusted {
        //logger.infof("run %X", cast(void*) &this);
        //}();
        while (!addr.get.empty!SystemMsg) {
            messages_++;
            //logger.infof("%X %s %s", addr.toHash, state_, messages_);
            auto front = addr.get.pop!SystemMsg;
            scope (exit)
                .destroy(front);

            front.get.match!((ref DownMsg a) {
                if (downHandler_)
                    downHandler_(this, a);
            }, (ref MonitorRequest a) { monitors[a.addr.toHash] = a.addr; }, (ref DemonitorRequest a) {
                if (auto v = a.addr.toHash in monitors)
                    v.release;
                monitors.remove(a.addr.toHash);
            }, (ref LinkRequest a) { links[a.addr.toHash] = a.addr; }, (ref UnlinkRequest a) {
                if (auto v = a.addr.toHash in links)
                    v.release;
                links.remove(a.addr.toHash);
            }, (ref ErrorMsg a) { errorHandler_(this, a); }, (ref ExitMsg a) {
                exitHandler_(this, a);
            }, (ref SystemExitMsg a) {
                final switch (a.reason) {
                case ExitReason.normal:
                    break;
                case ExitReason.unhandledException:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.unknown:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.userShutdown:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.kill:
                    exitHandler_(this, ExitMsg.init);
                    // the user do NOT have an option here
                    forceShutdown;
                    break;
                }
            });
        }
    }

    /** Process at most one reply.
     *
     * A reply that is awaited is given to its handler which is then removed
     * together with its timeout. Any other reply is given to the default
     * handler.
     */
    void processReply() @safe {
        if (addr.get.empty!Reply)
            return;
        messages_++;

        auto front = addr.get.pop!Reply;
        scope (exit)
            .destroy(front);

        if (auto v = front.get.id in awaitedResponses) {
            // TODO: reduce the lookups on front.id
            v.behavior(front.get.data);
            try {
                () @trusted { v.behavior.free; }();
            } catch (Exception e) {
            }
            awaitedResponses.remove(front.get.id);
            removeReplyTimeout(front.get.id);
        } else {
            // TODO: should probably be SystemError.unexpectedResponse?
            defaultHandler_(this, front.get.data);
        }
    }

    /** Move at most one delayed message from the mailbox to the internal,
     * time ordered, container and then put all delayed messages that have
     * triggered in the mailbox as normal messages.
     */
    void processDelayed(const SysTime now) @trusted {
        if (!addr.get.empty!DelayedMsg) {
            // count as a message because handling them are "expensive".
            // Ignoring the case that the message right away is moved to the
            // incoming queue. This lead to "double accounting" but ohh well.
            // Don't use delayedSend when you should have used send.
            messages_++;
            delayed.insert(addr.get.pop!DelayedMsg.unsafeMove);
        } else if (delayed.empty) {
            return;
        }

        foreach (const i; 0 .. delayed.length) {
            if (now > delayed.front.triggerAt) {
                addr.get.put(delayed.front.msg);
                delayed.removeFront;
            } else {
                break;
            }
        }
    }

    /// Remove the first timeout registered for the reply `id`, if any.
    private void removeReplyTimeout(ulong id) @safe nothrow {
        import std.algorithm : remove;

        foreach (const i; 0 .. replyTimeouts.length) {
            if (replyTimeouts[i].id == id) {
                // `remove` only shifts the elements and returns the shortened
                // range. The result must be assigned, otherwise the array
                // keeps its length with a stale duplicate at the end. The
                // default strategy is stable thus the sort order is kept.
                replyTimeouts = remove(replyTimeouts, i);
                break;
            }
        }
    }

    /** Register `handler` as the behavior for one shot messages of `signature`.
     *
     * A previously registered behavior for the signature is freed and
     * replaced. Nothing is done if the actor isn't accepting messages.
     */
    void register(ulong signature, Closure!(MsgHandler, void*) handler) @trusted {
        if (!isAccepting)
            return;

        if (auto v = signature in incoming) {
            try {
                v.free;
            } catch (Exception e) {
            }
        }
        incoming[signature] = handler;
    }

    /** Register `handler` as the behavior for requests of `signature`.
     *
     * A previously registered behavior for the signature is freed and
     * replaced. Nothing is done if the actor isn't accepting messages.
     */
    void register(ulong signature, Closure!(RequestHandler, void*) handler) @trusted {
        if (!isAccepting)
            return;

        if (auto v = signature in reqBehavior) {
            try {
                v.free;
            } catch (Exception e) {
            }
        }
        reqBehavior[signature] = handler;
    }

    /** Register a handler for the reply with id `replyId`.
     *
     * Params:
     *  replyId = id of the awaited reply.
     *  timeout = when the request is considered to have timed out.
     *  reply = called with the reply.
     *  onError = called on timeout. The error handler of the actor is used if it is null.
     */
    void register(ulong replyId, SysTime timeout, Closure!(ReplyHandler,
            void*) reply, ErrorHandler onError) @safe {
        if (!isAccepting)
            return;

        awaitedResponses[replyId] = AwaitReponse(reply, onError is null ? errorHandler_ : onError);
        replyTimeouts ~= ReplyHandlerTimeout(replyId, timeout);
        // keep the earliest timeout first.
        schwartzSort!(a => a.timeout, (a, b) => a < b)(replyTimeouts);
    }
}

/** A callable, `fn`, together with the context, `ctx`, it is called with and
 * the function, `cleanup`, that release the context.
 */
struct Closure(Fn, CtxT) {
    alias FreeFn = void function(CtxT);

    Fn fn;
    CtxT ctx;
    FreeFn cleanup;

    /// A closure without a context.
    this(Fn fn) {
        this.fn = fn;
    }

    /** A closure with a context and the function that release it.
     *
     * The context parameter has the same type as the stored field. It used to
     * be declared as `CtxT*` which only compiled because `CtxT` is `void*`.
     */
    this(Fn fn, CtxT ctx, FreeFn cleanup) {
        this.fn = fn;
        this.ctx = ctx;
        this.cleanup = cleanup;
    }

    /// Call `fn` with the context followed by `args`.
    void opCall(Args...)(auto ref Args args) {
        assert(fn !is null);
        fn(ctx, args);
    }

    /// Release the context, if there is one, and reset it.
    void free() {
        // will crash, on purpose, if there is a ctx and no cleanup registered.
        // maybe a bad idea? dunno... lets see
        if (ctx)
            cleanup(ctx);
        ctx = CtxT.init;
    }
}

@("shall register a behavior to be called when msg received matching signature")
unittest {
    auto addr = makeAddress2;
    auto actor = Actor(addr);

    bool processedIncoming;
    void fn(void* ctx, ref Variant msg) {
        processedIncoming = true;
    }

    actor.register(1, Closure!(MsgHandler, void*)(&fn));
    addr.get.put(Msg(1, MsgType(MsgOneShot(Variant(42)))));

    actor.process(Clock.currTime);

    assert(processedIncoming);
}

/** Destroy the mutable fields of the context tuple that `ctx` points to.
 *
 * Fields that are const or immutable are left untouched. Does nothing when
 * `CtxT` is `void`.
 */
private void cleanupCtx(CtxT)(void* ctx)
        if (is(CtxT == Tuple!T, T) || is(CtxT == void)) {
    import std.traits;
    import my.actor.typed;

    static if (!is(CtxT == void)) {
        // trust that any use of this also pass on the correct context type.
        auto userCtx = () @trusted { return cast(CtxT*) ctx; }();
        // release the context such as if it holds a rc object.
        alias Types = CtxT.Types;

        static foreach (const i; 0 .. CtxT.Types.length) {
            {
                alias T = CtxT.Types[i];
                alias UT = Unqual!T;
                static if (!is(T == UT)) {
                    static assert(!is(UT : WeakAddress),
                            "WeakAddress must NEVER be const or immutable");
                    static assert(!is(UT : TypedAddress!M, M...),
                            "WeakAddress must NEVER be const or immutable: " ~ T.stringof);
                }
                // TODO: add a -version actor_ctx_diagnostic that prints when it is unable to deinit?

                static if (is(UT == T)) {
                    .destroy((*userCtx)[i]);
                }
            }
        }
    }
}

@("shall default initialize when possible, skipping const/immutable")
unittest {
    {
        auto x = tuple(cast(const) 42, 43);
        alias T = typeof(x);
        cleanupCtx!T(cast(void*)&x);
        assert(x[0] == 42); // can't assign to const
        assert(x[1] == 0);
    }

    {
        import my.path : Path;

        auto x = tuple(Path.init, cast(const) Path("foo"));
        alias T = typeof(x);
        cleanupCtx!T(cast(void*)&x);
        assert(x[0] == Path.init);
        assert(x[1] == Path("foo"));
    }
}

/// A type erased behavior together with the signature of the message it handles.
package struct Action {
    Closure!(MsgHandler, void*) action;
    ulong signature;
}

/** An behavior for an actor when it receive a message of `signature`.
 *
 * Params:
 *  T = type of the handler.
 *  CtxT = type of the context, `void` if the handler has none. When given,
 *      the first parameter of the handler is the context.
 *  handler = called with the unpacked message.
 *
 * Returns: an `Action` without a context; the caller assigns it.
 */
package auto makeAction(T, CtxT = void)(T handler) @safe
        if (isFunction!T || isFunctionPointer!T) {
    static if (is(CtxT == void))
        alias Params = Parameters!T;
    else {
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    }

    alias HArgs = staticMap!(Unqual, Params);

    void fn(void* ctx, ref Variant msg) @trusted {
        static if (is(CtxT == void)) {
            handler(msg.get!(Tuple!HArgs).expand);
        } else {
            auto userCtx = cast(CtxParam*) cast(CtxT*) ctx;
            handler(*userCtx, msg.get!(Tuple!HArgs).expand);
        }
    }

    return Action(typeof(Action.action)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs);
}

/** A type erased handler for a reply.
 *
 * Params:
 *  T = type of the handler.
 *  CtxT = type of the context, `void` if the handler has none. When given,
 *      the first parameter of the handler is the context.
 *  handler = called with the unpacked reply.
 *
 * Returns: a closure without a context; the caller assigns it.
 */
package Closure!(ReplyHandler, void*) makeReply(T, CtxT)(T handler) @safe {
    static if (is(CtxT == void))
        alias Params = Parameters!T;
    else {
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    }

    alias HArgs = staticMap!(Unqual, Params);

    void fn(void* ctx, ref Variant msg) @trusted {
        static if (is(CtxT == void)) {
            handler(msg.get!(Tuple!HArgs).expand);
        } else {
            auto userCtx = cast(CtxParam*) cast(CtxT*) ctx;
            handler(*userCtx, msg.get!(Tuple!HArgs).expand);
        }
    }

    return typeof(return)(&fn, null, &cleanupCtx!CtxT);
}

/// A type erased request behavior together with the signature of the message it handles.
package struct Request {
    Closure!(RequestHandler, void*) request;
    ulong signature;
}

/// Returns: the location `Loc` (file, line, column) formatted as "file:line:column".
private string locToString(Loc...)() {
    import std.conv : to;

    return Loc[0] ~ ":" ~ Loc[1].to!string ~ ":" ~ Loc[2].to!string;
}

/// Check that the context parameter is `ref` otherwise issue a warning.
package void checkRefForContext(alias handler)() {
    import std.traits : ParameterStorageClass, ParameterStorageClassTuple;

    // storage class of the first parameter, which is the context.
    alias CtxParam = ParameterStorageClassTuple!(typeof(handler))[0];

    static if (CtxParam != ParameterStorageClass.ref_) {
        // print the offending type before failing the compilation.
        pragma(msg, "INFO: handler type is " ~ typeof(handler).stringof);
        static assert(CtxParam == ParameterStorageClass.ref_,
                "The context must be `ref` to avoid unnecessary copying");
    }
}

/** Check, at compile time, that the first parameter of a handler, `CtxParam`,
 * is the context type `CtxT` or can be constructed from its expanded fields.
 */
package void checkMatchingCtx(CtxParam, CtxT)() {
    static if (!is(CtxT == CtxParam)) {
        static assert(__traits(compiles, { auto x = CtxParam(CtxT.init.expand); }),
                "mismatch between the context type " ~ CtxT.stringof
                ~ " and the first parameter " ~ CtxParam.stringof);
    }
}

/** A behavior for an actor when it receive a request.
 *
 * What is done with the value returned by the handler depend on its type:
 *  * `RequestResult`: an error is sent as a system message, a value is sent
 *    as the reply and a promise is bound to the requester.
 *  * `Promise`: it is bound to the requester, the reply is sent when the
 *    promise is delivered.
 *  * anything else is sent as the reply, wrapped in a `Tuple` if it isn't one.
 *
 * Params:
 *  T = type of the handler. It must not return void.
 *  CtxT = type of the context, `void` if the handler has none. When given,
 *      the first parameter of the handler is the context.
 *  handler = called with the unpacked message.
 *
 * Returns: a `Request` without a context; the caller assigns it.
 */
package auto makeRequest(T, CtxT = void)(T handler) @safe {
    static assert(!is(ReturnType!T == void), "handler returns void, not allowed");

    alias RType = ReturnType!T;
    enum isReqResult = is(RType : RequestResult!ReqT, ReqT);
    enum isPromise = is(RType : Promise!PromT, PromT);

    static if (is(CtxT == void))
        alias Params = Parameters!T;
    else {
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    }

    alias HArgs = staticMap!(Unqual, Params);

    void fn(void* rawCtx, ref Variant msg, ulong replyId, WeakAddress replyTo) @trusted {
        static if (is(CtxT == void)) {
            auto r = handler(msg.get!(Tuple!HArgs).expand);
        } else {
            auto ctx = cast(CtxParam*) cast(CtxT*) rawCtx;
            auto r = handler(*ctx, msg.get!(Tuple!HArgs).expand);
        }

        static if (isReqResult) {
            r.value.match!((ErrorMsg a) { sendSystemMsg(replyTo, a); }, (Promise!ReqT a) {
                assert(!a.data.empty, "the promise MUST be constructed before it is returned");
                // the data is shared thus this binds all copies of the promise.
                a.data.get.replyId = replyId;
                a.data.get.replyTo = replyTo;
            }, (data) {
                enum wrapInTuple = !is(typeof(data) : Tuple!U, U);
                if (auto rc = replyTo.lock.get) {
                    static if (wrapInTuple)
                        rc.put(Reply(replyId, Variant(tuple(data))));
                    else
                        rc.put(Reply(replyId, Variant(data)));
                }
            });
        } else static if (isPromise) {
            // same check as for a promise returned via a RequestResult.
            assert(!r.data.empty, "the promise MUST be constructed before it is returned");
            r.data.get.replyId = replyId;
            r.data.get.replyTo = replyTo;
        } else {
            // TODO: is this syntax for U one variable or variable. I want it to be variable.
            enum wrapInTuple = !is(RType : Tuple!U, U);
            if (auto rc = replyTo.lock.get) {
                static if (wrapInTuple)
                    rc.put(Reply(replyId, Variant(tuple(r))));
                else
                    rc.put(Reply(replyId, Variant(r)));
            }
        }
    }

    return Request(typeof(Request.request)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs);
}

@("shall link two actors lifetime")
unittest {
    int count;
    void countExits(ref Actor self, ExitMsg msg) @safe nothrow {
        count++;
        self.shutdown;
    }

    auto aa1 = Actor(makeAddress2);
    auto a1 = build(&aa1).set((int x) {}).exitHandler_(&countExits).finalize;
    auto aa2 = Actor(makeAddress2);
    auto a2 = build(&aa2).set((int x) {}).exitHandler_(&countExits).finalize;

    a1.linkTo(a2.address);
    a1.process(Clock.currTime);
    a2.process(Clock.currTime);

    assert(a1.isAlive);
    assert(a2.isAlive);

    sendExit(a1.address, ExitReason.userShutdown);
    foreach (_; 0 .. 5) {
        a1.process(Clock.currTime);
        a2.process(Clock.currTime);
    }

    assert(!a1.isAlive);
    assert(!a2.isAlive);
    assert(count == 2);
}

@("shall let one actor monitor the lifetime of the other one")
unittest {
    int count;
    void downMsg(ref Actor self, DownMsg msg) @safe nothrow {
        count++;
    }

    auto aa1 = Actor(makeAddress2);
    auto a1 = build(&aa1).set((int x) {}).downHandler_(&downMsg).finalize;
    auto aa2 = Actor(makeAddress2);
    auto a2 = build(&aa2).set((int x) {}).finalize;

    a1.monitor(a2.address);
    a1.process(Clock.currTime);
    a2.process(Clock.currTime);

    assert(a1.isAlive);
    assert(a2.isAlive);

    sendExit(a2.address, ExitReason.userShutdown);
    foreach (_; 0 .. 5) {
        a1.process(Clock.currTime);
        a2.process(Clock.currTime);
    }

    assert(a1.isAlive);
    assert(!a2.isAlive);
    assert(count == 1);
}

/** Builder that configure the handlers and behaviors of an actor.
 *
 * All setters return the builder to allow chaining. `finalize` ends the chain.
 */
private struct BuildActor {
    Actor* actor;

    /// Returns: the configured actor. The builder no longer refers to it.
    Actor* finalize() @safe {
        auto rval = actor;
        actor = null;
        return rval;
    }

    /// Set the handler for `ErrorMsg`.
    auto errorHandler(ErrorHandler a) {
        actor.errorHandler = a;
        return this;
    }

    /// Set the handler for `DownMsg`.
    auto downHandler_(DownHandler a) {
        actor.downHandler_ = a;
        return this;
    }

    /// Set the handler for `ExitMsg`.
    auto exitHandler_(ExitHandler a) {
        actor.exitHandler_ = a;
        return this;
    }

    /// Set the handler for exceptions thrown while processing messages.
    auto exceptionHandler_(ExceptionHandler a) {
        actor.exceptionHandler_ = a;
        return this;
    }

    /// Set the handler for messages without a matching behavior.
    auto defaultHandler_(DefaultHandler a) {
        actor.defaultHandler_ = a;
        return this;
    }

    /// Register a request behavior, a function that returns a value.
    auto set(BehaviorT)(BehaviorT behavior)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && !is(ReturnType!BehaviorT == void)) {
        auto act = makeRequest(behavior);
        actor.register(act.signature, act.request);
        return this;
    }

    /// Register a request behavior that is called with a copy of the context `c`.
    auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && !is(ReturnType!BehaviorT == void)) {
        auto act = makeRequest!(BehaviorT, CT)(behavior);
        // for now just use the GC to allocate the context on.
        // TODO: use an allocator.
        act.request.ctx = cast(void*) new CT(c);
        actor.register(act.signature, act.request);
        return this;
    }

    /// Register a one shot behavior, a function that returns void.
    auto set(BehaviorT)(BehaviorT behavior)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && is(ReturnType!BehaviorT == void)) {
        auto act = makeAction(behavior);
        actor.register(act.signature, act.action);
        return this;
    }

    /// Register a one shot behavior that is called with a copy of the context `c`.
    auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && is(ReturnType!BehaviorT == void)) {
        auto act = makeAction!(BehaviorT, CT)(behavior);
        // for now just use the GC to allocate the context on.
        // TODO: use an allocator.
        act.action.ctx = cast(void*) new CT(c);
        actor.register(act.signature, act.action);
        return this;
    }
}

/// Returns: a builder for the actor `a`.
package BuildActor build(Actor* a) @safe {
    return BuildActor(a);
}

/** Implement an actor.
 *
 * Params:
 *  self = the actor to configure.
 *  behaviors = functions, each optionally followed by the capture that is
 *      used as its context.
 *
 * Returns: `self` with the behaviors registered.
 */
Actor* impl(Behavior...)(Actor* self, Behavior behaviors) {
    import my.actor.msg : isCapture, Capture;

    auto bactor = build(self);
    static foreach (const i; 0 .. Behavior.length) {
        {
            alias b = Behavior[i];

            // a capture is consumed together with the behavior before it.
            static if (!isCapture!b) {
                static if (!(isFunction!(b) || isFunctionPointer!(b)))
                    static assert(0, "behavior may only be functions, not delgates: " ~ b.stringof);

                static if (i + 1 < Behavior.length && isCapture!(Behavior[i + 1])) {
                    bactor.set(behaviors[i], behaviors[i + 1]);
                } else
                    bactor.set(behaviors[i]);
            }
        }
    }

    return bactor.finalize;
}

@("build dynamic actor from functions")
unittest {
    static void fn3(int s) @safe {
    }

    static string fn4(int s) @safe {
        return "foo";
    }

    static Tuple!(int, string) fn5(const string s) @safe {
        return typeof(return)(42, "hej");
    }

    auto aa1 = Actor(makeAddress2);
    auto a1 = build(&aa1).set(&fn3).set(&fn4).set(&fn5).finalize;
}

unittest {
    bool delayOk;
    static void fn1(ref Tuple!(bool*, "delayOk") c, const string s) @safe {
        *c.delayOk = true;
    }

    bool delayShouldNeverHappen;
    static void fn2(ref Tuple!(bool*, "delayShouldNeverHappen") c, int s) @safe {
        *c.delayShouldNeverHappen = true;
    }

    auto aa1 = Actor(makeAddress2);
    auto actor = build(&aa1).set(&fn1, capture(&delayOk)).set(&fn2,
            capture(&delayShouldNeverHappen)).finalize;
    delayedSend(actor.address, Clock.currTime - 1.dur!"seconds", "foo");
    delayedSend(actor.address, Clock.currTime + 1.dur!"hours", 42);

    assert(!actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    actor.process(Clock.currTime);

    assert(!actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    actor.process(Clock.currTime);
    actor.process(Clock.currTime);

    assert(actor.addressRef.get.empty!DelayedMsg);
    assert(actor.addressRef.get.empty!Msg);
    assert(actor.addressRef.get.empty!Reply);

    assert(delayOk);
    assert(!delayShouldNeverHappen);
}

@("shall process a request->then chain xyz")
@system unittest {
    // checking capture is correctly setup/teardown by using captured rc.

    auto rcReq = refCounted(42);
    bool calledOk;
    static string fn(ref Tuple!(bool*, "calledOk", RefCounted!int) ctx, const string s,
            const string b) {
        assert(2 == ctx[1].refCount);
        if (s == "apa")
            *ctx.calledOk = true;
        return "foo";
    }

    auto rcReply = refCounted(42);
    bool calledReply;
    static void reply(ref Tuple!(bool*, RefCounted!int) ctx, const string s) {
        *ctx[0] = s == "foo";
        assert(2 == ctx[1].refCount);
    }

    auto aa1 = Actor(makeAddress2);
    auto actor = build(&aa1).set(&fn, capture(&calledOk, rcReq)).finalize;

    assert(2 == rcReq.refCount);
    assert(1 == rcReply.refCount);

    actor.request(actor.address, infTimeout).send("apa", "foo")
        .capture(&calledReply, rcReply).then(&reply);
    assert(2 == rcReply.refCount);

    assert(!actor.addr.get.empty!Msg);
    assert(actor.addr.get.empty!Reply);

    actor.process(Clock.currTime);
    assert(actor.addr.get.empty!Msg);
    assert(actor.addr.get.empty!Reply);

    assert(2 == rcReq.refCount);
    assert(1 == rcReply.refCount, "after the message is consumed the refcount should go back");

    assert(calledOk);
    assert(calledReply);

    actor.shutdown;
    while (actor.isAlive)
        actor.process(Clock.currTime);
}

@("shall process a request->then chain using promises")
unittest {
    static struct A {
        string v;
    }

    static struct B {
        string v;
    }

    int calledOk;
    auto fn1p = makePromise!string;
    static RequestResult!string fn1(ref Capture!(int*, "calledOk", Promise!string, "p") c, A a) @trusted {
if (a.v == "apa") 1308 (*c.calledOk)++; 1309 return typeof(return)(c.p); 1310 } 1311 1312 auto fn2p = makePromise!string; 1313 static Promise!string fn2(ref Capture!(int*, "calledOk", Promise!string, "p") c, B a) { 1314 (*c.calledOk)++; 1315 return c.p; 1316 } 1317 1318 int calledReply; 1319 static void reply(ref Tuple!(int*) ctx, const string s) { 1320 if (s == "foo") 1321 *ctx[0] += 1; 1322 } 1323 1324 auto aa1 = Actor(makeAddress2); 1325 auto actor = build(&aa1).set(&fn1, capture(&calledOk, fn1p)).set(&fn2, 1326 capture(&calledOk, fn2p)).finalize; 1327 1328 actor.request(actor.address, infTimeout).send(A("apa")).capture(&calledReply).then(&reply); 1329 actor.request(actor.address, infTimeout).send(B("apa")).capture(&calledReply).then(&reply); 1330 1331 actor.process(Clock.currTime); 1332 assert(calledOk == 1); // first request 1333 assert(calledReply == 0); 1334 1335 fn1p.deliver("foo"); 1336 1337 assert(calledReply == 0); 1338 1339 actor.process(Clock.currTime); 1340 assert(calledOk == 2); // second request triggered 1341 assert(calledReply == 1); 1342 1343 fn2p.deliver("foo"); 1344 actor.process(Clock.currTime); 1345 1346 assert(calledReply == 2); 1347 1348 actor.shutdown; 1349 while (actor.isAlive) { 1350 actor.process(Clock.currTime); 1351 } 1352 } 1353 1354 /// The timeout triggered. 1355 class ScopedActorException : Exception { 1356 this(ScopedActorError err, string file = __FILE__, int line = __LINE__) @safe pure nothrow { 1357 super(null, file, line); 1358 error = err; 1359 } 1360 1361 ScopedActorError error; 1362 } 1363 1364 enum ScopedActorError : ubyte { 1365 none, 1366 // actor address is down 1367 down, 1368 // request timeout 1369 timeout, 1370 // the address where unable to process the received message 1371 unknownMsg, 1372 // some type of fatal error occured. 1373 fatal, 1374 } 1375 1376 /** Intended to be used in a local scope by a user. 1377 * 1378 * `ScopedActor` is not thread safe. 
1379 */ 1380 struct ScopedActor { 1381 import my.actor.typed : underlyingAddress, underlyingWeakAddress; 1382 1383 private { 1384 static struct Data { 1385 Actor self; 1386 ScopedActorError errSt; 1387 1388 ~this() @safe { 1389 if (self.addr.empty) 1390 return; 1391 1392 () @trusted { 1393 self.downHandler = null; 1394 self.defaultHandler = toDelegate(&.defaultHandler); 1395 self.errorHandler = toDelegate(&defaultErrorHandler); 1396 }(); 1397 1398 self.shutdown; 1399 while (self.isAlive) { 1400 self.process(Clock.currTime); 1401 } 1402 } 1403 } 1404 1405 RefCounted!Data data; 1406 } 1407 1408 this(StrongAddress addr, string name) @safe { 1409 data = refCounted(Data(Actor(addr))); 1410 data.get.self.name = name; 1411 } 1412 1413 private void reset() @safe nothrow { 1414 data.get.errSt = ScopedActorError.none; 1415 } 1416 1417 SRequestSend request(TAddress)(TAddress requestTo, SysTime timeout) 1418 if (isAddress!TAddress) { 1419 reset; 1420 auto rs = .request(&data.get.self, underlyingWeakAddress(requestTo), timeout); 1421 return SRequestSend(rs, this); 1422 } 1423 1424 private static struct SRequestSend { 1425 RequestSend rs; 1426 ScopedActor self; 1427 1428 /// Copy constructor 1429 this(ref return typeof(this) rhs) @safe pure nothrow @nogc { 1430 rs = rhs.rs; 1431 self = rhs.self; 1432 } 1433 1434 @disable this(this); 1435 1436 SRequestSendThen send(Args...)(auto ref Args args) { 1437 return SRequestSendThen(.send(rs, args), self); 1438 } 1439 } 1440 1441 private static struct SRequestSendThen { 1442 RequestSendThen rs; 1443 ScopedActor self; 1444 uint backoff; 1445 1446 /// Copy constructor 1447 this(ref return typeof(this) rhs) { 1448 rs = rhs.rs; 1449 self = rhs.self; 1450 backoff = rhs.backoff; 1451 } 1452 1453 @disable this(this); 1454 1455 void dynIntervalSleep() @trusted { 1456 // +100 usecs "feels good", magic number. current OS and 1457 // implementation of message passing isn't that much faster than 1458 // 100us. 
A bit slow behavior, ehum, for a scoped actor is OK. They 1459 // aren't expected to be used for "time critical" sections. 1460 Thread.sleep(backoff.dur!"usecs"); 1461 backoff = min(backoff + 100, 20000); 1462 } 1463 1464 private static struct ValueCapture { 1465 RefCounted!Data data; 1466 1467 void downHandler(ref Actor, DownMsg) @safe nothrow { 1468 data.get.errSt = ScopedActorError.down; 1469 } 1470 1471 void errorHandler(ref Actor, ErrorMsg msg) @safe nothrow { 1472 if (msg.reason == SystemError.requestTimeout) 1473 data.get.errSt = ScopedActorError.timeout; 1474 else 1475 data.get.errSt = ScopedActorError.fatal; 1476 } 1477 1478 void unknownMsgHandler(ref Actor a, ref Variant msg) @safe nothrow { 1479 logAndDropHandler(a, msg); 1480 data.get.errSt = ScopedActorError.unknownMsg; 1481 } 1482 } 1483 1484 void then(T)(T handler, ErrorHandler onError = null) { 1485 scope (exit) 1486 demonitor(rs.rs.self, rs.rs.requestTo); 1487 monitor(rs.rs.self, rs.rs.requestTo); 1488 1489 auto callback = new ValueCapture(self.data); 1490 self.data.get.self.downHandler = &callback.downHandler; 1491 self.data.get.self.defaultHandler = &callback.unknownMsgHandler; 1492 self.data.get.self.errorHandler = &callback.errorHandler; 1493 1494 () @trusted { .thenUnsafe!(T, void)(rs, handler, null, onError); }(); 1495 1496 scope (exit) 1497 () @trusted { 1498 self.data.get.self.downHandler = null; 1499 self.data.get.self.defaultHandler = toDelegate(&.defaultHandler); 1500 self.data.get.self.errorHandler = toDelegate(&defaultErrorHandler); 1501 }(); 1502 1503 auto requestTo = rs.rs.requestTo.lock; 1504 if (!requestTo) 1505 throw new ScopedActorException(ScopedActorError.down); 1506 1507 // TODO: this loop is stupid... should use a conditional variable 1508 // instead but that requires changing the mailbox. later 1509 do { 1510 rs.rs.self.process(Clock.currTime); 1511 // force the actor to be alive even though there are no behaviors. 
1512 rs.rs.self.state_ = ActorState.waiting; 1513 1514 if (self.data.get.errSt == ScopedActorError.none) { 1515 dynIntervalSleep; 1516 } else { 1517 throw new ScopedActorException(self.data.get.errSt); 1518 } 1519 1520 } 1521 while (self.data.get.self.waitingForReply); 1522 } 1523 } 1524 } 1525 1526 ScopedActor scopedActor(string file = __FILE__, uint line = __LINE__)() @safe { 1527 import std.format : format; 1528 1529 return ScopedActor(makeAddress2, format!"ScopedActor.%s:%s"(file, line)); 1530 } 1531 1532 @( 1533 "scoped actor shall throw an exception if the actor that is sent a request terminates or is closed") 1534 unittest { 1535 import my.actor.system; 1536 1537 auto sys = makeSystem; 1538 1539 auto a0 = sys.spawn((Actor* self) { 1540 return impl(self, (ref CSelf!() ctx, int x) { 1541 Thread.sleep(50.dur!"msecs"); 1542 return 42; 1543 }, capture(self), (ref CSelf!() ctx, double x) {}, capture(self), 1544 (ref CSelf!() ctx, string x) { ctx.self.shutdown; return 42; }, capture(self)); 1545 }); 1546 1547 { 1548 auto self = scopedActor; 1549 bool excThrown; 1550 auto stopAt = Clock.currTime + 3.dur!"seconds"; 1551 while (!excThrown && Clock.currTime < stopAt) { 1552 try { 1553 self.request(a0, delay(1.dur!"nsecs")).send(42).then((int x) {}); 1554 } catch (ScopedActorException e) { 1555 excThrown = e.error == ScopedActorError.timeout; 1556 } catch (Exception e) { 1557 logger.info(e.msg); 1558 } 1559 } 1560 assert(excThrown, "timeout did not trigger as expected"); 1561 } 1562 1563 { 1564 auto self = scopedActor; 1565 bool excThrown; 1566 auto stopAt = Clock.currTime + 3.dur!"seconds"; 1567 while (!excThrown && Clock.currTime < stopAt) { 1568 try { 1569 self.request(a0, delay(1.dur!"seconds")).send("hello").then((int x) { 1570 }); 1571 } catch (ScopedActorException e) { 1572 excThrown = e.error == ScopedActorError.down; 1573 } catch (Exception e) { 1574 logger.info(e.msg); 1575 } 1576 } 1577 assert(excThrown, "detecting terminated actor did not trigger as 
expected"); 1578 } 1579 }