Kotlin Flow Code Walkthrough heading-link-icon

This section describes the Kotlin code for the flows Communication between participants in an application network is peer-to-peer using flows. and supporting classes in workflows/src/main/kotlin/com/r3/developers/cordapptemplate/flowexample/workflows/MyFirstFlow.kt in the CorDapp template repo. It contains the following:

The full listing with explanatory comments is as follows:

package com.r3.developers.cordapptemplate.flowexample.workflows

import net.corda.v5.application.flows.*
import net.corda.v5.application.marshalling.JsonMarshallingService
import net.corda.v5.application.membership.MemberLookup
import net.corda.v5.application.messaging.FlowMessaging
import net.corda.v5.application.messaging.FlowSession
import net.corda.v5.base.annotations.CordaSerializable
import net.corda.v5.base.annotations.Suspendable
import net.corda.v5.base.types.MemberX500Name
import org.slf4j.LoggerFactory

// A class to hold the deserialized arguments required to start the flow
class MyFirstFlowStartArgs(val otherMember: MemberX500Name)


// A class which will contain a message, It must be marked with @CordaSerializable for Corda
// to be able to send from one virtual node to another.
@CordaSerializable
class Message(val sender: MemberX500Name, val message: String)


// MyFirstFlow is an initiating flow, it's corresponding responder flow is called MyFirstFlowResponder (defined below)
// to link the two sides of the flow together they need to have the same protocol.
@InitiatingFlow(protocol = "my-first-flow")
// MyFirstFlow should inherit from ClientStartableFlow, which tells Corda it can be started via an REST call from a client
class MyFirstFlow: ClientStartableFlow {

  // It is useful to be able to log messages from the flows for debugging.
  private companion object {
    val log = LoggerFactory.getLogger(this::class.java.enclosingClass)
  }

  // Corda has a set of injectable services which are injected into the flow at runtime.
  // Flows declare them with @CordaInjectable, then the flows have access to their services.

  // JsonMarshallingService provides a Service for manipulating json
  @CordaInject
  lateinit var jsonMarshallingService: JsonMarshallingService

  // FlowMessaging provides a service for establishing flow sessions between Virtual Nodes and
  // sending and receiving payloads between them
  @CordaInject
  lateinit var flowMessaging: FlowMessaging

  // MemberLookup provides a service for looking up information about members of the Virtual Network which
  // this CorDapp is operating in.
  @CordaInject
  lateinit var memberLookup: MemberLookup

  // When a flow is invoked it's call() method is called.
  // call() methods must be marked as @Suspendable, this allows Corda to pause mid-execution to wait
  // for a response from the other flows and services
  @Suspendable
  override fun call(requestBody: ClientRequestBody): String {

    // Useful logging to follow what's happening in the console or logs
    log.info("MFF: MyFirstFlow.call() called")

    // Show the requestBody in the logs - this can be used to help establish the format for starting a flow on corda
    log.info("MFF: requestBody: ${requestBody.getRequestBody()}")

    // Deserialize the Json requestBody into the MyfirstFlowStartArgs class using the JsonSerialisation Service
    val flowArgs = requestBody.getRequestBodyAs(jsonMarshallingService, MyFirstFlowStartArgs::class.java)

    // Obtain the MemberX500Name of counterparty
    val otherMember = flowArgs.otherMember

    // Get our identity from the MemberLookup service.
    val ourIdentity = memberLookup.myInfo().name

    // Create the message payload using the MessageClass we defined.
    val message = Message(otherMember, "Hello from $ourIdentity.")

    // Log the message to be sent.
    log.info("MFF: message.message: ${message.message}")

    // Start a flow session with the otherMember using the FlowMessaging service
    // The otherMember's Virtual Node will run the corresponding MyFirstFlowResponder responder flow
    val session = flowMessaging.initiateFlow(otherMember)

    // Send the Payload using the send method on the session to the MyFirstFlowResponder Responder flow
    session.send(message)

    // Receive a response from the Responder flow
    val response = session.receive(Message::class.java)

    // The return value of a ClientStartableFlow must always be a String, this string will be passed
    // back as the REST response when the status of the flow is queried on Corda.
    return response.message
  }
}

// MyFirstFlowResponder is a responder flow, it's corresponding initiating flow is called MyFirstFlow (defined above)
// to link the two sides of the flow together they need to have the same protocol.
@InitiatedBy(protocol = "my-first-flow")
// Responder flows must inherit from ResponderFlow
class MyFirstFlowResponder: ResponderFlow {

  // It is useful to be able to log messages from the flows for debugging.
  private companion object {
    val log = LoggerFactory.getLogger(this::class.java.enclosingClass)
  }

  // MemberLookup provides a service for looking up information about members of the Virtual Network which
  // this CorDapp is operating in.
  @CordaInject
  lateinit var memberLookup: MemberLookup


  // Responder flows are invoked when an initiating flow makes a call via a session set up with the Virtual
  // node hosting the Responder flow. When a responder flow is invoked, its call() method is called.
  // call() methods must be marked as @Suspendable, this allows Corda to pause mid-execution to wait
  // for a response from the other flows and services/
  // The Call method has the flow session passed in as a parameter by Corda so the session is available to
  // responder flow code, you don't need to inject the FlowMessaging service.
  @Suspendable
  override fun call(session: FlowSession) {

    // Useful logging to follow what's happening in the console or logs
    log.info("MFF: MyFirstResponderFlow.call() called")


    // Receive the payload and deserialize it into a Message class
    val receivedMessage = session.receive(Message::class.java)

    // Log the message as a proxy for performing some useful operation on it.
    log.info("MFF: Message received from ${receivedMessage.sender}: ${receivedMessage.message} ")

    // Get our identity from the MemberLookup service.
    val ourIdentity = memberLookup.myInfo().name

    // Create a response to greet the sender
    val response = Message(ourIdentity,
      "Hello ${session.counterparty.commonName}, best wishes from ${ourIdentity.commonName}")

    // Log the response to be sent.
    log.info("MFF: response.message: ${response.message}")

    // Send the response via the send method on the flow session
    session.send(response)
  }
}
/*
RequestBody for triggering the flow via REST:
{
    "clientRequestId": "r1",
    "flowClassName": "com.r3.developers.cordapptemplate.flowexample.workflows.MyFirstFlow",
    "requestBody": {
        "otherMember":"CN=Bob, OU=Test Dept, O=R3, L=London, C=GB"
        }
}
 */

There are two helper classes:

  • MyFirstFlowStartArgs — provides a wrapper around the single arguments that need to be passed into the flow — the other member of the application network The set of all possible entities onboarded according to the rules of the network. Application networks enable people and businesses to interact with each other on Corda. who the message should be sent to:
    class MyFirstFlowStartArgs(val otherMember: MemberX500Name)
    
  • Message — specifies the sender and the message. This is used for both the message sent from the initiator to the responder and subsequently the message sent back from the responder to the initiator. Note, as this is a class defined in a CorDapp Corda Distributed Application. A Java (or any JVM targeting language) application built using the Corda build toolchain and CorDapp API to solve some problem that is best solved in a decentralized manner. and it is going to be sent ‘down the wire’ between two virtual nodes, it requires the @CordaSerializable annotation.
    @CordaSerializable
    class Message(val sender: MemberX500Name, val message: String)
    

To trigger a flow from REST, the flow must inherit from ClientStartableFlow. Most flows will come in pairs; one initiating flow and a corresponding responder flow. The responder flow must inherit from ResponderFlow. The two flows are linked by adding the @InitiatingFlow and @InitiatedBy annotations which both specify the same protocol in this case “my-first-flow”:

@InitiatingFlow(protocol = "my-first-flow")
class MyFirstFlow: ClientStartableFlow { ... }
@InitiatedBy(protocol = "my-first-flow")
class MyFirstFlowResponder: ResponderFlow { ... }

It is useful to add logging statements to both the initiating and responder flows. To do this, add a logger to each class. When running tests locally, the console displays log entries. When running on Corda, the log files are updated.

    private companion object {
        val log = LoggerFactory.getLogger(this::class.java.enclosingClass)
    }

The log files for the CorDapp template are located in the logs folder in the root of the project. The CorDapp template starts a new log file each time a new instance of Corda is created or when the log file grows too large. The logging can be configured by editing the config/log4j2.xml file. Because the Corda combined worker runs all of the Corda processes in one JVM process, there are a lot of log entries. We recommend adding an easily searchable tag to each log message. For example:

        log.info("MFF: MyFirstFlow.call() called")

Each flow has a call() method. This is the method which Corda invokes when the flow is invoked.

When a flow is started via REST, the requestBody from the HTTP request is passed into the call method as the requestBody parameter, giving the rest of the call method access to the parameters passed in via HTTP.

When a responder flow is invoked as a result of an initiator flow on another node, the flow session with the initiating node is passed in as the parameter session.

The call() method in both flows must be marked as @Suspendable. This is an indicator to Corda that this method can be paused and persisted to the database whilst the flow waits for an asynchronous response. It will be rehydrated and continue to execute once the appropriate response is received. It is this mechanism that allows a CorDapp Developer to write what appears to be synchronous, blocking code that executes asynchronously and does not block the Corda cluster.

In the initiating flow:

    @Suspendable
    override fun call(requestBody: ClientRequestBody): String { ... }

In the responder flow:

    @Suspendable
    override fun call(session: FlowSession) { ... }

Corda 5 requires a CorDapp Developer to explicitly specify which Corda services are required by the flow. In this simple example we use three services:

  • JsonMarshallingService — a service that CorDapps and other services may use to marshal arbitrary content in and out of JSON format using standard approved mappers.
  • FlowMessaging — a service that CorDapps can use to create communication sessions between two virtual nodes. Once set up, you can send and receive using the session object.
  • MemberLookup — a service that CorDapps can use to retrieve information about virtual nodes on the network.

There are other services, such as the Persistence and Serialization services, which are beyond the scope of this Getting Started example.

Services are declared as properties in the flow class with the @CordaInject annotation:

class MyFirstFlow: ClientStartableFlow {

    ...

    @CordaInject
    lateinit var jsonMarshallingService: JsonMarshallingService

    @CordaInject
    lateinit var flowMessaging: FlowMessaging

    @CordaInject
    lateinit var memberLookup: MemberLookup

    ...

    }

The services are then available in the call function. For example to initiate a FlowSession with the other party:

    val session = flowMessaging.initiateFlow(otherMember)

The first thing that the MyFirstFlow.call() method does is convert the requestBody parameters into a Kotlin class. It does this using the getRequestBodyAs() method. This takes the jsonMarshallingService and the class that the requestBody parameters should be parsed into. The flowArgs variable has the type MyFirstFlowStartArgs, the helper class we declared in the Helper classes section.

@Suspendable
    override fun call(requestBody: ClientRequestBody): String {

        ...

        val flowArgs = requestBody.getRequestBodyAs(jsonMarshallingService, MyFirstFlowStartArgs::class.java)

        ...

        }

We can now obtain the X500 name of the other virtual node that we want to send the message to from flowArgs, which we will need later to set up the FlowSession:

        val otherMember = flowArgs.otherMember

To create the message, you must know the identity of the initiator. Remember that this flow could run on any node, so the identity cannot be hard coded. To find out the identity of the virtual node running the initiator flow, use the injected memberLookup service:

        val otherMember = flowArgs.otherMember
        val ourIdentity = memberLookup.myInfo().name
        val message = Message(otherMember, "Hello from $ourIdentity.")
        log.info("MFF: message.message: ${message.message}")

We can now start sending messages to the responder:

  1. Set up a FlowSession between the initiator and responder node:

         val session = flowMessaging.initiateFlow(otherMember)
    
  2. Simply call send() on the session and pass a payload:

         session.send(message)
    

    The code continues to execute until it reaches the session.receive() method. At that point, the flow checkpoints and persists its state An immutable object representing a fact known by one or more participants at a specific point in time. You can use states to represent any type of data, and any kind of fact. to the database. It resumes when it receives a message back from the responder. This frees up the Corda cluster flow workers A worker that runs the CorDapp application code and translates flow API calls into function requests to the relevant workers. The flow workers are designed to share work between themselves and to record checkpoints at each stage of the application's progress, so that in the event of worker failure, the operations can be retried. to perform other tasks.

            val response = session.receive(Message::class.java)
    

    When the message is sent to the responder via the send() method, Corda executes session.receive() in the MyFirstResponderFlow responder flow’s call() method. This session.receive() method retrieves the message that was sent in the initiator’s send() method.

        @Suspendable
        override fun call(session: FlowSession) {
    
            log.info("MFF: MyFirstResponderFlow.call() called")
    
            val receivedMessage = session.receive(Message::class.java)
    

    The responder flow then obtains its own identity from its injected memberLookup service, creates a response, and sends the response back using session.send():

         val ourIdentity = memberLookup.myInfo().name
    
         val response = Message(ourIdentity,
             "Hello ${session.counterparty.commonName}, best wishes from ${ourIdentity.commonName}")
    
         log.info("MFF: response.message: ${response.message}")
    
         session.send(response)
    

    There is no return value for a ResponderFlow.

    Back on the Initiating virtual node, Corda detects the send coming from the responder and rehydrates the initiating flow from the database and resumes it from where it was checkpointed.

    The response is received and response.message is returned by the flow.

         val response = session.receive(Message::class.java)
    
         return response.message
    

    The response from the initiating flow is always a string, which can be returned when the flow status is queried by REST.

It is important that the sends and receives in the initiator and responder flows match. If the initiator sends a Foo and the responder expects a Bar, the flow hangs and likely results in a timeout error.

The sendAndReceive method on FlowSession sends a payload, check-points the flow, and then waits for a response to be received:

val response = myFlowSession.sendAndReceive(<ReceiveType>::class.java, payload)

In Corda 4, when payloads were received they were wrapped in an UntrustworthyData class which required unwrapping:

// (Corda 4)
val Corda4Response = myFlowSession.sendAndReceive(<ReceiveType>::class.java, payload).unwrap {<validationcode>}

This has been removed in Corda 5 because CorDapp Developers usually use other methods to validate the data.

Was this page helpful?

Thanks for your feedback!

Chat with us

Chat with us on our #docs channel on slack. You can also join a lot of other slack channels there and have access to 1-on-1 communication with members of the R3 team and the online community.

Propose documentation improvements directly

Help us to improve the docs by contributing directly. It's simple - just fork this repository and raise a PR of your own - R3's Technical Writers will review it and apply the relevant suggestions.

We're sorry this page wasn't helpful. Let us know how we can make it better!

Chat with us

Chat with us on our #docs channel on slack. You can also join a lot of other slack channels there and have access to 1-on-1 communication with members of the R3 team and the online community.

Create an issue

Create a new GitHub issue in this repository - submit technical feedback, draw attention to a potential documentation bug, or share ideas for improvement and general feedback.

Propose documentation improvements directly

Help us to improve the docs by contributing directly. It's simple - just fork this repository and raise a PR of your own - R3's Technical Writers will review it and apply the relevant suggestions.