Merge
Services in Fluvio can be defined with multiple sources and sinks. In this example, we will implement a service that takes in multiple sources via a merge. The example simulates buying and selling stocks: a topic buy and another topic sell are aggregated to create a log of buy and sell orders. The visual shows the dataflow we will implement.
Prerequisites
This guide uses a local Fluvio cluster. If you need to install it, please follow the instructions here.
Transformation
We can add a transform operator to each source in the sources list.
sources:
  - type: topic
    id: (...)
    transforms:
      - operator: map
        run: |
          (... map function ...)
  (... more topics ...)
In our case, we have two topics, buy and sell, that carry JSON objects with a name, an amount, and a price. We will map each JSON object into a string that is sent to the topic message, which logs the orders.
sources:
  - type: topic
    id: buy
    transforms:
      - operator: map
        run: |
          fn buy_order(order: Order) -> Result<String> {
            Ok(format!("+ Buy Order for {}x{} at {}",order.name,order.amount,order.price))
          }
  - type: topic
    id: sell
    transforms:
      - operator: map
        run: |
          fn sell_order(order: Order) -> Result<String> {
            Ok(format!("- Sell Order for {}x{} at {}",order.name,order.amount,order.price))
          }
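The two map functions can also be tried outside of a dataflow. The following is a minimal standalone sketch, not part of the SDF config: it assumes that SDF normally supplies the Order type and the Result alias, so both are defined by hand here. The struct fields follow the name, amount, and price described above, with the field types (u32, f32) taken from the dataflow.yaml in this guide.

```rust
// Standalone sketch. Inside a dataflow, `Order` and `Result` are presumably
// provided by SDF; they are hand-written here (an assumption) so the
// functions can be compiled and run locally.
type Result<T> = std::result::Result<T, String>;

// Mirrors the `order` type: name (string), amount (u32), price (f32).
struct Order {
    name: String,
    amount: u32,
    price: f32,
}

// Same body as the `buy` source's map function.
fn buy_order(order: Order) -> Result<String> {
    Ok(format!("+ Buy Order for {}x{} at {}", order.name, order.amount, order.price))
}

// Same body as the `sell` source's map function.
fn sell_order(order: Order) -> Result<String> {
    Ok(format!("- Sell Order for {}x{} at {}", order.name, order.amount, order.price))
}

fn main() {
    let buy = Order { name: "AMZN".to_string(), amount: 20, price: 173.33 };
    let sell = Order { name: "TSLA".to_string(), amount: 20, price: 219.41 };
    println!("{}", buy_order(buy).unwrap());
    println!("{}", sell_order(sell).unwrap());
}
```

Both functions return a String, which is why the two sources can feed the same sink topic with a string schema.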
Running the Example
Copy and paste the following config and save it as dataflow.yaml.
# dataflow.yaml
apiVersion: 0.5.0
meta:
  name: merge-example
  version: 0.1.0
  namespace: examples
config:
  converter: json
types:
  order:
    type: object
    properties:
      name:
        type: string
      amount:
        type: u32
      price:
        type: f32
topics:
  buy:
    schema:
      value:
        type: order
  sell:
    schema:
      value:
        type: order
  message:
    schema:
      value:
        type: string
services:
  mergeservice:
    sources:
      - type: topic
        id: buy
        transforms:
          - operator: map
            run: |
              fn buy_order(order: Order) -> Result<String> {
                Ok(format!("+ Buy Order for {}x{} at {}",order.name,order.amount,order.price))
              }
      - type: topic
        id: sell
        transforms:
          - operator: map
            run: |
              fn sell_order(order: Order) -> Result<String> {
                Ok(format!("- Sell Order for {}x{} at {}",order.name,order.amount,order.price))
              }
    sinks:
      - type: topic
        id: message
To run the example:
$ sdf run --ephemeral
Produce JSON objects to each of the topics:
$ echo '{"name":"AMZN","amount":20,"price":173.33}' | fluvio produce buy
$ echo '{"name":"TSLA","amount":20,"price":219.41}' | fluvio produce sell
Consume the topic message to retrieve the result:
$ fluvio consume message -Bd
"+ Buy Order for AMZNx20 at 173.33"
"- Sell Order for TSLAx20 at 219.41"
Both the buy order and the sell order have been mapped into strings and logged in the message topic.
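To make the merge idea concrete outside of Fluvio, here is a conceptual sketch in plain Rust using a standard library channel: several sources each get a sender handle, and a single receiver plays the role of the sink. The merge helper is hypothetical and only illustrates the fan-in pattern; it is not how SDF implements merging, and it drains the sources one after another to keep the output deterministic, whereas a running dataflow handles records as they arrive.

```rust
use std::sync::mpsc;

// Hypothetical helper: fan several sources into one channel and return
// everything the single sink receives, in arrival order.
fn merge(sources: Vec<Vec<String>>) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    for source in sources {
        // Each source gets its own sender handle to the shared sink.
        let tx = tx.clone();
        for record in source {
            tx.send(record).expect("sink is still open");
        }
    }
    // Drop the original sender so the receiver knows no more records will come.
    drop(tx);
    rx.into_iter().collect()
}

fn main() {
    // Already-mapped records, as the buy and sell sources would emit them.
    let buy = vec!["+ Buy Order for AMZNx20 at 173.33".to_string()];
    let sell = vec!["- Sell Order for TSLAx20 at 219.41".to_string()];
    for line in merge(vec![buy, sell]) {
        println!("{line}");
    }
}
```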
Cleanup
Exit the sdf terminal and clean up. The --force flag removes the topics:
$ sdf clean --force
Conclusion
In this example, we covered how to use merge to allow services to consume multiple sources.