Skip to content

0xsequence/runnable

Repository files navigation

runnable

Build & Unit Tests

Overview

runnable is a Go package that provides a Runnable interface for functions or objects that can be started and stopped. It provides a simple way to run a function or object in a goroutine and stop it when needed. It also provides a way to run a function with retry and statistics number of restarts, when started and stopped, if returned error, etc.

Examples

Runnable Function

fmt.Println("Simple function...")
err := runnable.New(func(ctx context.Context) error {
    fmt.Println("Starting...")
    defer fmt.Println("Stopping...")

    for i := 0; i < 5; i++ {
        select {
        case <-ctx.Done():
            return nil
        default:
        }
        time.Sleep(1 * time.Second)
        fmt.Println("Running...")
    }
    return nil
}).Run(context.Background())
if err != nil {
    fmt.Println(err)
}

Runnable Function with Stop

fmt.Println("Simple function with stop...")
r := runnable.New(func(ctx context.Context) error {
    fmt.Println("Starting...")
    defer fmt.Println("Stopping...")

    for {
        select {
        case <-ctx.Done():
            return nil
        case <-time.After(time.Second):
        }
        fmt.Println("Running...")
    }
})

go func() {
    time.Sleep(5 * time.Second)

    fmt.Println("Calling Stop...")
    err := r.Stop(context.Background())
    if err != nil {
        fmt.Println(err)
    }
}()

err = r.Run(context.Background())
if err != nil {
    fmt.Println(err)
}

Runnable Function with timeout

fmt.Println("Simple function with timeout...")
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = runnable.New(func(ctx context.Context) error {
    fmt.Println("Starting...")
    defer fmt.Println("Stopping...")

    for {
        select {
        case <-ctx.Done():
            return nil
        case <-time.After(time.Second):
        }
        fmt.Println("Running...")
    }
}).Run(ctxWithTimeout)
if err != nil {
    fmt.Println(err)
}

Adapters

Cross-cutting behaviors that aren't part of the core lifecycle live in the runnable/adapters subpackage as chi-style middleware: each runnable.Adapter has the shape func(next RunFunc) RunFunc. Apply them with runnable.WithAdapters (left-to-right = outermost-to-innermost):

r := runnable.New(reconcile, runnable.WithAdapters(
    adapters.Draining(10*time.Second),
    adapters.Recovering(),
    adapters.Retry(3, time.Minute),
    adapters.Ticker(30*time.Second),
))

Draining — graceful shutdown with a grace window. When the outer ctx is cancelled, the wrapped work has timeout to return via adapters.Stopping(ctx) before its ctx is force-cancelled and adapters.ErrDrainTimedOut is returned.

Ticker — calls the wrapped work once per interval until ctx is cancelled or the work returns an error. Composes with Draining: an in-flight tick is allowed to finish before the loop exits.

Recovering — turns panics in the wrapped work into errors and emits a runnable.PanicRecoveredEvent to the Publisher on ctx. Place inside Draining when both are in use.

Retry — re-invokes the wrapped work up to maxRetries times on non-context errors. If resetAfter is non-zero and at least that long has passed since the previous attempt, the retry budget resets. Emits a runnable.RetryEvent after each failed attempt.

Inside long-running work, always select on both ctx.Done() and adapters.Stopping(ctx)Stopping signals drain start, ctx.Done() fires only when the drain timer expires.

A full SIGTERM-safe service shape lives in examples/ticker-with-drain.

Observability via Publisher

Adapters emit typed events to a runnable.Publisher installed on the runnable's ctx. Use runnable.WithPublisher to register one (or many — multiple WithPublisher calls fan out):

type log struct{}

func (log) Publish(event any) {
    switch ev := event.(type) {
    case runnable.RetryEvent:
        fmt.Printf("retry attempt %d: %v\n", ev.Attempt, ev.Err)
    case runnable.DrainStartedEvent:
        fmt.Printf("drain started, %s window\n", ev.Timeout)
    case runnable.PanicRecoveredEvent:
        fmt.Fprintf(os.Stderr, "panic: %v\n%s", ev.Recovered, ev.Stack)
    }
}

r := runnable.New(work,
    runnable.WithPublisher(log{}),
    runnable.WithAdapters(adapters.Retry(3, time.Minute), adapters.Recovering()),
)

StatusStore is a Publisher too — WithStatus(id, store) wires it automatically and counts RetryEvents into Status.Restarts.

Publisher.Publish runs on the caller's goroutine, so subscribers must not block. Buffer internally if you need async dispatch.

Migrating from v0.0.x to v0.1.0

v0.1.0 moves retry and panic recovery out of the core package, and introduces drain-on-shutdown and periodic execution as new adapters. The Option-based WithRetry and WithRecoverer are removed; their replacements live at runnable/adapters as chi-style middleware applied via runnable.WithAdapters.

Before (v0.0.x):

r := runnable.New(doWork,
    runnable.WithRecoverer(reporter, nil),
    runnable.WithRetry(3, time.Minute),
)

After (v0.1.0):

r := runnable.New(doWork, runnable.WithAdapters(
    adapters.Recovering(),
    adapters.Retry(3, time.Minute),
))

Symbol mapping:

  • runnable.WithRetry / runnable.ResetNeveradapters.Retry / adapters.ResetNever.
  • runnable.WithRecovereradapters.Recovering() plus a runnable.WithPublisher subscriber listening for runnable.PanicRecoveredEvent (the two-interface RecoveryReporter / StackPrinter callback split is gone).

Status.Restarts is event-driven. The Restarts field on Status is unchanged from a caller's perspective, but it now counts runnable.RetryEvents published by adapters.Retry (or any other Publisher source) rather than being incremented by an onStart side-channel from WithRetry. No call-site change required when using WithStatus + adapters.Retry.

New in v0.1.0: adapters.Draining for graceful shutdown, adapters.Ticker for periodic execution, adapters.Stopping(ctx) to observe drain start, adapters.ErrDrainTimedOut. See the Adapters section above.

Runnable Object

package main

import (
	"time"

	"github.com/0xsequence/runnable"
)

type Monitor struct {
	runnable.Runnable
}

func NewMonitor() *Monitor {
	m := &Monitor{}
	m.Runnable = runnable.New(m.run)
	return m
}

func (m *Monitor) run(ctx context.Context) error {
	fmt.Println("Starting...")
	defer fmt.Println("Stopping...")
	
	// Start monitoring
	for {
		select {
		case <-ctx.Done():
			return nil
		default:
		}

		time.Sleep(1 * time.Second)
		fmt.Println("Monitoring...")
	}
	return nil
}

func main() {
	fmt.Println("Runnable object(Monitor)...")
	m := NewMonitor()

	go func() {
		time.Sleep(5 * time.Second)

		fmt.Println("Calling Stop...")
		err := m.Stop(context.Background())
		if err != nil {
			fmt.Println(err)
		}
	}()

	err = m.Run(context.Background())
	if err != nil {
		fmt.Println(err)
	}
}

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages