Streaming data out of a Corda node with Spring WebFlux

July 25, 2018

cordakotlindltblockchaindistributed ledger technology

It’s been a while since my last post but I’m finally back! Since I am still on my project, I will be writing about using Corda again. This time, rather than focusing on Corda, we’ll look at using Spring with Corda. More specifically, Spring WebFlux. Why do this? One, because we can. Two, because it allows us to stream events coming out of the Corda node. This provides us with the possibility to track the progress of flows or retrieve updates to the vault and send them to any clients registered to the relevant endpoints. Using WebFlux with Corda did introduce a few problems. Some originating from Corda and some from Spring. Although, the Spring issues were to do with me expecting the Spring Boot + WebFlux combo to do more by default for me.

In this post, I’m going to assume you have some experience with Corda but if you do need some extra information on the subject I recommend reading through my previous posts: What is Corda and Developing with Corda. Furthermore, I also suggest taking a look at Doing stuff with Spring WebFlux as an introduction to WebFlux.

The 3.2 Open Source version of Corda will be used for the contents of this tutorial. I actually started writing this post based on 3.1 but the newer version was released during this time. Due to this, there are a few comments based on moving between these versions.

We will also be implementing everything in Kotlin but the contents of this post can be implemented in Java as well.

Introduction to the example application

We will be modelling a really simple application that doesn’t provide much use and is something I botched together for the sake of this post. The application will consist of one party sending a message (represented by the MessageState) to another party. To do this the SendMessageFlow will run and once it does, both parties will have a copy of the message and that’s it. Short and simple but should provide us with enough to demonstrate how WebFlux can work with Corda.

Structure

Normally I start by looking at the dependencies. Although, since I have split the code into separate modules, it would be best to first view the structure of the little example application.

.
+-- app
|   +-- {spring code}
|   +-- build.gradle
+-- cordapp
|   +-- {flow code}
|   +-- build.gradle
+-- contracts-and-states
|   +-- {contracts and states code}
|   +-- build.gradle
+-- build.gradle

That’s a quick view of the structure of the application. app will contain all the Spring code and will delegate to the Corda node via RPC. The cordapp module houses the flow logic and contracts-and-states does what the name suggests and contains the contract and state code. Both the cordapp and contracts-and-states modules are packaged up into Cordapp Jars and dumped into the Corda node.

Each of these modules contains a build.gradle file containing its relevant build information and dependencies. Since this post is not directly focusing on writing Corda code, we will not go on and look through every module and their build files in detail. Instead, we will only brush over the flow code at the end of the post so we can focus on the Spring implementation.

Dependencies for Spring module

Below is the build.gradle file of the app module (containing the Spring code):

buildscript {
    ext.spring_boot_version = '2.0.3.RELEASE'

    repositories {
        mavenCentral()
    }

    dependencies {
        classpath "org.jetbrains.kotlin:kotlin-allopen:$kotlin_version"
    }
}

// needed to resolve dependency error
// Constructor threw exception; nested exception is java.lang.NoSuchMethodError: io.netty.util.AsciiString.cached(Ljava/lang/String;)Lio/netty/util/AsciiString;
configurations.all {
    resolutionStrategy {
        force 'io.netty:netty-all:4.1.25.Final'
    }
}

repositories {
    mavenLocal()
    jcenter()
    mavenCentral()
    maven { url 'https://ci-artifactory.corda.r3cev.com/artifactory/corda-releases' }
}

apply plugin: 'kotlin'
apply plugin: 'net.corda.plugins.cordapp'
apply plugin: "kotlin-spring"

dependencies {
    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
    testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
    testCompile "junit:junit:$junit_version"

    cordaCompile "$corda_release_group:corda-core:$corda_release_version"
    cordaCompile "$corda_release_group:corda-jackson:$corda_release_version"
    cordaCompile "$corda_release_group:corda-rpc:$corda_release_version"
    cordaRuntime "$corda_release_group:corda:$corda_release_version"

    compile group: 'org.springframework.boot', name: 'spring-boot-starter-webflux', version: "$spring_boot_version"

    // added since rxjava 1.xx does not implement publisher so cant be used by Spring WebFlux
    compile group: 'io.reactivex', name: 'rxjava-reactive-streams', version: '1.2.1'

    cordapp project(":cordapp-contracts-states")
    cordapp project(":cordapp")
}
// tasks to run web servers

I’m not an expert in Gradle, so there are probably some things in this snippet that could be done better, but it does what it needs to.

So, there are a few things I want to highlight. Spring Boot 2.0.3.RELEASE is being used and to go along with this the kotlin-spring plugin is used to add open to all Kotlin classes marked with certain Spring annotations. This is needed for quite a lot of situations since Spring requires some classes to be non-final. This isn’t a problem in Java but is problematic for Kotlin since all classes are final by default. More information on the plugin can be found at kotlinlang.org.

spring-boot-starter-webflux pulls in the WebFlux dependencies along with general Spring web server code to get everything up and running.

rxjava-reactive-streams, this is an interesting one that we will see come into play later on. Since Corda uses RxJava 1.x.x rather than the newer RxJava2, its Observables do not implement the Java 8 Publisher interface that Spring WebFlux uses to return reactive streams. This dependency converts these older Observables into Publishers so they are compatible with WebFlux. We will touch on this again later when we look at the code to do this conversion.

Finally, the netty-all version is forced to 4.1.25.Final to resolve a dependency issue.

Routing functions

WebFlux introduces a functional approach for routing requests to the functions that handle them. More information on this can be found in Doing stuff with Spring WebFlux. I don’t want to jump deep into how WebFlux is working but we will take a quick look at defining the routing functions. The main reason for this is due to using Kotlin instead of Java. Kotlin provides a different way to define the functions by using a DSL.

Below is the code to define the routing for this tutorial:

@Configuration
class MessageRouter {
    @Bean
    fun routes(handler: MessageHandler): RouterFunction<ServerResponse> = router {
        ("/messages").nest {
            (accept(MediaType.TEXT_EVENT_STREAM) and contentType(MediaType.APPLICATION_JSON)).nest {
                POST("/", handler::post)
            }
            accept(MediaType.APPLICATION_STREAM_JSON).nest {
                GET("/updates", handler::updates)
            }
        }
    }
}

The routes bean takes in the MessageHandler bean (which we will look at later) and maps two URI’s to functions found in that MessageHandler. The DSL allows for a slightly shorter version compared to the Java implementation. There are a few parts to focus on in this snippet.

("/messages") defines the base request path of the two routing functions. The DSL allows the functions to nest themselves from this base path and helps with the conveying the structure of the routes.

One function accepts TEXT_EVENT_STREAM (text/event-stream) in the response returned from sending the request while also specifying APPLICATION_JSON (application/stream+json) as the contents of the body. Since we have defined the Content-Type, in most cases we can assume that we will be sending a POST request (which we are). POST is further nested from the previous configuration and adds another MessageHandler function to accept requests.

The second function receives updates from the Corda node. To do this it returns APPLICATION_STREAM_JSON and expects a GET request to be sent to /messages/updates.

Handler functions

In this section, we will look at the MessageHandler that was mentioned a few times in the previous section. This class contains all the functions that perform the actual business logic. The routing was just a means to reach this point.

My previous post, Doing stuff with Spring WebFlux will explain the more WebFlux specific parts of these examples in more depth than I will in this post.

Below is the handler code:

@Component
class MessageHandler(rpc: NodeRPCConnection) {

    private val proxy: CordaRPCOps = rpc.proxy

    fun updates(request: ServerRequest): Mono<ServerResponse> {
        return ok().contentType(APPLICATION_STREAM_JSON)
            .body(trackNewMessages(), ParameterizedTypeReference.forType(Vault.Update::class.java))
    }

    private fun trackNewMessages(): Publisher<Vault.Update<MessageState>> =
        toPublisher(proxy.vaultTrackBy<MessageState>().updates)

    fun post(request: ServerRequest): Mono<ServerResponse> {
        val message = request.bodyToMono(Message::class.java)
        val id = UUID.randomUUID()
        return message.flatMap {
            created(UriComponentsBuilder.fromPath("messages/$id").build().toUri())
                .contentType(TEXT_EVENT_STREAM)
                .body(startTrackedFlow(it, id), String::class.java)
        }
    }

    private fun startTrackedFlow(message: Message, id: UUID): Publisher<String> =
        toPublisher(
            proxy.startTrackedFlow(
                ::SendMessageFlow, state(message, id)
            ).progress
        )

    private fun state(message: Message, id: UUID) =
        MessageState(
            sender = proxy.nodeInfo().legalIdentities.first(),
            recipient = parse(message.recipient),
            contents = message.contents,
            linearId = UniqueIdentifier(id.toString())
        )

    private fun parse(party: String) = proxy.wellKnownPartyFromX500Name(CordaX500Name.parse(party))
            ?: throw IllegalArgumentException("Unknown party name.")
}

First, we should highlight the NodeRPCConnection class and it’s property proxy of type CordaRPCOps. I stole NodeRPCConnection from an example Corda and Spring application. Long story short, NodeRPCConnection creates the RPC connection to the Corda node and proxy returns a CordaRPCOps. CordaRPCOps contains all the RPC operations that are available to use. This is the way that Spring will interact with the Corda node.

Let take a closer look at the updates function:

fun updates(request: ServerRequest): Mono<ServerResponse> {
    return ok().contentType(APPLICATION_STREAM_JSON)
        .body(trackNewMessages(), ParameterizedTypeReference.forType(Vault.Update::class.java))
}

private fun trackNewMessages(): Publisher<Vault.Update<MessageState>> =
    toPublisher(proxy.vaultTrackBy<MessageState>().updates)

This function returns new messages as they are saved to the vault. This sort of endpoint would be nice if you had an application that monitored updates coming from your Corda node.

The Corda related code in this snippet is all contained within the trackNewMessages function. It uses CordaRPCOps’s vaultTrackBy to access the vault service and starts tracking updates to any MessageStates. Since we have not passed any arguments to the function it will be tracking UNCONSUMED states only. vaultTrackBy returns a DataFeed object that can be used to either retrieve a snapshot of the vault via the snapshot property or by accessing the updates property an Observable will be returned allowing it’s update events to be subscribed to. This RxJava Observable is what we will use to stream data back to the caller.

This is the first instance where we need to use the rxjava-reactive-streams that I mentioned earlier. The toPublisher method takes in an Observable and converts it into a Publisher. Remember, WebFlux requires Java 8 compatible reactive streaming libraries that must implement Publisher. For example, Spring tends to make use of Reactor which provides the Mono and Flux classes.

After creating the Publisher it needs to be fed into a ServerResponse. As everything has gone well at this point we will return a 200 response via the ok method. The Content-Type is then set to APPLICATION_STREAM_JSON since it contains streaming data. Finally, the body of the response takes in the Publisher from trackNewMessages. The endpoint is now ready to be subscribed to by a requesting client.

The functionality to stream updates from the node to a client is now complete. What about actually saving a new message? Furthermore, is there any information that we can pass back to the sender about the executing flow? So let’s answer those two questions. Yes, we can save a new message using WebFlux. And yes, a flow can return its current progress.

Below is the code for the post function that saves a new message to both the sender’s and the recipient’s nodes while streaming the flow’s progress:

fun post(request: ServerRequest): Mono<ServerResponse> {
    val message = request.bodyToMono(Message::class.java)
    val id = UUID.randomUUID()
    return message.flatMap {
        created(UriComponentsBuilder.fromPath("messages/$id").build().toUri())
            .contentType(TEXT_EVENT_STREAM)
            .body(startTrackedFlow(it, id), String::class.java)
    }
}

private fun startTrackedFlow(message: Message, id: UUID): Publisher<String> =
    toPublisher(
        proxy.startTrackedFlow(
            ::SendMessageFlow, state(message, id)
        ).progress
    )

private fun state(message: Message, id: UUID) =
    MessageState(
        sender = proxy.nodeInfo().legalIdentities.first(),
        recipient = parse(message.recipient),
        contents = message.contents,
        linearId = UniqueIdentifier(id.toString())
    )

private fun parse(party: String) = proxy.wellKnownPartyFromX500Name(CordaX500Name.parse(party))
        ?: throw IllegalArgumentException("Unknown party name.")

proxy.startTrackedFlow starts a flow whose progress can be tracked by any ProgressTrackers added to the flow. The startTrackedFlow defined in this class delegates to the aforementioned function and returns its progress property; an Observable&lt;String&gt; whose events consist of the ProgressTracker’s progress.

The MessageState that’s passed into the flow is created from the Message object passed in from the request. This is to allow easier input of the message data to the endpoint since it contains less information than the MessageState itself. parse converts the string X500 name passed in the Message into a CordaX500Name and then into a Party within the network, assuming one exists.

This is then packaged into a response via the created method. The Content-Type is specified to tell the client that it contains text/event-stream. The path to the message uses the UUID that was created before the flow was executed. This could, for example, be used to retrieve a specific message but you’ll need to implement that yourself since I am too lazy to do that for this post.

Creating a client

Now that the endpoints are set up we should create a client that can send requests and consume the streams sent back to it. Later on, we will briefly look at the flow code to get a fuller understanding of whats going on.

To send requests to a reactive back-end, Spring WebFlux provides the WebClient class. After sending a request, the WebClient can react to each event sent in the response. The MessageClient below does just that:

@Component
class MessageClient(
    @Value("\${server.host}") private val host: String,
    @Value("\${server.port}") private val port: Int,
    private val decoder: Jackson2JsonDecoder
) {

    private val strategies = ExchangeStrategies
        .builder()
        .codecs { clientCodecConfigurer ->
            clientCodecConfigurer.defaultCodecs().jackson2JsonDecoder(decoder)
        }.build()

    private val client = WebClient.builder()
        .exchangeStrategies(strategies)
        .baseUrl("http://$host:$port")
        .build()

    fun doStuff() {
        val message = Message("O=PartyB,L=London,C=GB", "hello there")
        client
            .post()
            .uri("/messages")
            .body(Mono.just(message), Message::class.java)
            .accept(TEXT_EVENT_STREAM)
            .exchange()
            .flatMapMany { it.bodyToFlux(String::class.java) }
            .subscribe { println("STEP: $it") }

        client
            .get()
            .uri("/messages/updates")
            .accept(APPLICATION_STREAM_JSON)
            .exchange()
            .flatMapMany { it.bodyToFlux(Vault.Update::class.java) }
            .subscribe { println("UPDATE: $it") }

    }
}

The MessageClient wraps and uses a WebClient to send requests to the address specified in the WebClient’s builder. There is some extra configuration going on in this class around deserialisation, but I want to brush over that for now as there is a section further down covering that topic.

As before Doing stuff with Spring WebFlux provides in-depth explanations into the WebFlux specific methods.

So let’s look at each request individually, first up the POST request to the /messages endpoint:

client
    .post()
    .uri("/messages")
    .body(Mono.just(message), Message::class.java)
    .accept(TEXT_EVENT_STREAM)
    .exchange()
    .flatMapMany { it.bodyToFlux(String::class.java) }
    .subscribe { println("STEP: $it") }

The post method creates a builder that specifies the contents of the request. This should match up to an endpoint that we defined earlier. Once the request has been built, call the exchange method to send it to the server. The body of the response is then mapped to a Flux&lt;String&gt; allowing it to be subscribed to. That is the essence of using Reactive Streams. Once subscribing to the response, it is up to the client to perform whatever processing they wish to do on each event. In this scenario, it simply prints out the current step of the ProgressTracker.

If we sent a request via this piece of code we would receive the following:

STEP: Verifying
STEP: Signing
STEP: Sending to Counterparty
STEP: Collecting signatures from counterparties.
STEP: Verifying collected signatures.
STEP: Done
STEP: Finalising
STEP: Requesting signature by notary service
STEP: Broadcasting transaction to participants
STEP: Done
STEP: Done

These are the steps that the SendMessageFlow’s ProgressTracker defines. Yes, I know I haven’t shown you that code yet but just trust me on this. Not much else to this one really. As you can see, each string value returned from the stream attaches “STEP” to itself

Now onto the GET request to the /messages/update endpoint:

client
    .get()
    .uri("/messages/updates")
    .accept(APPLICATION_STREAM_JSON)
    .exchange()
    .flatMapMany { it.bodyToFlux(Vault.Update::class.java) }
    .subscribe { println("UPDATE: $it") }

Again there isn’t much to show at this point. But, behind the scenes, there is actually quite a bit of work required to get this to work. All the issues that I faced to get this call to work all revolved around serialisation and deserialisation. We will get into that in the next section.

The response to this request is as follows:

UPDATE: 0 consumed, 1 produced

Consumed:

Produced:
56781DF3CEBF2CDAFACE1C5BF04D4962B5483FBCD2C2E428352AD82BC951C686(0)
: TransactionState(data=MessageState(sender=O=PartyA, L=London, C=GB, 
recipient=O=PartyB, L=London, C=GB, contents=hello there, 
linearId=1afc6144-32b1-4265-a06e-73b6bb81aef3_b0fa8491-c9b9-418c-ba6e-8b7840faaf30, 
participants=[O=PartyA, L=London, C=GB, O=PartyB, L=London, C=GB]), 
contract=com.lankydanblog.tutorial.contracts.MessageContract, 
notary=O=Notary, L=London, C=GB, encumbrance=null, 
constraint=net.corda.core.contracts.WhitelistedByZoneAttachmentConstraint@4a1febb5)

The nice thing about this endpoint is that it now maintains a connection to the node which will keep sending any related updates back to this client. The above request was the update for the original POST message. Any new events received by the client would output an update on the client. This is what makes this sort of endpoint ideal for triggering a process or simply displaying up to date data on a front-end separate from the Corda node itself.

Serialisation and Deserialisation

In this section, I wanted to focus on setting up serialisation and deserialisation correctly. The data retrieved from the /messages/updates endpoint needs to serialise its data correctly to pass to the client, who also needs to be able to deserialise the response data.

Normally Spring does a lot of this for you, and it still does, but it seems with WebFlux there are some extra steps required to get it set up properly. Disclaimer, this is from my experience and if you know of better ways to do this I would be interested to hear from you.

Corda JacksonSupport

Spring tends to use Jackson by default and, very handily, Corda provides a lot of Jackson setup itself. The JacksonSupport.cordaModule provides some serialisation and deserialisation for classes such as Party and CordaX500Name. If you have some basic situations where you need to serialise or deserialise a Corda class this will probably suit your needs. In Spring you could create a bean that the default ObjectMapper will retrieve and add to itself.

@Bean
fun cordaModule() = JacksonSupport.cordaModule

But, this route has a few caveats. Some classes cannot be deserialised since the module relies on the ObjectMapper having access to node information, for example via the RPC client CordaRPCOps. Without this, deserialising a Party, AbstractParty or AnonymousParty will fail. Not only that, but this has now been deprecated from Corda 3.2 due to not being thread safe. JacksonSupport.cordaModule has also been moved into its own class (CordaModule).

The solution I give below is also the solution that Corda recommends taking from now on.

Below is the exception thrown when the MessageClient retrieves updates from the /messages/updates endpoint (for the rest of this section the same endpoint will be used):

com.fasterxml.jackson.databind.ObjectMapper cannot be cast to net.corda.client.jackson.JacksonSupport$PartyObjectMapper

From this, we can determine that our ObjectMapper is of the wrong type and actually needs to be the subtype PartyObjectMapper. Taking this a bit further we can see that this mapper is found in the JacksonSupport class as well. Now, all there is left to do is to create this mapper and use that instead of the default ObjectMapper.

So let’s see how to do that:

@Bean
fun rpcObjectMapper(rpc: NodeRPCConnection): ObjectMapper {
    return JacksonSupport.createDefaultMapper(rpc.proxy)
}

This will create a RpcObjectMapper which implements PartyObjectMapper and makes use of RPC to retrieve node information to make it possible to deserialise the various party classes. Inside the createDefaultMapper, the CordaModule from before is added and thanks to Spring, this will now be the default object mapper for most (note the most for later) instances where serialisation or deserialisation is needed.

Some more serialisation and deserialisation configuration

Now… I’m actually in quite a weird position. I wanted to go through all the other steps to get the endpoint working. But, no matter what I do, I cannot seem to recreate all the errors I used to run into before getting it to work. I don’t know what to say… Somewhere my exceptions are being swallowed and stopping me from seeing what is going on. Anyway, we must continue on. Thankfully I know why I added the rest of the code but I can no longer provide you with the exception that each change fixed…

Soooo, let’s look at the end product of the rpcObjectMapper that we started working on earlier:

@Bean
fun rpcObjectMapper(rpc: NodeRPCConnection): ObjectMapper {
    val mapper = JacksonSupport.createDefaultMapper(rpc.proxy)
    mapper.registerModule(jsonComponentModule())
    mapper.registerModule(MixinModule())
    return mapper
}

@Bean
fun jsonComponentModule() = JsonComponentModule()

class MixinModule : SimpleModule() {
    init {
        setMixInAnnotation(Vault.Update::class.java, VaultUpdateMixin::class.java)
        setMixInAnnotation(SecureHash::class.java, SecureHashMixin::class.java)
    }
}

abstract class VaultUpdateMixin {
    @JsonIgnore
    abstract fun isEmpty()
}

@JsonDeserialize(using = JacksonSupport.SecureHashDeserializer::class)
abstract class SecureHashMixin

There are a few additions here. The JsonComponentModule is added as a bean so that it picks up the defined @JsonSerializer and @JsonDeserializer custom components (in other classes). It seems that even if it is added to the mapper as a module, it still requires the bean itself to be created if it is going to find and register the custom JSON components.

Next is the MixinModule. This class resolves issues that arise when deserialising Vault.Update and SecureHash. Let’s take a closer look.

A Mixin allows us to add Jackson annotations onto a class without actually having access to the class itself which we obviously don’t control since this an object from within Corda’s codebase. The other option is that this is added to the CordaModule we discussed earlier but that is a different conversation.

Vault.Update needs this due to having a method called isEmpty, which doesn’t play nicely with Jackson who gets confused and thinks that isEmpty matches to a boolean field called empty. So when deserialising the JSON back into an object it tries to pass in a value for the field.

The MixinModule itself is simply a class whose constructor adds the VaultUpdateMixin and SecureHashMixin to itself. The mapper then adds the module just like any other module. Job done.

The Jackson annotation added to the VaultUpdateMixin was @JsonIgnore, which speaks for itself. When serialising or deserialising the isEmpty function will be ignored.

Next up is the SecureHashMixin:

@JsonDeserialize(using = JacksonSupport.SecureHashDeserializer::class)
abstract class SecureHashMixin

I have added this in after moving from 3.1 to 3.2. To me it looks like adding a Mixin for SecureHash has been forgotten. The CordaModule includes serialisation and deserialisation for SecureHash.SHA256 but not SecureHash. The above code is copy and paste from CordaModule with a different class being tied to the Mixin.

Once this is included, the differences between 3.1 and 3.2 will be resolved.

I think I’ll raise an issue for this!

Custom Serialisers and Deserialisers

To serialise Vault.Update only the AttachmentConstraint interface needs it’s own custom serialiser:

@JsonComponent
class AttachmentConstraintSerialiser : JsonSerializer<AttachmentConstraint>() {
    override fun serialize(value: AttachmentConstraint, generator: JsonGenerator, provider: SerializerProvider) {
        when (value) {
            is AlwaysAcceptAttachmentConstraint -> {
                generator.writeStartObject()
                generator.writeStringField("type", "AlwaysAcceptAttachmentConstraint")
                generator.writeEndObject()
            }
            is HashAttachmentConstraint -> {
                generator.writeStartObject()
                generator.writeStringField("type", "HashAttachmentConstraint")
                provider.defaultSerializeField("attachmentId", value, generator)
                generator.writeEndObject()
            }
            is WhitelistedByZoneAttachmentConstraint -> {
                generator.writeStartObject()
                generator.writeStringField("type", "WhitelistedByZoneAttachmentConstraint")
                generator.writeEndObject()
            }
            else -> {
                generator.writeStartObject()
                generator.writeStringField("type", "AutomaticHashConstraint")
                generator.writeEndObject()
            }
        }
    }
}

Not much to talk about since only the HashAttachmentConstraint actually has any fields. This matches up to the deserialiser later on which reads the type JSON field to determine which object is created.

The last two classes that need custom deserialisers are ContractState and AttachmentContract (matching the serialiser from before):

@JsonComponent
class ContractStateDeserialiser : JsonDeserializer<ContractState>() {
    override fun deserialize(p: JsonParser, ctxt: DeserializationContext): ContractState {
        return ctxt.readValue(p, MessageState::class.java)
    }
}

@JsonComponent
class AttachmentConstraintDeserialiser : JsonDeserializer<AttachmentConstraint>() {
    override fun deserialize(p: JsonParser, ctxt: DeserializationContext): AttachmentConstraint? {
        val tree = p.readValueAsTree<JsonNode>()
        val type = tree["type"]
        return when (type.asText()) {
            "AlwaysAcceptAttachmentConstraint" -> AlwaysAcceptAttachmentConstraint
            "HashAttachmentConstraint" -> HashAttachmentConstraint(
                SecureHash.parse(tree["attachmentId"].asText())
            )
            "WhitelistedByZoneAttachmentConstraint" -> WhitelistedByZoneAttachmentConstraint
            else -> AutomaticHashConstraint
        }
    }
}

The ContractStateDeserialiser is a pretty lazy implementation since only one state is being used in this tutorial. The AttachmentConstraintDeserialiser uses the type field defined in the serialiser to determine which implementation of AttachmentConstraint it should be converted into.

WebFlux specific configuration

This subsection goes over the extra required configuration due to using WebFlux. You have already seen some of the configuration within the MessageClient but there is a bit extra that needs to be done:

@Bean
fun decoder(rpcObjectMapper: ObjectMapper): Jackson2JsonDecoder =
    Jackson2JsonDecoder(rpcObjectMapper, MediaType.APPLICATION_JSON, MediaType.APPLICATION_STREAM_JSON)

The client needs this bean to be able to deserialise application/stream+json along with the objects returned in the response.

@Component
class MessageClient(
    @Value("\${server.host}") private val host: String,
    @Value("\${server.port}") private val port: Int,
    private val decoder: Jackson2JsonDecoder
) {

    private val strategies = ExchangeStrategies
        .builder()
        .codecs { clientCodecConfigurer ->
            clientCodecConfigurer.defaultCodecs().jackson2JsonDecoder(decoder)
        }.build()

    private val client = WebClient.builder()
        .exchangeStrategies(strategies)
        .baseUrl("http://$host:$port")
        .build()

    // do stuff
}

To make use of the Jackson2JsonDecoder defined in the configuration, the ExchangeStrategies of the WebClient must be specified. Unfortunately, the ExchangeStrategies class is not written to pick up the Jackson2JsonDecoder that we already created. I was hoping that this sort of configuration would work by default, but oh well. To add the ExchangeStrategies the WebClient builder must be used. Once that is done, we are finally there. All serialisation to package up the response and the deserialisation to use it from the client is complete.

That sums up all the Spring related code that I wish to go over in this post.

A quick look at the Flow code

Before concluding, I will briefly show the flow that I put together for the purpose of this tutorial:

@InitiatingFlow
@StartableByRPC
class SendMessageFlow(private val message: MessageState) : FlowLogic<SignedTransaction>() {

    private companion object {
        object CREATING : ProgressTracker.Step("Creating")
        object VERIFYING : ProgressTracker.Step("Verifying")
        object SIGNING : ProgressTracker.Step("Signing")
        object COUNTERPARTY : ProgressTracker.Step("Sending to Counterparty")
        object FINALISING : ProgressTracker.Step("Finalising")

        private fun tracker() = ProgressTracker(
            CREATING,
            VERIFYING,
            SIGNING,
            COUNTERPARTY,
            FINALISING
        )
    }

    override val progressTracker = tracker()

    @Suspendable
    override fun call(): SignedTransaction {
        val stx = collectSignature(verifyAndSign(transaction()))
        progressTracker.currentStep = FINALISING
        return subFlow(FinalityFlow(stx))
    }

    @Suspendable
    private fun collectSignature(
        transaction: SignedTransaction
    ): SignedTransaction {
        val session = initiateFlow(message.recipient)
        progressTracker.currentStep = COUNTERPARTY
        return subFlow(CollectSignaturesFlow(transaction, listOf(session)))
    }

    private fun verifyAndSign(transaction: TransactionBuilder): SignedTransaction {
        progressTracker.currentStep = VERIFYING
        transaction.verify(serviceHub)
        progressTracker.currentStep = SIGNING
        return serviceHub.signInitialTransaction(transaction)
    }

    private fun transaction() =
        TransactionBuilder(notary()).apply {
            progressTracker.currentStep = CREATING
            addOutputState(message, MessageContract.CONTRACT_ID)
            addCommand(
                Command(
                    MessageContract.Commands.Send(),
                    message.participants.map(Party::owningKey)
                )
            )
        }

    private fun notary() = serviceHub.networkMapCache.notaryIdentities.first()
}

@InitiatedBy(SendMessageFlow::class)
class SendMessageResponder(val session: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        subFlow(object : SignTransactionFlow(session) {
            override fun checkTransaction(stx: SignedTransaction) {}
        })
    }
}

It’s a pretty simple flow with the addition of a ProgressTracker that the /messages request used to follow the current state of the flow. Long story short, this flow takes the MessageState passed into it and sends it to the counterparty. While moving through the flow the ProgressTracker is updated to the relevant step. Further documentation on using a ProgressTracker can be found in the Corda docs.

Closing time

That was honestly a lot longer than I thought it would be and has taken me much longer to write than I hoped.

In conclusion, Spring WebFlux provides the capability to use reactive streams to handle response events whenever they arrive. When used with Corda, the progress of a flow can be tracked and a persistent stream of vault updates can be maintained ready to be acted upon as they arrive. To fully make use of WebFlux with Corda, we also had to look into ensuring that objects were serialised correctly by the server and then deserialised by the client so they can be made use of. Lucky Corda does provide some of this, but one or two classes or features are missing and we need to make sure that we use their provided the object mapper. Unfortunately, WebFlux requires a bit more configuration than I’m normally accustomed to when using Spring modules, but nothing that can’t be fixed.

The rest of the code for this post can be found on my GitHub

If you enjoyed this post, you can follow me on twitter at @LankyDanDev where I post updates of my new posts (although they have been slowing down a bit recently).



Responder flow validation

November 25, 2019
cordakotlindltdistributed ledger technologyblockchain

Do you want to prevent your node from accepting transactions that are not important to your business? Do you want to prevent your node from…

Common CorDapp mistakes and how to fix them

November 23, 2019
cordadltdistributed ledger technologyblockchainpresentationconference talk

This is the talk I gave at CordaCon in October. Title: A tale of Corda Slack adventures - Common CorDapp mistakes and how to fix them The…

Sometimes, it's better to not do your work

November 20, 2019
workculturehelpingteamwork

It’s 11 o’clock and I am finally doing my own work… I typically say this line to my colleague almost every day. I used to mean it (albeit…

Augmenting a Spring Data repository through delegation

September 14, 2019
springspring datakotlinjavar2dbcspring data r2dbcreactivereactive streamsspring boot

I have recently written several posts about Kotlin’s delegation. In doing so, I realised a useful way to apply it to Spring Data…

Implementing multiple interfaces through delegation

September 05, 2019
kotlin

In Kotlin, a class can implement multiple interfaces. This is common knowledge. A class can also use delegation to implement numerous…