Open Source on IBM i

Streaming API for ILE

For those of you who are not familiar with streams in programming:

A stream consists of an emitter (provider) of data and a sequence of one or more consumers of data. Each consumer may pass the data on to the next consumer, which forms a streaming data pipeline.

Emitter

The emitter can produce a finite or an infinite number of data blocks. A stream is not restricted to one type of data: the emitter produces events which only hold a pointer to the data, and any kind of data can lie behind that pointer.
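To make the idea of pointer-carrying events concrete, here is a small C sketch. It is not the project's RPG API: `event_t`, `emit_fn` and both emitters are hypothetical names chosen for illustration. An emitter is modelled as a function that fills in the next event and returns 0 once it is exhausted; the counter emitter never returns 0 and is therefore infinite.

```c
#include <stddef.h>

/* Hypothetical event: it only carries a pointer; the consumer must know
   (or be told) what kind of data lies behind it. */
typedef struct {
    void *data;
} event_t;

/* Hypothetical emitter interface: fills *ev and returns 1 while data is
   available, returns 0 once the emitter is exhausted. */
typedef int (*emit_fn)(void *state, event_t *ev);

/* A finite emitter over an array whose elements can be of any type. */
typedef struct {
    char  *base;      /* start of the array */
    size_t elem_size; /* size of one element in bytes */
    size_t count;     /* number of elements */
    size_t next;      /* index of the next element to emit */
} array_emitter_t;

static int array_emit(void *state, event_t *ev) {
    array_emitter_t *em = state;
    if (em->next >= em->count)
        return 0;                         /* no more data: end of stream */
    ev->data = em->base + em->next * em->elem_size;
    em->next++;
    return 1;
}

/* An infinite emitter: counts up forever, someone else decides when to stop. */
typedef struct {
    long current;
} counter_emitter_t;

static int counter_emit(void *state, event_t *ev) {
    counter_emitter_t *em = state;
    em->current++;
    ev->data = &em->current;              /* pointer to the emitter's own value */
    return 1;
}
```

A caller loops until the emitter returns 0. With an infinite emitter, a downstream consumer (or the caller) has to decide when to stop pulling events.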

Consumer

Consumers of data can be categorized into pipes and sinks.

A pipe can choose between several actions:

  • process the data and push it to the next consumer
  • change the data but keep the type and push it to the next consumer
  • transform the data to a different type and push it to the next consumer
  • split the data into multiple chunks of data
  • drop the data and do not push it to the next consumer
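The pipe choices above can be sketched in C as callbacks that receive a pointer to the data and decide whether, and with what, to call the next consumer. This is an illustration only: `consumer_t`, `push` and all pipe and sink names are hypothetical and not part of the stream project.

```c
#include <ctype.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical consumer: a callback, the next consumer (NULL at the end)
   and a pointer to consumer-specific state. */
typedef struct consumer consumer_t;
struct consumer {
    void (*on_event)(consumer_t *self, void *data);
    consumer_t *next;
    void *state;
};

static void push(consumer_t *next, void *data) {
    if (next != NULL)
        next->on_event(next, data);
}

/* Process and push: count the events (state is a size_t counter). */
static void pipe_count(consumer_t *self, void *data) {
    (*(size_t *)self->state)++;
    push(self->next, data);
}

/* Change the data but keep the type: upper-case a string in place. */
static void pipe_upper(consumer_t *self, void *data) {
    for (char *p = data; *p != '\0'; p++)
        *p = (char)toupper((unsigned char)*p);
    push(self->next, data);
}

/* Transform to a different type: string -> length (size_t).
   The pushed pointer is only valid while the next consumer runs. */
static void pipe_length(consumer_t *self, void *data) {
    size_t len = strlen(data);
    push(self->next, &len);
}

/* Split: push every space-separated word as its own event
   (words of 64 characters or more are skipped in this sketch). */
static void pipe_split_words(consumer_t *self, void *data) {
    const char *p = data;
    char word[64];
    while (*p != '\0') {
        size_t n = strcspn(p, " ");
        if (n > 0 && n < sizeof word) {
            memcpy(word, p, n);
            word[n] = '\0';
            push(self->next, word);
        }
        p += n;
        if (*p == ' ')
            p++;
    }
}

/* Drop: empty strings are not pushed any further. */
static void pipe_drop_empty(consumer_t *self, void *data) {
    if (((char *)data)[0] != '\0')
        push(self->next, data);
}

/* A small sink that adds up size_t values (state is a size_t total). */
static void sink_sum_sizes(consumer_t *self, void *data) {
    *(size_t *)self->state += *(size_t *)data;
}
```

Chaining split -> count -> upper -> length -> sum over "hello big  world" counts 3 words with a total length of 13. Since each pipe only sees a pointer, a consumer that wants to keep data beyond its own call has to copy it.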

A sink sits at the end of the pipeline and "drains" all events. It is the end of the process. Only after the sink has processed the data does the emitter emit the next event.
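The synchronous hand-over between emitter and sink can be illustrated with a hypothetical C sketch in which every consumer calls the next one directly. Because `first->on_event` only returns after the sink has finished, the loop in `flow` cannot emit the next value any earlier. The names `flow`, `pipe_double`, `sink_log` and the trace buffer are assumptions made for this illustration.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical consumer: a callback plus the next consumer in the pipeline. */
typedef struct consumer consumer_t;
struct consumer {
    void (*on_event)(consumer_t *self, void *data);
    consumer_t *next;
};

/* Trace buffer that records the order of the calls. */
static char trace[256];

static void log_step(const char *what, int value) {
    char buf[32];
    snprintf(buf, sizeof buf, "%s%d ", what, value);
    strncat(trace, buf, sizeof trace - strlen(trace) - 1);
}

/* Pipe: doubles the integer and pushes the result on. */
static void pipe_double(consumer_t *self, void *data) {
    int doubled = *(int *)data * 2;
    log_step("pipe", doubled);
    self->next->on_event(self->next, &doubled);
}

/* Sink: the end of the pipeline, it only drains the event. */
static void sink_log(consumer_t *self, void *data) {
    (void)self;
    log_step("sink", *(int *)data);
}

/* Flow: emit one value, push it through the whole pipeline, and only
   then emit the next one. Returns the number of emitted events. */
static int flow(int *values, int count, consumer_t *first) {
    for (int i = 0; i < count; i++) {
        log_step("emit", values[i]);
        first->on_event(first, &values[i]);
    }
    return count;
}
```

For the values 1 and 2 the trace reads `emit1 pipe2 sink2 emit2 pipe4 sink4`, so each event has fully left the pipeline before the next one is emitted.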

Consumers can provide various kinds of functionality, for example:

  • filter
  • aggregate
  • sum
  • group
  • distinct
  • map
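Several of these functions fit the same consumer shape. A minimal C sketch, assuming a hypothetical `consumer_t` whose `state` pointer holds per-consumer configuration (a predicate, a mapping function) or an accumulator; none of these names come from the stream project.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical consumer with a pointer to consumer-specific state. */
typedef struct consumer consumer_t;
struct consumer {
    void (*on_event)(consumer_t *self, void *data);
    consumer_t *next;
    void *state;   /* configuration or accumulator of this consumer */
};

typedef struct { bool (*accept)(int value); } filter_state_t;
typedef struct { int (*apply)(int value); } map_state_t;

/* filter: only events accepted by the predicate reach the next consumer */
static void pipe_filter(consumer_t *self, void *data) {
    const filter_state_t *st = self->state;
    if (st->accept(*(int *)data))
        self->next->on_event(self->next, data);
}

/* map: transforms each value and pushes the result */
static void pipe_map(consumer_t *self, void *data) {
    const map_state_t *st = self->state;
    int mapped = st->apply(*(int *)data);
    self->next->on_event(self->next, &mapped);
}

/* sum: a sink that accumulates every value it drains (state is a long) */
static void sink_sum(consumer_t *self, void *data) {
    long *total = self->state;
    *total += *(int *)data;
}

/* Example predicate and mapping function. */
static bool is_even(int value) { return value % 2 == 0; }
static int square(int value) { return value * value; }
```

Chaining filter(is_even) -> map(square) -> sum over the values 1 to 5 yields 2² + 4² = 20. Aggregate, group and distinct could be built along the same lines by keeping more state behind the `state` pointer.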

Example

  • Emitter: reads a file and pushes the data block by block to the next consumer
  • Pipe: calls the Qc3CalculateHash system API for every block of data
  • Sink: receives the calculated hash from the previous pipe after the last block of data and stores it

The code may look like this:

// setup the stream
stream = stream_create(emitter_file('data.txt')); // emits data from the file in blocks
stream_pipe(stream : pipe_md5());                 // calculates the md5 hash over all data blocks
stream_sink(stream : %paddr(writeMd5));           // local procedure storing the hash

// start the stream, let the data flow
stream_flow(stream);

// cleanup the mess =)
stream_dispose(stream);
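For readers without an IBM i at hand, the same flow can be mimicked in portable C. This is an analogue, not the project's implementation: the emitter reads from an in-memory buffer instead of data.txt, 32-bit FNV-1a stands in for MD5 via Qc3CalculateHash, and the `on_end` callback is an assumption about how a pipe learns that the last block has passed. All names are hypothetical.

```c
#include <stddef.h>
#include <stdint.h>

#define FNV32_OFFSET 2166136261u  /* FNV-1a 32-bit offset basis */
#define FNV32_PRIME  16777619u    /* FNV-1a 32-bit prime */

/* Hypothetical consumer: data callback, end-of-stream callback, next
   consumer and a pointer to consumer-specific state. */
typedef struct consumer consumer_t;
struct consumer {
    void (*on_event)(consumer_t *self, const void *data, size_t len);
    void (*on_end)(consumer_t *self);   /* called once after the last block */
    consumer_t *next;
    void *state;
};

/* Pipe: updates the running FNV-1a hash (state) with every block.
   Nothing is pushed per block; the hash is only complete at the end. */
static void hash_block(consumer_t *self, const void *data, size_t len) {
    uint32_t *h = self->state;
    const unsigned char *p = data;
    for (size_t i = 0; i < len; i++) {
        *h ^= p[i];
        *h *= FNV32_PRIME;
    }
}

/* After the last block, push the finished hash to the sink. */
static void hash_end(consumer_t *self) {
    self->next->on_event(self->next, self->state, sizeof(uint32_t));
    if (self->next->on_end != NULL)
        self->next->on_end(self->next);
}

/* Sink: stores the final hash (the role of writeMd5 in the RPG example). */
static void store_hash(consumer_t *self, const void *data, size_t len) {
    (void)len;
    *(uint32_t *)self->state = *(const uint32_t *)data;
}

/* Emitter and flow: hands the buffer to the pipeline in fixed-size blocks,
   then signals the end of the stream. */
static void flow_blocks(const char *buf, size_t size, size_t block,
                        consumer_t *first) {
    for (size_t off = 0; off < size; off += block) {
        size_t n = size - off < block ? size - off : block;
        first->on_event(first, buf + off, n);
    }
    if (first->on_end != NULL)
        first->on_end(first);
}
```

Because the hash is updated incrementally, the stored value does not depend on the block size; for the single byte "a" it is the standard FNV-1a value 0xe40c292c.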

API Documentation

The API of this project is documented using ILEDocs and can be viewed at the ILEDocs installation at RPG Next Gen.

Installation

The project can either be built with the make tool or installed with the iPKG client. Installing it can be as simple as:

ipkg install stream

The emitters are packaged separately but can be installed just as easily:

ipkg install 'stream-emitter'

No need to build from source.

Note: Use the LOC parameter on the ipkg command to specify where the objects will be installed.

The copybooks (prototypes, constants and templates) are distributed in separate packages:

  • stream-devel
  • stream-emitter-devel

Note: When installing the devel packages, the LOC parameter accepts an IFS path for the copybook stream files.

Pipes will also be distributed in a separate package, but the project does not contain any pipes yet.

Ideas

What else can be expected from the project? More pipes and emitters will definitely follow, such as data queue emitters or random number emitters.

The big goal is not just to process the data sequentially but to be able to spawn new threads from a consumer and process the data in parallel.

Feedback

I am happy about any feedback and contributions.

Happy streaming!

Mihael

Tags : RPG