Spring Cloud Stream integrates with Spring Cloud Function's function-based programming model, which lets the
business logic of an application be modeled as a java.util.function.Supplier, a java.util.function.Function, or a
java.util.function.Consumer, representing the roles of a Source, a Processor, and a Sink, respectively.
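As a minimal sketch of these three roles, the following plain-Java class (no Spring involved) wires a Supplier, a Function, and a Consumer together by hand. The class name RolesSketch, the field names, and the sample value are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class RolesSketch {

    // Source role: produces a value without taking any input.
    public static final Supplier<String> source = () -> "hello";

    // Processor role: transforms an input into an output.
    public static final Function<String, String> processor = String::toUpperCase;

    // Holds whatever the sink receives, so the effect can be inspected (illustration only).
    public static final List<String> received = new ArrayList<>();

    // Sink role: consumes a value and produces no output.
    public static final Consumer<String> sink = received::add;

    // Pushes one value from the source, through the processor, into the sink.
    public static void runOnce() {
        sink.accept(processor.apply(source.get()));
    }

    public static void main(String[] args) {
        runOnce();
        System.out.println(received);
    }
}
```

In a real stream application the framework, not hand-written code, connects these functions to message destinations.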
Building on this foundation, we can extend existing Source and Sink applications by importing the configuration of an
existing Source or Sink and adding code that defines a java.util.function.Function. This opens up many powerful
composition possibilities.
For instance, the Source applications are auto-configured with functions that may optionally
be included in a composite function definition.
With this, the same Source application can do any or all of the following without a separate standalone processor:
- execute SpEL transformations
- enrich message headers
- filter events
- produce task launch requests on upstream events
For example, when the time source is run as shown below, it performs a series of internal transformations and
finally publishes a task launch request every second to the rabbit exchange named time-test.
java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar \
--spring.cloud.stream.bindings.output.destination=time-test \
--spring.cloud.function.definition="timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction" \
--spel.function.expression="payload.length()" \
--header.enricher.headers=task-id=payload*2 \
--task.launch.request.task-name-expression="'task-'+headers['task-id']"
Now, the transformed message would look like:
headers:
task-id: 34
content_type: application/json
Payload
49 bytes
Encoding: string
{"args":[],"deploymentProps":{},"name":"task-34"}
Let us unpack what is happening behind the scenes. We will start with the following function definition.
timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction
- Here, the function definition creates a composite Supplier beginning with the default timeSupplier Java function included in this repository, which is the foundation for time-source.
- The spelFunction applies to a Message from which we can extract and transform the payload or headers, by following standard Spring Integration conventions.
- The output of spelFunction is the length of the date-time String, 17.
- From here, we apply the header-enricher Java function to add a Message header, task-id, with the value of payload*2. That would be 34.
- We use the task-id in the header to generate the "task name", and to programmatically derive the task launch request using the SpEL expression "'task-'+headers['task-id']", or task-34.
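The steps above can be traced in plain Java. This is only a sketch under stated assumptions, not the code shipped with the applications: the Msg class is a hypothetical stand-in for a Spring message, each lambda imitates one stage with hard-coded logic instead of evaluating SpEL, and the date pattern is an assumed one chosen because it yields the 17 characters mentioned in the text.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

public class CompositionSketch {

    // Hypothetical stand-in for a message: a payload plus a map of headers.
    public static final class Msg {
        public final Object payload;
        public final Map<String, Object> headers;

        public Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Assumption: a date-time pattern that always formats to 17 characters.
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("MM/dd/yy HH:mm:ss");

    // Imitates timeSupplier: emits the current date-time as a String payload.
    static final Supplier<Msg> timeSupplier =
            () -> new Msg(LocalDateTime.now().format(FORMAT), new HashMap<>());

    // Imitates spelFunction with the expression payload.length().
    static final Function<Msg, Msg> spelFunction =
            m -> new Msg(((String) m.payload).length(), m.headers);

    // Imitates headerEnricherFunction with task-id=payload*2.
    static final Function<Msg, Msg> headerEnricherFunction = m -> {
        Map<String, Object> headers = new HashMap<>(m.headers);
        headers.put("task-id", ((Integer) m.payload) * 2);
        return new Msg(m.payload, headers);
    };

    // Imitates taskLaunchRequestFunction with the name 'task-'+headers['task-id'].
    static final Function<Msg, Msg> taskLaunchRequestFunction = m -> {
        String name = "task-" + m.headers.get("task-id");
        String json = "{\"args\":[],\"deploymentProps\":{},\"name\":\"" + name + "\"}";
        return new Msg(json, m.headers);
    };

    // Runs supplier | function | function | function once, left to right.
    public static Msg runPipeline() {
        Function<Msg, Msg> composed = spelFunction
                .andThen(headerEnricherFunction)
                .andThen(taskLaunchRequestFunction);
        return composed.apply(timeSupplier.get());
    }

    public static void main(String[] args) {
        Msg out = runPipeline();
        System.out.println(out.headers);
        System.out.println(out.payload);
    }
}
```

The andThen chain plays the part of the | separators in the function definition: each stage receives the output of the stage before it.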
This is a somewhat contrived example, but the goal is to highlight the power of function composition.
If you had a task definition named task-34 in Spring Cloud Data Flow, you could build a time | tasklauncher
streaming data pipeline to launch that task every second.
Before the 3.0 release of Stream Applications, this composition required extensive customization, along with many more
manual configuration changes, extensions, and custom builds of the applications.
Note: Support for composite functions includes auto-configuration for conventional binding name mappings (input and output)
derived from the function definition and the presence of spring.cloud.stream.bindings.output….
In this example, --spring.cloud.stream.bindings.output.destination=time-test is enabled behind the scenes by the auto-configured
property
--spring.cloud.stream.function.bindings.timeSupplierspelFunctionheaderEnricherFunctiontaskLaunchRequestFunction-out-0=output.
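The binding name in that property follows a visible pattern: the | separators are dropped from the function definition and -out-0 is appended. The sketch below reproduces only that string pattern. The class and method names are hypothetical, and this is not the framework's own code.

```java
public class BindingNameSketch {

    // Builds the output binding name for a composite definition:
    // remove the '|' separators, then append "-out-0".
    public static String outputBindingName(String functionDefinition) {
        return functionDefinition.replace("|", "") + "-out-0";
    }

    public static void main(String[] args) {
        String definition =
                "timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction";
        // Prints the property that maps the derived binding name to "output".
        System.out.println("--spring.cloud.stream.function.bindings."
                + outputBindingName(definition) + "=output");
    }
}
```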