RR as AWS Lambda

RoadRunner can run PHP as an AWS Lambda function.

PHP Worker

The PHP worker does not require any specific configuration to run inside a Lambda function. We can use the default snippet with an internal counter to demonstrate how workers are reused:

handler.php
<?php
/**
 * @var Goridge\RelayInterface $relay
 */
use Spiral\Goridge;
use Spiral\RoadRunner;

ini_set('display_errors', 'stderr');
require __DIR__ . "/vendor/autoload.php";

$worker = RoadRunner\Worker::create();
$psr7 = new RoadRunner\Http\PSR7Worker(
    $worker,
    new \Nyholm\Psr7\Factory\Psr17Factory(),
    new \Nyholm\Psr7\Factory\Psr17Factory(),
    new \Nyholm\Psr7\Factory\Psr17Factory()
);

while ($req = $psr7->waitRequest()) {
    try {
        $resp = new \Nyholm\Psr7\Response();
        $resp->getBody()->write("hello world");

        $psr7->respond($resp);
    } catch (\Throwable $e) {
        $psr7->getWorker()->error((string)$e);
    }
}

Name this file handler.php and put it in the root of your project. Make sure to run:

composer require spiral/roadrunner-http nyholm/psr7

Application

We can create a simple application to demonstrate how it works:

  1. You need three files: main.go with the Endure container:

main.go
package main

import (
  _ "embed"
  "log"
  "log/slog"
  "os"
  "os/signal"
  "sync"
  "syscall"
  "time"

  "github.com/roadrunner-server/config/v4"
  "github.com/roadrunner-server/endure/v2"
  "github.com/roadrunner-server/logger/v4"
  "github.com/roadrunner-server/server/v4"
)

//go:embed .rr.yaml
var rrYaml []byte

func main() {
  _ = os.Setenv("PATH", os.Getenv("PATH")+":"+os.Getenv("LAMBDA_TASK_ROOT"))
  _ = os.Setenv("LD_LIBRARY_PATH", "./lib:/lib64:/usr/lib64")

  cont := endure.New(slog.LevelError)

  cfg := &config.Plugin{
    Version:   "2024.1.0",
    Timeout:   time.Second * 30,
    Prefix:    "rr",
    Type:      "yaml",
    ReadInCfg: rrYaml,
  }

  err := cont.RegisterAll(
    cfg,
    &logger.Plugin{},
    &Plugin{},
    &server.Plugin{},
  )
  if err != nil {
    log.Fatal(err)
  }

  err = cont.Init()
  if err != nil {
    log.Fatal(err)
  }

  ch, err := cont.Serve()
  if err != nil {
    log.Fatal(err)
  }

  sig := make(chan os.Signal, 1)
  signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

  wg := &sync.WaitGroup{}
  wg.Add(1)

  go func() {
    defer wg.Done()
    for {
      select {
      case e := <-ch:
        err = cont.Stop()
        if err != nil {
          log.Println(e.Error.Error())
        }
      case <-sig:
        err = cont.Stop()
        if err != nil {
          log.Println(err.Error())
        }
        return
      }
    }
  }()

  wg.Wait()
}
  1. plugin.go with the plugin implementation:

plugin.go
package main

import (
  "context"
  "sync"
  "time"

  "github.com/goccy/go-json"
  "github.com/roadrunner-server/errors"
  "github.com/roadrunner-server/goridge/v3/pkg/frame"
  "github.com/roadrunner-server/sdk/v4/pool"
  "github.com/roadrunner-server/sdk/v4/worker"

  "github.com/aws/aws-lambda-go/events"
  "github.com/aws/aws-lambda-go/lambda"
  "github.com/roadrunner-server/sdk/v4/payload"
  poolImp "github.com/roadrunner-server/sdk/v4/pool/static_pool"
  "go.uber.org/zap"
)

const (
  pluginName string = "lambda"
)

type Plugin struct {
  mu      sync.Mutex
  log     *zap.Logger
  srv     Server
  pldPool sync.Pool
  wrkPool Pool
}

// Logger plugin
type Logger interface {
  NamedLogger(name string) *zap.Logger
}

type Pool interface {
  // Workers returns workers list associated with the pool.
  Workers() (workers []*worker.Process)
  // Exec payload
  Exec(ctx context.Context, p *payload.Payload, stopCh chan struct{}) (chan *poolImp.PExec, error)
  // RemoveWorker removes worker from the pool.
  RemoveWorker(ctx context.Context) error
  // AddWorker adds worker to the pool.
  AddWorker() error
  // Reset kill all workers inside the watcher and replaces with new
  Reset(ctx context.Context) error
  // Destroy all underlying stacks (but let them complete the task).
  Destroy(ctx context.Context)
}

// Server creates workers for the application.
type Server interface {
  NewPool(ctx context.Context, cfg *pool.Config, env map[string]string, _ *zap.Logger) (*poolImp.Pool, error)
}

func (p *Plugin) Init(srv Server, log Logger) error {
  p.srv = srv
  p.log = log.NamedLogger(pluginName)
  p.pldPool = sync.Pool{
    New: func() any {
      return &payload.Payload{
        Codec:   frame.CodecJSON,
        Context: make([]byte, 0, 100),
        Body:    make([]byte, 0, 100),
      }
    },
  }

  return nil
}

func (p *Plugin) Serve() chan error {
  errCh := make(chan error, 1)
  const op = errors.Op("plugin_serve")

  p.mu.Lock()
  defer p.mu.Unlock()

  var err error
  p.wrkPool, err = p.srv.NewPool(context.Background(), &pool.Config{
    NumWorkers:      4,
    AllocateTimeout: time.Second * 20,
    DestroyTimeout:  time.Second * 20,
  }, nil, nil)
  if err != nil {
    errCh <- errors.E(op, err)
    return errCh
  }

  go func() {
    // register handler
    lambda.Start(p.handler())
  }()

  return errCh
}

func (p *Plugin) Stop(ctx context.Context) error {
  p.mu.Lock()
  defer p.mu.Unlock()

  if p.wrkPool != nil {
    p.wrkPool.Destroy(ctx)
  }

  return nil
}

func (p *Plugin) handler() func(ctx context.Context, request events.APIGatewayV2HTTPRequest) (events.APIGatewayV2HTTPResponse, error) {
  return func(ctx context.Context, request events.APIGatewayV2HTTPRequest) (events.APIGatewayV2HTTPResponse, error) {
    requestJSON, err := json.Marshal(request)
    if err != nil {
      return events.APIGatewayV2HTTPResponse{Body: "", StatusCode: 500}, nil
    }

    ctxJSON, err := json.Marshal(ctx)
    if err != nil {
      return events.APIGatewayV2HTTPResponse{Body: "", StatusCode: 500}, nil
    }

    pld := p.getPld()
    defer p.putPld(pld)

    pld.Body = requestJSON
    pld.Context = ctxJSON

    re, err := p.wrkPool.Exec(ctx, pld, nil)
    if err != nil {
      return events.APIGatewayV2HTTPResponse{Body: "", StatusCode: 500}, nil
    }

    var r *payload.Payload

    select {
    case pl := <-re:
      if pl.Error() != nil {
        return events.APIGatewayV2HTTPResponse{Body: "", StatusCode: 500}, nil
      }
      // streaming is not supported
      if pl.Payload().Flags&frame.STREAM != 0 {
        return events.APIGatewayV2HTTPResponse{Body: "streaming is not supported", StatusCode: 500}, nil
      }

      // assign the payload
      r = pl.Payload()
    default:
      return events.APIGatewayV2HTTPResponse{Body: "worker empty response", StatusCode: 500}, nil
    }

    var response events.APIGatewayV2HTTPResponse
    err = json.Unmarshal(r.Body, &response)
    if err != nil {
      return events.APIGatewayV2HTTPResponse{Body: "", StatusCode: 500}, nil
    }
    return response, nil
  }
}

func (p *Plugin) putPld(pld *payload.Payload) {
  pld.Body = nil
  pld.Context = nil
  p.pldPool.Put(pld)
}

func (p *Plugin) getPld() *payload.Payload {
  pld := p.pldPool.Get().(*payload.Payload)
  return pld
}
  1. A config file, which can be embedded into the binary using the embed package:

.rr.yaml
version: "3"

server:
  command: "php handler.php"
  relay: pipes

logs:
  mode: production
  level: error
  encoding: json
  output: [ stderr ]

endure:
  grace_period: 1s

Here you can take full advantage of RoadRunner: you can include any plugin here and configure it with the embedded config (within reasonable limits).

To build and package your Lambda function, run:

build.sh
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -trimpath -ldflags "-s" -o bootstrap-amd64 main.go plugin.go
zip main.zip * -r

You can now upload and invoke your handler using a simple string event.

Repository with the full example

Notes

There are multiple notes to acknowledge:

  • Start with one worker per Lambda function to control your memory usage.

  • Make sure to include the environment variables listed in the code to properly resolve the location of the PHP binary and its dependencies.

Last updated

Was this helpful?