Code Rhapsodie Dataflow Bundle

DataflowBundle is a bundle for Symfony 3.4+ that provides an easy way to create import / export dataflows.

Dataflow uses a linear generic workflow in three parts:

  • one reader
  • any number of steps
  • one or more writers

The reader can read data from anywhere and returns it row by row. Each step processes the current row, and the steps are executed in the order in which they are added. Finally, one or more writers save the row wherever you want.
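The linear workflow described above can be sketched in plain PHP, without the bundle. This is only an illustration of the reader, steps, writers order and not the bundle's implementation; the runDataflow() helper and the sample rows are hypothetical.

```php
<?php

/**
 * Illustrative sketch only: run a linear dataflow over plain PHP values.
 * runDataflow() is a hypothetical helper, not part of DataflowBundle.
 *
 * @param iterable   $reader  yields rows one by one
 * @param callable[] $steps   each step receives a row and returns the (possibly altered) row
 * @param callable[] $writers each writer receives the final row
 */
function runDataflow(iterable $reader, array $steps, array $writers): void
{
    foreach ($reader as $row) {
        // Steps run in the order in which they were added.
        foreach ($steps as $step) {
            $row = $step($row);
        }

        // Every writer receives the processed row.
        foreach ($writers as $writer) {
            $writer($row);
        }
    }
}

// Reader: a generator that returns data row by row.
$reader = (function (): iterable {
    yield ['title' => 'first'];
    yield ['title' => 'second'];
})();

$written = [];

runDataflow(
    $reader,
    [
        function (array $row): array {
            $row['title'] = strtoupper($row['title']);
            return $row;
        },
        function (array $row): array {
            $row['length'] = strlen($row['title']);
            return $row;
        },
    ],
    [
        function (array $row) use (&$written): void {
            $written[] = $row;
        },
    ]
);
```

After the call, $written holds both rows with upper-cased titles and the length computed by the second step, which shows that each step sees the output of the previous one.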

As the following schema shows, you can define more than one dataflow:

Dataflow schema


  • Define and configure a Dataflow
  • Run the scheduled Jobs
  • Run one Dataflow from the command line
  • Define the schedule for a Dataflow from the command line
  • Enable/Disable a scheduled Dataflow from the command line
  • Display the list of scheduled Dataflows from the command line
  • Display the result of the last Job for a Dataflow from the command line


Add the dependency

To install this bundle, run this command:

$ composer require code-rhapsodie/dataflow


You can use the generic readers, writers and steps from PortPHP.

For the writers, you must use the adapter CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter like this:

// ...
$streamWriter = new \Port\Writer\StreamMergeWriter();

$builder->addWriter(new \CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter($streamWriter));
// ...

Register the bundle

Symfony 4 (new tree)

For Symfony 4, add CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle::class => ['all' => true], in the config/bundles.php file.

Like this:


// config/bundles.php

return [
    // ...
    CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle::class => ['all' => true],
    // ...
];

Symfony 3.4 (old tree)

For Symfony 3.4, add a new line in the app/AppKernel.php file.

Like this:

// app/AppKernel.php

public function registerBundles()
{
    $bundles = [
        // ...
        new CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle(),
        // ...
    ];
    // ...
}

Update the database

This bundle uses Doctrine ORM to manage the database tables that store Dataflow schedules (cr_dataflow_scheduled) and jobs (cr_dataflow_job).

Doctrine migration

Execute this command to generate the migration for your database:

$ bin/console doctrine:migrations:diff

Other migration tools

If you use Phinx, Kaliop Migration Bundle, or another migration tool, you can add a new migration with the SQL queries generated by this command:

$ bin/console doctrine:schema:update --dump-sql

Define a dataflow type

This bundle uses a fixed and simple workflow structure in order to let you focus on the data processing logic part of your dataflow.

A dataflow type defines the different parts of your dataflow. A dataflow is made of:

  • exactly one Reader
  • any number of Steps
  • one or more Writers

Dataflow types can be configured with options.

A dataflow type must implement CodeRhapsodie\DataflowBundle\DataflowType\DataflowTypeInterface.

To help with creating your dataflow types, an abstract class CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType is provided, allowing you to define your dataflow through a handy builder CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder.

Here is an example of a dataflow type class:

namespace CodeRhapsodie\DataflowExemple\DataflowType;

use CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder;
use CodeRhapsodie\DataflowExemple\Reader\FileReader;
use CodeRhapsodie\DataflowExemple\Writer\FileWriter;
use Symfony\Component\OptionsResolver\OptionsResolver;

class MyFirstDataflowType extends AbstractDataflowType
{
    private $myReader;

    private $myWriter;

    public function __construct(FileReader $myReader, FileWriter $myWriter)
    {
        $this->myReader = $myReader;
        $this->myWriter = $myWriter;
    }

    protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
        // Pass the resolved option to the reader before using it.
        $this->myReader->setFilename($options['fileName']);

        $builder->setReader(($this->myReader)())
            ->addStep(function($data) use ($options) {
                // TODO : Write your code here...
                return $data;
            })
            ->addWriter($this->myWriter)
        ;
    }

    protected function configureOptions(OptionsResolver $optionsResolver): void
    {
        $optionsResolver->setDefaults([
            'my_option' => 'my_default_value',
            'fileName'  => null,
        ]);
        $optionsResolver->setRequired('fileName');
    }

    public function getLabel(): string
    {
        return 'My First Dataflow';
    }

    public function getAliases(): iterable
    {
        return ['mfd'];
    }
}

Dataflow types must be tagged with coderhapsodie.dataflow.type.

If you're using Symfony auto-configuration for your services, this tag will be automatically added to all services implementing DataflowTypeInterface.

Otherwise, manually add the tag coderhapsodie.dataflow.type in your dataflow type service configuration:

    CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType:
      tags:
        - { name: coderhapsodie.dataflow.type }

Use options for your dataflow type

The AbstractDataflowType can help you define options for your Dataflow type.

Add this method to your DataflowType class:

// ...
use Symfony\Component\OptionsResolver\OptionsResolver;

class MyFirstDataflowType extends AbstractDataflowType
{
    // ...
    protected function configureOptions(OptionsResolver $optionsResolver): void
    {
        $optionsResolver->setDefaults([
            'my_option' => 'my_default_value',
            'fileName'  => null,
        ]);
        $optionsResolver->setRequired('fileName');
    }
}


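With this configuration, the fileName option is declared as required. Note that OptionsResolver treats a required option as satisfied when it has a default value, so drop the null default if a missing fileName should raise an error. For advanced usage of the options resolver, read the Symfony documentation.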

Check if your DataflowType is ready

Execute this command to check if your DataflowType is correctly registered:

$ bin/console debug:container --tag coderhapsodie.dataflow.type --show-private

The output looks like this:

Symfony Container Public and Private Services Tagged with "coderhapsodie.dataflow.type" Tag

 ---------------------------------------------------------------- ---------------------------------------------------------------- 
  Service ID                                                       Class name                                                      
 ---------------------------------------------------------------- ---------------------------------------------------------------- 
  CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType   CodeRhapsodie\DataflowExemple\DataflowType\MyFirstDataflowType  
 ---------------------------------------------------------------- ---------------------------------------------------------------- 


Readers

Readers provide the dataflow with elements to import / export. Usually, elements are read from an external resource (file, database, webservice, etc.).

A Reader can be any iterable.

The only constraint on the type of the returned elements is that they cannot be false.
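Since any iterable is accepted, an array, an Iterator object, or a generator can each play the reader role. The sketch below uses only core PHP; collectRows() is a hypothetical helper written for this illustration and is not a bundle function.

```php
<?php

/**
 * Hypothetical helper: consume any iterable the way a dataflow would
 * consume its reader, one element at a time.
 */
function collectRows(iterable $reader): array
{
    $rows = [];
    foreach ($reader as $row) {
        $rows[] = $row;
    }

    return $rows;
}

// 1. A plain array is iterable.
$fromArray = collectRows([['id' => 1], ['id' => 2]]);

// 2. So is any object implementing \Traversable.
$fromIterator = collectRows(new \ArrayIterator([['id' => 1], ['id' => 2]]));

// 3. A generator produces elements lazily, which suits large sources.
$generator = (function (): iterable {
    foreach ([1, 2] as $id) {
        yield ['id' => $id];
    }
})();
$fromGenerator = collectRows($generator);
```

All three variables end up with the same two rows. A generator is usually the better fit when the source is too large to load into memory at once.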

The reader can be a generator, as in this example:


namespace CodeRhapsodie\DataflowExemple\Reader;

class FileReader
{
    private $filename;

    /**
     * Set the filename option needed by the Reader.
     */
    public function setFilename(string $filename)
    {
        $this->filename = $filename;
    }

    public function __invoke(): iterable
    {
        if (!$this->filename) {
            throw new \Exception("The file name is not defined. Define it with 'setFilename' method");
        }

        if (!$fh = fopen($this->filename, 'r')) {
            throw new \Exception("Unable to open file '".$this->filename."' for read.");
        }

        // Read line by line; fgets() returns false at end of file, which ends the loop.
        while (false !== ($line = fgets($fh))) {
            yield explode("|", rtrim($line, "\r\n"));
        }

        fclose($fh);
    }
}

You can set up this reader as follows:

$this->myReader->setFilename($options['fileName']);
$builder->setReader(($this->myReader)());



Steps

Steps are operations performed on the elements before they are handled by the Writers. Usually, steps are either:

  • converters, that alter the element
  • filters, that conditionally prevent further operations on the element

A Step can be any callable, taking the element as its argument, and returning either:

  • the element, possibly altered
  • false, if no further operations should be performed on this element

A few examples:

$builder->addStep(function($item) {
    // Titles are changed to all caps before export
    $item['title'] = strtoupper($item['title']);

    return $item;
});

$builder->addStep(function($item) {
    // Private items are not exported
    if ($item['private']) {
        return false;
    }

    return $item;
});
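To see how a false return value stops further processing of an element, here is a self-contained sketch. applySteps() is a hypothetical helper for illustration; the bundle's actual execution code may differ.

```php
<?php

/**
 * Hypothetical helper: apply steps in order, stopping as soon as one returns false.
 *
 * @return mixed the processed item, or false if a step filtered it out
 */
function applySteps($item, array $steps)
{
    foreach ($steps as $step) {
        $item = $step($item);
        if (false === $item) {
            return false; // Filtered: the remaining steps are skipped.
        }
    }

    return $item;
}

$steps = [
    // Filter: private items are not exported.
    function (array $item) {
        return $item['private'] ? false : $item;
    },
    // Converter: titles are changed to all caps.
    function (array $item): array {
        $item['title'] = strtoupper($item['title']);
        return $item;
    },
];

$public = applySteps(['title' => 'hello', 'private' => false], $steps);
$private = applySteps(['title' => 'secret', 'private' => true], $steps);
```

Here $public comes back with an upper-cased title, while $private is false because the filter ran first and the converter was never reached. Placing filters before converters avoids doing work on elements that will be discarded.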


Writers

Writers perform the actual import / export operations.

A Writer must implement CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface. As this interface is not compatible with Port\Writer, the adapter CodeRhapsodie\DataflowBundle\DataflowType\Writer\PortWriterAdapter is provided.

This example shows how to use a predefined PortPHP writer:

$builder->addWriter(new PortWriterAdapter(new \Port\FileWriter()));

Or your own Writer:

namespace CodeRhapsodie\DataflowExemple\Writer;

use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;

class FileWriter implements WriterInterface
{
    private $fh;

    public function prepare()
    {
        if (!$this->fh = fopen('/path/to/file', 'w')) {
            throw new \Exception("Unable to open in write mode the output file.");
        }
    }

    public function write($item)
    {
        fputcsv($this->fh, $item);
    }

    public function finish()
    {
        // Release the file handle once all items are written.
        fclose($this->fh);
    }
}
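The three method names suggest a lifecycle: prepare() once before the first item, write() once per item, and finish() once after the last item. The sketch below assumes that order and runs without the bundle. The SketchWriter interface and MemoryWriter class are hypothetical stand-ins that only mirror those method names; they are not the bundle's WriterInterface.

```php
<?php

// Hypothetical stand-in mirroring the three methods used by FileWriter.
interface SketchWriter
{
    public function prepare();
    public function write($item);
    public function finish();
}

// Collects comma-separated lines in memory instead of writing to a real file.
class MemoryWriter implements SketchWriter
{
    public $lines = [];
    public $open = false;

    public function prepare()
    {
        $this->open = true;
    }

    public function write($item)
    {
        if (!$this->open) {
            throw new \LogicException('prepare() must be called before write().');
        }
        $this->lines[] = implode(',', $item);
    }

    public function finish()
    {
        $this->open = false;
    }
}

$writer = new MemoryWriter();
$writer->prepare();                 // assumed: once, before the first item
foreach ([['a', 'b'], ['c', 'd']] as $item) {
    $writer->write($item);          // once per item
}
$writer->finish();                  // assumed: once, after the last item
```

An in-memory writer like this can also be handy in tests, since it lets you inspect what a dataflow would have written without touching the file system.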


All pending dataflow jobs are stored in a queue in the database.

Add this command to your crontab to execute all queued jobs:

$ SYMFONY_ENV=prod php bin/console code-rhapsodie:dataflow:run-pending


Several commands are provided to manage schedules and run jobs.

code-rhapsodie:dataflow:run-pending: Executes the jobs in the queue according to their schedule.

code-rhapsodie:dataflow:schedule:list: Displays the list of scheduled dataflows.

code-rhapsodie:dataflow:schedule:change-status: Enables or disables a scheduled dataflow.

code-rhapsodie:dataflow:schedule:add: Adds a schedule for a dataflow.

code-rhapsodie:dataflow:job:show: Displays the last result of a job.

code-rhapsodie:dataflow:execute: Lets you execute a single dataflow job.

Issues and feature requests

Please report issues and request features at


Contributions are very welcome. Thanks to everyone who has contributed already.


This package is licensed under the MIT license.

