Writing a Jobs Driver

JOBS drivers are mini-plugins that are connected to the main JOBS plugin and initialized by it.

Architecture

While initializing, the JOBS plugin searches for registered drivers by the Constructor interface. The Constructor and Driver (described below) interfaces are declared in the RR API repository.

Constructor interface:

constructor.go
// Constructor constructs Consumer interface. Endure abstraction.
type Constructor interface {
 // Name returns the name of the driver
 Name() string
 // DriverFromConfig constructs a driver (e.g. kafka, amqp) from the configuration using the provided configKey
 DriverFromConfig(configKey string, queue Queue, pipeline Pipeline) (Driver, error)
 // DriverFromPipeline constructs a driver (e.g. kafka, amqp) from the pipeline. All configuration is provided by the pipeline
 DriverFromPipeline(pipe Pipeline, queue Queue) (Driver, error)
}

Driver interface:

driver.go
// Driver represents the interface for a single jobs driver
type Driver interface {
 // Push pushes the job to the underlying driver
 Push(ctx context.Context, msg Message) error
 // Run starts consuming the pipeline
 Run(ctx context.Context, pipeline Pipeline) error
 // Stop stops the consumer and closes the underlying connection
 Stop(ctx context.Context) error
 // Pause pauses the jobs consuming (while still allowing job pushing)
 Pause(ctx context.Context, pipeline string) error
 // Resume resumes the consumer
 Resume(ctx context.Context, pipeline string) error
 // State returns information about the driver state
 State(ctx context.Context) (*State, error)
}

So every driver should implement the Constructor interface to be found by the JOBS plugin. Let's have a look at the methods included in the Constructor interface:

  1. Name() string: This method should return a user-friendly name for the driver. It'll be used later in the pipelines <pipeline_name>.driver option. It is an important option. The name here and name in the pipeline options should match.

  2. DriverFromConfig(configKey string, queue Queue, pipeline Pipeline) (Driver, error): Returns a Driver implementation declared via configuration. RoadRunner, in turn, provides a configuration key (such as jobs.pipelines.pipeline-name.driver-name.config), the queue implementation to which messages are pushed, and the pipeline with all information about the pipeline. Later we will look at how to use this.

  3. DriverFromPipeline(pipe Pipeline, queue Queue) (Driver, error): Returns a Driver implementation declared via the RPC jobs.Declare call. It doesn't have configuration, but all info and configuration options are stored in the pipeline method argument.

Initialization

During initialization, the JOBS plugin searches for the JOBS drivers and saves them into a hashmap by the name provided by the Name() string method. It is not possible to have two drivers with the same name. Drivers here are the ones declared in the jobs.pipelines configuration. RoadRunner also saves pipelines declared via configuration by their name. Pipelines can also be declared with the jobs.Declare RPC method. If required, you may use the Configurer plugin to unmarshal global driver configuration, such as a connection string.

alt text

How to create a driver for JOBS

All code from the tutorial is here: link

To create a driver for jobs, you need to create a plugin instance:

This is a simple representation of the RR plugin. It is called driver because it is attached to another controlling plugin as a pluggable extender, aka: a driver. Keep in mind the plugin's name.

JOBS plugin will send the following data to the Constructor interface methods:

  1. If declared via configuration (.rr.yaml) - configKey, you may use that key to unmarshal the configuration section related solely to this driver. If the driver was declared via jobs.Declare RPC method, all configuration options would be stored in the jobs.Pipeline interface.

  2. jobs.Queue: Priority-Queue, used to push the messages and later process by the PHP workers.

  3. All other things like logger, Configurer plugin which will be used to get the values from the .rr.yaml configuration you may pass if you need them from the driver's root (e.g.: p.log).

Now, let's see the simplified Driver implementation:

Remember the following things:

  1. The FromConfig and FromPipeline methods are used to initialize the driver, not to start message consumption.

  2. The JOBS plugin will automatically call the Run method if your pipelines are in the jobs.consume array.

  3. For pipelines declared via the jobs.Declare RPC call, the jobs.Resume method should be called instead.

Pushing jobs into the priority queue

To push a job into the priority queue, you need to slightly transform it to add Ack, Nack, etc. methods to it. All interfaces are in the RR API repository. Let's have a look at the Job interface.

The Job interface also includes the pq.Item interface to satisfy a minimal priority queue requirement. You may add this (pq.Item) interface to any interface and benefit from RoadRunner's priority queue. So, our driver's Push method would be updated as follows:

The fromJob method is needed to simply transform jobs.Message into a Job. See link. The rest is to implement driver-specific Ack, Nack, etc. methods.

Configuration for your jobs driver:

Examples of existing drivers: AMQP, Kafka, In-Memory, Nats, SQS, BoltDB, Google Pub-Sub

Last updated

Was this helpful?