The RoadRunner Centrifuge plugin integrates RoadRunner with Centrifugo, a real-time WebSocket server. The plugin proxies events from the Centrifugo server to PHP workers running on RoadRunner and sends the workers' responses back to the WebSocket client, enabling real-time communication between the server and the client.
The plugin provides the following features:
Event Proxying: RoadRunner receives the events emitted by the Centrifugo server, such as Connect, Refresh, RPC, Subscribe, Publish, and Sub refresh. It proxies these events to PHP workers and sends the result from the PHP application back to the WebSocket client.
RPC API: The plugin provides an RPC API for sending data to the WebSocket server. For example, a PHP application can use the API to publish data to a channel or broadcast it to several channels.
The Centrifuge plugin [since 2.12.0] replaces our deprecated Websockets and Broadcast plugins.
In addition, Centrifugo provides a convenient JavaScript library that simplifies the development of real-time applications.
PHP client
The RoadRunner Centrifuge plugin comes with a PHP package that simplifies integrating the plugin with your PHP application. The package provides a set of classes and functions that handle incoming events from Centrifugo and let you send data back to the WebSocket server via RPC.
Installation
To install the package, run the following command:
```bash
composer require roadrunner-php/centrifugo
```
Configuration
First, add a centrifuge section to your RoadRunner configuration. For example, the following configuration is enough to run the plugin:
.rr.yaml
```yaml
rpc:
  listen: tcp://127.0.0.1:6001

server:
  command: "php app.php"
  relay: pipes

centrifuge:
  # Centrifugo server proxy address (docs: https://centrifugal.dev/docs/server/proxy#grpc-proxy)
  # Optional, default: tcp://127.0.0.1:30000
  proxy_address: "tcp://127.0.0.1:30000"

  # gRPC server API address (docs: https://centrifugal.dev/docs/server/server_api#grpc-api)
  # Optional, default: tcp://127.0.0.1:30000. Centrifugo: `grpc_api` should be set to true and `grpc_port` should be the same as in the RR's config.
  grpc_api_address: tcp://127.0.0.1:30000

  # Use gRPC gzip compressor
  # Optional, default: false
  use_compressor: true

  # Your application version
  # Optional, default: v1.0.0
  version: "v1.0.0"

  # Your application name
  # Optional, default: roadrunner
  name: "roadrunner"

  # TLS configuration
  # Optional, default: null
  tls:
    # TLS key
    # Required
    key: /path/to/key.pem

    # TLS certificate
    # Required
    cert: /path/to/cert.pem
```
You also need to configure the Centrifugo server to use RoadRunner as a proxy.
Set proxy_connect_endpoint, proxy_publish_endpoint, proxy_subscribe_endpoint, proxy_refresh_endpoint, proxy_sub_refresh_endpoint, and proxy_rpc_endpoint to the address of the RoadRunner server on which the Centrifuge plugin is enabled.
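As a rough illustration, a Centrifugo configuration fragment that points every proxy endpoint at RoadRunner might look like the sketch below. It assumes a YAML Centrifugo config file, the flat option names listed above, the grpc:// scheme that Centrifugo uses for gRPC proxy endpoints, and a RoadRunner proxy_address of tcp://127.0.0.1:30000 as in the .rr.yaml example. Option layout differs between Centrifugo versions, and some events may also have to be enabled per channel namespace, so check the Centrifugo proxy documentation for the version you run.

```yaml
# Centrifugo config (sketch): route proxy events to RoadRunner over gRPC.
# The host and port are assumed to match `proxy_address` in .rr.yaml.
proxy_connect_endpoint: grpc://127.0.0.1:30000
proxy_refresh_endpoint: grpc://127.0.0.1:30000
proxy_subscribe_endpoint: grpc://127.0.0.1:30000
proxy_publish_endpoint: grpc://127.0.0.1:30000
proxy_sub_refresh_endpoint: grpc://127.0.0.1:30000
proxy_rpc_endpoint: grpc://127.0.0.1:30000
```

You only need to set the endpoints for the events your PHP application actually handles.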
PHP worker example
Here is an example of a PHP worker:
centrifuge-worker.php
```php
<?php

require __DIR__ . '/vendor/autoload.php';

use RoadRunner\Centrifugo\CentrifugoWorker;
use RoadRunner\Centrifugo\Payload;
use RoadRunner\Centrifugo\Request;
use RoadRunner\Centrifugo\Request\RequestFactory;
use Spiral\RoadRunner\Worker;

$worker = Worker::create();
$requestFactory = new RequestFactory($worker);

// Create a new Centrifugo Worker from global environment
$centrifugoWorker = new CentrifugoWorker($worker, $requestFactory);

while ($request = $centrifugoWorker->waitRequest()) {
    if ($request instanceof Request\Invalid) {
        $errorMessage = $request->getException()->getMessage();

        if ($request->getException() instanceof \RoadRunner\Centrifugo\Exception\InvalidRequestTypeException) {
            $payload = $request->getException()->payload;
        }

        // Handle invalid request
        // $logger->error($errorMessage, $payload ?? []);

        continue;
    }

    if ($request instanceof Request\Refresh) {
        try {
            // Do something
            $request->respond(new Payload\RefreshResponse(
                // ...
            ));
        } catch (\Throwable $e) {
            $request->error($e->getCode(), $e->getMessage());
        }

        continue;
    }

    if ($request instanceof Request\Subscribe) {
        try {
            // Do something
            $request->respond(new Payload\SubscribeResponse(
                // ...
            ));

            // Alternatively, disconnect the client instead of responding:
            // $request->disconnect('500', 'Connection is not allowed.');
        } catch (\Throwable $e) {
            $request->error($e->getCode(), $e->getMessage());
        }

        continue;
    }

    if ($request instanceof Request\Publish) {
        try {
            // Do something
            $request->respond(new Payload\PublishResponse(
                // ...
            ));

            // Alternatively, disconnect the client instead of responding:
            // $request->disconnect('500', 'Connection is not allowed.');
        } catch (\Throwable $e) {
            $request->error($e->getCode(), $e->getMessage());
        }

        continue;
    }

    if ($request instanceof Request\RPC) {
        try {
            // NOTE: $router and the Request object passed to it are placeholders
            // for your application's own routing layer; this example does not define them.
            $response = $router->handle(
                new Request(uri: $request->method, data: $request->data),
            ); // ['user' => ['id' => 1, 'username' => 'john_smith']]

            $request->respond(new Payload\RPCResponse(
                data: $response
            ));
        } catch (\Throwable $e) {
            $request->error($e->getCode(), $e->getMessage());
        }

        continue;
    }
}
```
Protobuf API
To simplify working with the Centrifugo proto API in PHP, we provide a GitHub repository that contains the generated PHP DTO classes for the Centrifugo proxy and API proto files.
RR follows the proxy.proto specification and proxies these events to the PHP worker. So that the PHP worker can determine which proxy method was called, RR adds type: <endpoint> metadata to the worker's context. For example, if the Subscribe method was called, RR adds type:subscribe.
RPC
You can also use RPC methods to communicate with the Centrifugo server. RR follows the official Centrifugo proto API; see the official Centrifugo server API documentation for details.
Proxy events
Along with the incoming payload, RoadRunner passes the type of the proxied request in the headers sent to the PHP worker. The header key is type. Here is the complete list of types supported by RoadRunner:
connect: Connect proxy request.
refresh: Refresh proxy request.
subscribe: Subscribe proxy request.
publish: Publish proxy request.
rpc: RPC proxy request.
subrefresh: Subscription refresh proxy request.
notifychannelstate: Notify channel state proxy request.
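If you work with the raw payload rather than the PHP package, the type value can drive a small dispatcher. The sketch below is illustrative only: ProxyEventDispatcher is a hypothetical class, not part of RoadRunner or the roadrunner-php/centrifugo package, and it assumes the headers have already been decoded into a flat PHP array in which type maps to a single string.

```php
<?php

/**
 * Hypothetical helper: routes a proxied event to a handler based on
 * the `type` header described above. Not part of any RoadRunner package.
 */
final class ProxyEventDispatcher
{
    /** Proxy types listed in this section. */
    public const TYPES = [
        'connect',
        'refresh',
        'subscribe',
        'publish',
        'rpc',
        'subrefresh',
        'notifychannelstate',
    ];

    /** @var array<string, callable> handlers keyed by proxy type */
    private array $handlers = [];

    /** Register a handler for one of the known proxy types. */
    public function on(string $type, callable $handler): void
    {
        if (!in_array($type, self::TYPES, true)) {
            throw new InvalidArgumentException("Unknown proxy type: {$type}");
        }

        $this->handlers[$type] = $handler;
    }

    /** Look up the handler for the `type` header and call it with the payload. */
    public function dispatch(array $headers, array $payload): array
    {
        $type = $headers['type'] ?? null;

        if (!is_string($type) || !isset($this->handlers[$type])) {
            throw new UnexpectedValueException('No handler registered for this proxy type.');
        }

        return ($this->handlers[$type])($payload);
    }
}

// Usage: register a handler and dispatch a decoded event.
$dispatcher = new ProxyEventDispatcher();
$dispatcher->on(
    'subscribe',
    static fn (array $payload): array => ['channel' => $payload['channel'] ?? '']
);

$result = $dispatcher->dispatch(['type' => 'subscribe'], ['channel' => 'news']);
```

Rejecting unknown types at registration time keeps a typo such as sub_refresh from silently never matching an incoming event.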
Metrics
RoadRunner has a metrics plugin that provides metrics for the Centrifuge plugin, which can be used with Prometheus.
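As a minimal sketch, the metrics plugin is enabled by adding a metrics section to .rr.yaml. The address below is an example value chosen for illustration, not one taken from this page; pick any free host and port that your Prometheus instance can scrape.

```yaml
# .rr.yaml (fragment): expose RoadRunner metrics for Prometheus to scrape.
metrics:
  # Example address, assumed for illustration
  address: 127.0.0.1:2112
```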