# Writing a Jobs Driver

JOBS drivers are mini-plugins that are connected to the main JOBS plugin and initialized by it.

## Architecture

While initializing, the JOBS plugin searches for registered drivers by the `Constructor` interface. The `Constructor` and `Driver` (described below) interfaces are declared in the [RR API repository](https://github.com/roadrunner-server/api/blob/master/plugins/v4/jobs/driver.go).

Constructor interface:

{% code title="constructor.go" %}

```go
// Constructor constructs Consumer interface. Endure abstraction.
type Constructor interface {
 // Name returns the name of the driver
 Name() string
 // DriverFromConfig constructs a driver (e.g. kafka, amqp) from the configuration using the provided configKey
 DriverFromConfig(configKey string, queue Queue, pipeline Pipeline) (Driver, error)
 // DriverFromPipeline constructs a driver (e.g. kafka, amqp) from the pipeline. All configuration is provided by the pipeline
 DriverFromPipeline(pipe Pipeline, queue Queue) (Driver, error)
}
```

{% endcode %}

Driver interface:

{% code title="driver.go" %}

```go
// Driver represents the interface for a single jobs driver
type Driver interface {
 // Push pushes the job to the underlying driver
 Push(ctx context.Context, msg Message) error
 // Run starts consuming the pipeline
 Run(ctx context.Context, pipeline Pipeline) error
 // Stop stops the consumer and closes the underlying connection
 Stop(ctx context.Context) error
 // Pause pauses the jobs consuming (while still allowing job pushing)
 Pause(ctx context.Context, pipeline string) error
 // Resume resumes the consumer
 Resume(ctx context.Context, pipeline string) error
 // State returns information about the driver state
 State(ctx context.Context) (*State, error)
}
```

{% endcode %}

So every driver should implement the `Constructor` interface to be found by the JOBS plugin. Let's have a look at the methods included in the `Constructor` interface:

1. `Name() string`: This method should return a user-friendly name for the driver. It'll be used later in the pipelines `<pipeline_name>.driver` option. **It is an important option. The name here and name in the pipeline options should match.**
2. `DriverFromConfig(configKey string, queue Queue, pipeline Pipeline) (Driver, error)`: Returns a `Driver` implementation declared via configuration. RoadRunner, in turn, provides a configuration key (such as `jobs.pipelines.pipeline-name.driver-name.config`), the queue implementation to which messages are pushed, and the pipeline with all information about the pipeline. Later we will look at how to use this.
3. `DriverFromPipeline(pipe Pipeline, queue Queue) (Driver, error)`: Returns a `Driver` implementation declared via the RPC `jobs.Declare` call. It doesn't have configuration, but all info and configuration options are stored in the `pipeline` method argument.

### Initialization

During initialization, the JOBS plugin searches for the JOBS drivers and saves them into a hashmap by the name provided by the `Name() string` method. It is not possible to have two drivers with the same name. Drivers here are the ones declared in the `jobs.pipelines` configuration. RoadRunner also saves pipelines declared via configuration by their name. Pipelines can also be declared with the [`jobs.Declare`](https://github.com/roadrunner-php/jobs/blob/v4.5.0/src/Jobs.php#L26) RPC method. If required, you may use the `Configurer` plugin to unmarshal global driver configuration, such as a connection string.

![alt text](https://2796799470-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FiNSGMfXe3BkZWTJw82Ho%2Fuploads%2Fgit-blob-e5595f44617400aef1091f609f601811fee49dc7%2Fimage.png?alt=media)

### How to create a driver for JOBS

All code from the tutorial is here: [link](https://github.com/roadrunner-server/samples/blob/master/plugins/jobs_driver/)

To create a driver for jobs, you need to create a plugin instance:

{% code title="driver.go" %}

```go
package jobs_driver //nolint:revive,stylecheck

import (
	"github.com/roadrunner-server/api/v4/plugins/v4/jobs"
	"github.com/roadrunner-server/errors"
	"github.com/roadrunner-server/samples/plugins/jobs_driver/driver"
	"go.uber.org/zap"
)

const pluginName string = "my_awesome_driver"

type Configurer interface {
	// UnmarshalKey takes a single key and unmarshal it into a Struct.
	UnmarshalKey(name string, out any) error
	// Has checks if a config section exists.
	Has(name string) bool
}

type Logger interface {
	NamedLogger(name string) *zap.Logger
}

type Plugin struct {
	log *zap.Logger
	cfg Configurer
}

func (p *Plugin) Init(log Logger, cfg Configurer) error {
	if !cfg.Has(pluginName) {
		return errors.E(errors.Disabled)
	}

	p.log = log.NamedLogger(pluginName)
	p.cfg = cfg
	return nil
}

func (p *Plugin) Name() string {
	return pluginName
}

func (p *Plugin) DriverFromConfig(configKey string, pq jobs.Queue, pipeline jobs.Pipeline) (jobs.Driver, error) {
	return driver.FromConfig(configKey, p.log, p.cfg, pipeline, pq)
}

func (p *Plugin) DriverFromPipeline(pipe jobs.Pipeline, pq jobs.Queue) (jobs.Driver, error) {
	return driver.FromPipeline(pipe, p.log, p.cfg, pq)
} 
```

{% endcode %}

This is a simple representation of the RR plugin. It is called driver because it is attached to another controlling plugin as a pluggable extender, aka: a driver. Keep in mind the plugin's name.

JOBS plugin will send the following data to the `Constructor` interface methods:

1. If declared via configuration (`.rr.yaml`) - `configKey`, you may use that key to unmarshal the configuration section related solely to this driver. If the driver was declared via `jobs.Declare` RPC method, all configuration options would be stored in the `jobs.Pipeline` interface.
2. `jobs.Queue`: Priority-Queue, used to push the messages and later process by the PHP workers.
3. All other things like logger, `Configurer` plugin which will be used to get the values from the `.rr.yaml` configuration you may pass if you need them from the driver's root (e.g.: `p.log`).

Now, let's see the simplified `Driver` implementation:

{% code title="driver.go" %}

```go
package driver

import (
	"context"

	"github.com/roadrunner-server/api/v4/plugins/v4/jobs"
	"go.uber.org/zap"
)

var _ jobs.Driver = (*Driver)(nil)

type Configurer interface {
	// UnmarshalKey takes a single key and unmarshal it into a Struct.
	UnmarshalKey(name string, out any) error
	// Has checks if a config section exists.
	Has(name string) bool
}

type Driver struct {
}

func FromConfig(configKey string, log *zap.Logger, cfg Configurer, pipeline jobs.Pipeline, pq jobs.Queue) (*Driver, error) {
	return &Driver{}, nil
}

// FromPipeline initializes consumer from pipeline
func FromPipeline(pipeline jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
	return &Driver{}, nil
}

func (d *Driver) Push(ctx context.Context, job jobs.Message) error {
	return nil
}

func (d *Driver) Run(ctx context.Context, p jobs.Pipeline) error {
	return nil
}

func (d *Driver) State(ctx context.Context) (*jobs.State, error) {
	return nil, nil
}

func (d *Driver) Pause(ctx context.Context, p string) error {
	return nil
}

func (d *Driver) Resume(ctx context.Context, p string) error {
	return nil
}

func (d *Driver) Stop(ctx context.Context) error {
	return nil
}
```

{% endcode %}

Remember the following things:

1. The `FromConfig` and `FromPipeline` methods are used to initialize the driver, not to start message consumption.
2. The `JOBS` plugin will automatically call the `Run` method if your pipelines are in the `jobs.consume` array.
3. For pipelines declared via the `jobs.Declare` RPC call, the `jobs.Resume` method should be called instead.

### Pushing jobs into the priority queue

To push a job into the priority queue, you need to slightly transform it to add `Ack`, `Nack`, etc. methods to it. All interfaces are in the [RR API repository](https://github.com/roadrunner-server/api/blob/master/plugins/v4/jobs/job.go). Let's have a look at the `Job` interface.

{% code title="job.go" %}

```go
// Job represents a binary heap item
type Job interface {
	pq.Item
	// Ack acknowledges the item after processing
	Ack() error
	// Nack discards the item
	Nack() error
	// NackWithOptions discards the item with an optional requeue flag
	NackWithOptions(requeue bool, delay int) error
	// Requeue puts the message back to the queue with an optional delay
	Requeue(headers map[string][]string, delay int) error
	// Body returns the payload associated with the item
	Body() []byte
	// Context returns any meta-information associated with the item
	Context() ([]byte, error)
	// Headers return the metadata for the item
	Headers() map[string][]string
}
```

{% endcode %}

The `Job` interface also includes the `pq.Item` interface to satisfy a minimal priority queue requirement. You may add this (`pq.Item`) interface to any interface and benefit from RoadRunner's priority queue. So, our driver's `Push` method would be updated as follows:

{% code title="driver.go" %}

```go
func (d *Driver) Push(_ context.Context, job jobs.Message) error {
	item := fromJob(job)
	d.queue.Insert(item)
	return nil
}

func fromJob(job jobs.Message) *Item {
	return &Item{
		Job:     job.Name(),
		Ident:   job.ID(),
		Payload: job.Payload(),
		headers: job.Headers(),
		Options: &Options{
			Priority: job.Priority(),
			Pipeline: job.GroupID(),
			Delay:    int(job.Delay()),
			AutoAck:  job.AutoAck(),
		},
	}
}
```

{% endcode %}

The `fromJob` method is needed to simply transform `jobs.Message` into a `Job`. See [link](https://github.com/roadrunner-server/samples/blob/master/plugins/jobs_driver/driver/message.go). The rest is to implement driver-specific `Ack`, `Nack`, etc. methods.

Configuration for your jobs driver:

{% code title=".rr.yaml" %}

```yaml
version: '3'

rpc:
  listen: tcp://127.0.0.1:6001

server:
  command: "php your_php_worker.php"
  relay: "pipes"

your_global_section:
  addr: "some_connection_string"

logs:
  level: error
  encoding: console
  mode: development

jobs:
  pool:
    num_workers: 10
    allocate_timeout: 60s
    destroy_timeout: 1s

  pipelines:
    test-1:
      driver: my_awesome_driver
      config:
        priority: 1
        prefetch: 100
        # rest of the options

  consume: [ "test-1" ]
```

{% endcode %}

Examples of existing drivers: [AMQP](https://github.com/roadrunner-server/amqp), [Kafka](https://github.com/roadrunner-server/kafka), [In-Memory](https://github.com/roadrunner-server/memory), [Nats](https://github.com/roadrunner-server/nats), [SQS](https://github.com/roadrunner-server/sqs), [BoltDB](https://github.com/roadrunner-server/boltdb), [Google Pub-Sub](https://github.com/roadrunner-server/google-pub-sub)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.roadrunner.dev/docs/customization/jobs-driver.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
