Async/Await

Concurrent programming with a thread pool backend.

Overview

Strada provides first-class async/await support, allowing you to write concurrent code that's easy to read and maintain. Async functions execute on a thread pool, enabling parallel operations without the complexity of manual thread management.

Defining Async Functions

Use async func to define functions that run asynchronously:

async func fetch_data(str $url) str {
    # This runs in the thread pool
    my str $response = http_get($url);
    return $response;
}

async func compute(int $n) int {
    core::usleep(50000);  # 50ms work
    return $n * 2;
}

When called, async functions immediately return a Future while the actual work executes on a background thread.

Awaiting Futures

Use await to block until a Future completes and get its result:

my scalar $future = compute(21);
say("Future created, working...");
my int $result = await $future;
say("Result: " . $result);  # Result: 42

Parallel Execution

Launch multiple async operations concurrently by calling all of the async functions first and awaiting their futures afterwards:

func main() int {
    # Start three operations in parallel
    my scalar $a = compute(10);
    my scalar $b = compute(20);
    my scalar $c = compute(30);

    # Wait for all results
    # Total time ~50ms, not 150ms!
    my int $r1 = await $a;  # 20
    my int $r2 = await $b;  # 40
    my int $r3 = await $c;  # 60

    return 0;
}

async:: Namespace Functions

Function                         Description
async::all(\@futures)            Wait for all futures, return array of results
async::race(\@futures)           Wait for first to complete, cancel others
async::timeout($f, $ms)          Await with timeout (throws on timeout)
async::spawn($fn)                Run any closure as a pool future
async::select(\@chs [, $ms])     Block on multiple channels; returns [index, value] (-1 timeout, -2 all closed)
async::sleep($ms)                Cancellation-aware sleep (returns 0 when woken by cancel)
async::cancelled()               Has THIS task been asked to cancel? (cooperative loops)
async::map($fn, \@items [, $n])  Data-parallel map; ordered results; first error rethrows
async::cancel($f)                Request cancellation of a future
async::is_done($f)               Non-blocking completion check
async::is_cancelled($f)          Check if future was cancelled
async::pool_init($n)             Initialize thread pool with N workers
async::pool_shutdown()           Shutdown thread pool
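
The table lists async::spawn and async::map, which have no dedicated section on this page. The following is a minimal sketch built only from their one-line descriptions. It assumes that the future returned by async::spawn is awaited like any other, that async::map passes each item to the closure as its only argument, and that async::map returns an array the way async::all does. None of these assumptions is confirmed here.

```strada
func main() int {
    # Assumption: async::spawn returns a Future that can be awaited.
    my scalar $f = async::spawn(fn () scalar {
        core::usleep(50000);  # 50ms of simulated work
        return "spawned";
    });

    # Assumption: each item is passed to the closure as its only argument,
    # and the results come back as an array in input order.
    my array @items = (1, 2, 3, 4);
    my array @squares = async::map(fn (scalar $x) scalar {
        return $x * $x;
    }, \@items);

    my scalar $label = await $f;
    say($label . ": " . join(", ", @squares));
    return 0;
}
```

The optional third argument to async::map is left out because its meaning is not described on this page.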

Wait for All (async::all)

Wait for multiple futures to complete and get all results as an array:

my array @futures = (compute(1), compute(2), compute(3));
my array @results = async::all(\@futures);
# @results is [2, 4, 6]

Race (async::race)

Wait for the first future to complete, automatically cancelling the others:

async func slow_task(int $id, int $delay_ms) str {
    core::usleep($delay_ms * 1000);
    return "task " . $id;
}

my array @futures = (
    slow_task(1, 100),   # 100ms
    slow_task(2, 50),    # 50ms - wins!
    slow_task(3, 150)    # 150ms
);
my str $winner = async::race(\@futures);
say($winner);  # "task 2"

Timeout

Set a deadline for an operation:

my scalar $slow = slow_task(99, 500);  # 500ms
try {
    my str $r = async::timeout($slow, 100);  # 100ms timeout
    say("Got: " . $r);
} catch ($e) {
    say("Timed out: " . $e);
}

Cancellation

Request cancellation of a running future:

my scalar $future = slow_task(1, 1000);
async::cancel($future);

if (async::is_cancelled($future)) {
    say("Cancelled!");
}

try {
    await $future;  # Throws "Future was cancelled"
} catch ($e) {
    say("Caught: " . $e);
}
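
The table above describes async::cancelled() and async::sleep as the cooperative side of cancellation. A long-running task might use them roughly as sketched below. The ticker function is hypothetical, and the sketch assumes async::cancelled() returns 0 until a cancel has been requested, mirroring the 0/1 convention of async::is_done.

```strada
async func ticker(int $interval_ms) int {
    my int $ticks = 0;
    # Assumption: async::cancelled() returns 0 until a cancel is requested.
    while (async::cancelled() == 0) {
        $ticks++;
        # async::sleep returns 0 when woken early by a cancel; the loop
        # condition then ends the loop on its next check.
        async::sleep($interval_ms);
    }
    return $ticks;
}

func main() int {
    my scalar $t = ticker(100);
    core::usleep(350000);  # let the ticker run for ~350ms
    async::cancel($t);

    try {
        await $t;  # a cancelled future throws, as shown above
    } catch ($e) {
        say("Ticker stopped: " . $e);
    }
    return 0;
}
```

Checking async::cancelled() in the loop condition keeps the task responsive without relying on the return value of async::sleep.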

Error Propagation

Exceptions thrown in async functions propagate through await:

async func fail_async() int {
    throw "async error";
}

try {
    my int $x = await fail_async();
} catch ($e) {
    say("Caught: " . $e);  # "Caught: async error"
}

Thread Pool Configuration

The thread pool auto-initializes with 4 workers on the first async call. To set the worker count explicitly:

func main() int {
    async::pool_init(8);  # 8 worker threads

    # ... async operations ...

    async::pool_shutdown();  # Optional cleanup
    return 0;
}

Non-Blocking Check

Check if a future is complete without blocking:

my scalar $future = compute(42);
my int $polls = 0;

while (async::is_done($future) == 0) {
    $polls++;
    core::usleep(1000);  # 1ms
}

my int $result = await $future;
say("Polled " . $polls . " times, result: " . $result);

Complete Example

async func compute(int $n) int {
    core::usleep(50000);
    return $n * 2;
}

async func slow_task(int $id, int $delay_ms) str {
    core::usleep($delay_ms * 1000);
    return "task " . $id . " done";
}

func main() int {
    say("=== Async Test ===");

    # Basic await
    my scalar $f = compute(21);
    my int $r = await $f;
    say("Result: " . $r);  # 42

    # Parallel execution
    my scalar $a = compute(10);
    my scalar $b = compute(20);
    my int $ra = await $a;
    my int $rb = await $b;
    say("Results: " . $ra . ", " . $rb);

    # Wait for all
    my array @futures = (compute(1), compute(2));
    my array @results = async::all(\@futures);
    say("All: " . join(", ", @results));

    # Race
    my array @race = (slow_task(1, 100), slow_task(2, 50));
    say("Winner: " . async::race(\@race));

    return 0;
}

Structured Concurrency (Async::Scope)

A scope (nursery) owns every task spawned inside it. wait() joins all of them and returns their results in spawn order. If a task fails, the scope cancels the remaining siblings (their async::sleep calls wake early) and then rethrows the first error. No task outlives the scope.

use Async::Scope;

my scalar $scope = Async::Scope::new();
$scope->spawn(fn () scalar { return fetch("a"); });
$scope->spawn(fn () scalar { return fetch("b"); });
my array @results = $scope->wait();
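
The failure path described above can be sketched as follows. This is an illustration under assumptions rather than a tested program: the two closures are made up for the example, and it assumes the rethrown error can be caught with an ordinary try/catch around wait().

```strada
use Async::Scope;

func main() int {
    my scalar $scope = Async::Scope::new();

    # Sibling that would take a full second if left alone.
    $scope->spawn(fn () scalar {
        async::sleep(1000);
        return "slow";
    });

    # Sibling that fails immediately.
    $scope->spawn(fn () scalar {
        throw "boom";
    });

    try {
        my array @results = $scope->wait();
        say("All done: " . join(", ", @results));
    } catch ($e) {
        # Assumption: wait() rethrows the first error after cancelling siblings.
        say("Scope failed: " . $e);
    }
    return 0;
}
```

Because the slow sibling sleeps with async::sleep rather than core::usleep, it can be woken early when the scope cancels it.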

Actors (Async::Actor)

An actor is a pool task looping over a private inbox channel. Messages are handled strictly in order, so actor state needs no locking. tell is fire-and-forget; ask blocks for the handler's return value; stop drains the inbox and joins the task.

use Async::Actor;

my scalar $counter = Async::Actor::new(fn (scalar $msg, scalar $st) scalar {
    if ($msg eq "inc") { $st->{"n"} = $st->{"n"} + 1; }
    return $st->{"n"};
}, { n => 0 });

$counter->tell("inc");              # fire and forget
my scalar $n = $counter->ask("inc");  # round-trip
$counter->stop();

Channels

Channels provide thread-safe message passing between async tasks.

# Create channels
my scalar $unbounded = async::channel();   # Unbounded
my scalar $ch = async::channel(10);        # Bounded (capacity 10)

# Send and receive
async::send($ch, $value);                  # Blocks if full
my scalar $v = async::recv($ch);           # Blocks if empty

# Non-blocking variants
async::try_send($ch, $value);              # Returns 0/1
my scalar $maybe = async::try_recv($ch);   # Returns undef if empty

# Close and check
async::close($ch);
async::is_closed($ch);
async::len($ch);                           # Items in queue

Channel Functions:

Function                  Description
async::channel()          Create unbounded channel
async::channel($n)        Create bounded channel
async::send($ch, $v)      Send (blocks if full)
async::recv($ch)          Receive (blocks if empty)
async::try_send($ch, $v)  Non-blocking send
async::try_recv($ch)      Non-blocking receive
async::close($ch)         Close channel
async::is_closed($ch)     Check if closed
async::len($ch)           Get queue length
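
A producer/consumer pair is the typical use of these functions. The sketch below is illustrative only: the producer function is hypothetical, and it assumes a channel handle can be passed to an async function as an ordinary scalar argument. The consumer receives a known number of items, so the example does not depend on what async::recv returns for a closed channel, which this page does not specify.

```strada
async func producer(scalar $ch, int $count) int {
    my int $i = 0;
    while ($i < $count) {
        async::send($ch, $i);  # blocks while the bounded channel is full
        $i++;
    }
    return $count;
}

func main() int {
    my scalar $ch = async::channel(4);  # bounded, capacity 4
    my scalar $p = producer($ch, 5);

    # Consume exactly as many items as the producer sends.
    my int $sum = 0;
    my int $received = 0;
    while ($received < 5) {
        my scalar $v = async::recv($ch);  # blocks while the channel is empty
        $sum = $sum + $v;
        $received++;
    }

    await $p;            # join the producer
    async::close($ch);
    say("Sum: " . $sum);
    return 0;
}
```

The bounded capacity gives backpressure: the producer can run at most four items ahead of the consumer.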

Mutexes

Mutexes protect critical sections from concurrent access.

my scalar $m = async::mutex();
async::lock($m);           # Acquire (blocking)
# ... critical section ...
async::unlock($m);         # Release

async::try_lock($m);       # Non-blocking (0=success)
async::mutex_destroy($m);  # Clean up

Mutex Functions:

Function                  Description
async::mutex()            Create mutex
async::lock($m)           Acquire lock
async::unlock($m)         Release lock
async::try_lock($m)       Non-blocking lock
async::mutex_destroy($m)  Destroy mutex
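
As a rough illustration, a mutex can keep the output of several tasks from interleaving. The report function is hypothetical, and the sketch assumes a mutex handle can be passed to an async function as a scalar argument.

```strada
async func report(scalar $m, int $id) int {
    async::lock($m);
    # Critical section: only one task prints its pair of lines at a time.
    say("task " . $id . " start");
    say("task " . $id . " end");
    async::unlock($m);
    return $id;
}

func main() int {
    my scalar $m = async::mutex();
    my array @futures = (report($m, 1), report($m, 2), report($m, 3));
    async::all(\@futures);

    # try_lock returns 0 on success, per the snippet above.
    if (async::try_lock($m) == 0) {
        say("lock is free again");
        async::unlock($m);
    }

    async::mutex_destroy($m);
    return 0;
}
```

Every path that acquires the lock should release it. If the critical section can throw, consider unlocking in a catch block before rethrowing.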

Atomics

Lock-free integer operations for counters and flags.

my scalar $a = async::atomic(0);       # Create
async::atomic_load($a);                # Read
async::atomic_store($a, 100);          # Write
async::atomic_add($a, 10);             # Add, returns OLD
async::atomic_sub($a, 5);              # Sub, returns OLD
async::atomic_inc($a);                 # Inc, returns NEW
async::atomic_dec($a);                 # Dec, returns NEW
async::atomic_cas($a, $exp, $new);    # CAS (returns 1 if swapped)

Atomic Functions:

Function                       Description
async::atomic($n)              Create atomic
async::atomic_load($a)         Read value
async::atomic_store($a, $v)    Write value
async::atomic_add($a, $d)      Add, return OLD
async::atomic_sub($a, $d)      Subtract, return OLD
async::atomic_inc($a)          Increment, return NEW
async::atomic_dec($a)          Decrement, return NEW
async::atomic_cas($a, $e, $n)  Compare-and-swap
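
A shared counter is the usual case for atomics. In the sketch below, the bump function is hypothetical, and the code assumes an atomic handle can be passed to an async function as a scalar argument. Three tasks increment the same counter without a mutex.

```strada
async func bump(scalar $counter, int $times) int {
    my int $i = 0;
    while ($i < $times) {
        async::atomic_inc($counter);  # lock-free increment
        $i++;
    }
    return $times;
}

func main() int {
    my scalar $counter = async::atomic(0);
    my array @futures = (
        bump($counter, 1000),
        bump($counter, 1000),
        bump($counter, 1000)
    );
    async::all(\@futures);  # wait for all three tasks

    say("Total: " . async::atomic_load($counter));

    # CAS swaps only if the current value equals the expected one.
    if (async::atomic_cas($counter, 3000, 0) == 1) {
        say("Counter reset");
    }
    return 0;
}
```

If every increment is applied atomically, the total should equal the sum of the per-task counts, which a plain shared integer would not guarantee.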

Event Loop and Green Tasks (Async::Loop)

For servers, the thread pool's one-OS-thread-per-blocked-connection model doesn't scale. Async::Loop multiplexes any number of connections onto one thread (epoll on Linux, poll(2) on other POSIX systems), and green tasks let handler code read like plain blocking Strada:

use Async::Loop;
use Async::Task;

my scalar $loop = Async::Loop::new();
my scalar $listener = core::socket_server(8080);

$loop->spawn(fn () {
    while (1) {
        my scalar $conn = Async::Task::accept($listener);
        $loop->spawn(fn () {
            my str $req = Async::Task::readline($conn, 5000);
            Async::Task::send($conn, "HTTP/1.0 200 OK\r\n\r\nhello");
            core::socket_close($conn);
        });
    }
});

$loop->run();

One thread handles ~34k echo round-trips/s across 50 concurrent task connections (loopback, -O2). See docs/EVENT_LOOP.md in the repository for the full API, task rules, and design notes.