Integration using MuleSoft Transformers

Nov. 24, 2016


One of CloudStorm's core competencies is enterprise-level application integration. There is an increasing need for organizations to effectively transform and exchange data between internal and partner systems. There are various tools and technologies available to design and implement integration points. For example, the Django project provides an excellent and robust framework to design, build, and deploy web applications. The Django ecosystem also offers packages such as Django REST framework to quickly design and implement REST APIs.

There are also a number of companies that specialize in the application integration space such as TIBCO, MuleSoft, Dell Boomi, and more. The advantage of tools provided by these companies is much faster implementation because much of the complexity is abstracted from the user. I am a huge fan of the Django framework, but it requires significantly more technical expertise to do the same thing versus using a toolset such as MuleSoft. The obvious disadvantage of integration tools is cost; frameworks such as Django are free while integration tools are fairly expensive.

A major requirement of most integrations is the need to transform data as it moves from system to system. Integrations are often chained before the intended target system is eventually reached. For example, a sending system may initiate a transaction by invoking a SOAP service with an XML payload, which will in turn be received by a REST API or a JMS queue. That API, in turn, may transform the data and pass it on to another API in a completely different data format.

This post highlights MuleSoft's DataWeave engine and compares it to MuleSoft's Python transformer. We will take the exact same payload, transform it with each, and then invoke a REST endpoint. The focus is on how the data is transformed by each transformer. A basic working knowledge of MuleSoft is assumed, as a MuleSoft tutorial is well beyond the scope of this post. One of the most fundamental concepts within MuleSoft is the "payload", which contains a Java object and is the core of the Mule message as it travels through the various Mule components.

This post covers one specific data transformation, implemented two ways: first with the Python transformer and then with the DataWeave transformer. We could also do the same thing in another language such as Java or Groovy. I recommend learning DataWeave, a powerful JSON-like language designed specifically for writing complex data transformations efficiently; using one common template language also supports long-term maintenance.

The code sections below are actual code samples embedded within a Python transformer and a DataWeave transformer (in separate flows), respectively. The purpose of the transformation is to accept a transportation shipment consisting of multiple legs (i.e. start and end points) and convert the legs into unique jobs for an asset tracking system, where a "job" consists of one start point and one destination. Assume we have a shipment that moves from a carrier yard to a pickup location and then to a destination. This one shipment equates to two distinct jobs: the first from the carrier yard to the pickup location, and the second from the pickup location to the delivery location.
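The leg-to-job idea can be sketched in a few lines of Python before looking at the real payload. This is only an illustration: the stop names and the legs helper are hypothetical and are not part of either transformer.

```python
# Hypothetical stop names, already ordered by stop sequence.
stops = ["Carrier Yard", "Pickup", "Destination"]


def legs(ordered_stops):
    """Pair each stop with the stop that follows it."""
    return list(zip(ordered_stops, ordered_stops[1:]))


jobs = legs(stops)
print(jobs)  # [('Carrier Yard', 'Pickup'), ('Pickup', 'Destination')]
```

A shipment with N stops therefore produces N-1 jobs, and a shipment with a single stop produces none.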

The shipment is represented in JavaScript Object Notation (JSON) format below:

{
    "SHIPMENTS" : {
        "SHIPMENTHEADER" : {
            "SHIPMENT_NUMBER" : "6107",
            "DOMAINNAME" : "My Domain",
            "MATERIAL" : "METAL",
            "ASSET_GUID" : "558D5AB58B2B0B342B148ADD",
            "TRANS_TYPE" : "Finished Goods",
            "ORIGINAL_SOURCE_LOCATION" : "Reno",
            "FINAL_DESTINATION_LOCATION" : "San Francisco"
        },
        "SHIPMENTDETAILS" : {
            "STOPS" : [ {
                "STOP_SEQUENCE_NUMBER" : "1",
                "STOP_LOCATION_ID" : "Reno",
                "STOP_LEVEL_PLANNED_ARRIVAL_DATETIME" : "20160708103830",
                "STOP_LEVEL_PLANNED_DEPARTURE_DATETIME" : "20160708103830"
            }, {
                "STOP_SEQUENCE_NUMBER" : "2",
                "STOP_LOCATION_ID" : "Sacramento",
                "STOP_LEVEL_PLANNED_ARRIVAL_DATETIME" : "20160708130000",
                "STOP_LEVEL_PLANNED_DEPARTURE_DATETIME" : "20160708130000"
            }, {
                "STOP_SEQUENCE_NUMBER" : "3",
                "STOP_LOCATION_ID" : "San Francisco",
                "STOP_LEVEL_PLANNED_ARRIVAL_DATETIME" : "20160708150437",
                "STOP_LEVEL_PLANNED_DEPARTURE_DATETIME" : "20160708160437"
            } ],
            "LOCATION" : [ {
                "STOP_NAME" : "Reno",
                "STOP_GUID" : "571122A0A69348E557918DC1"
            }, {
                "STOP_NAME" : "Sacramento",
                "STOP_GUID" : "56C7218004E2F4E14A208756"
            }, {
                "STOP_NAME" : "San Francisco",
                "STOP_GUID" : "56F1D0C0DB742DFE1CCD4A58"
            } ]
        }
    }
}
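One detail of this structure matters for the transformation: each stop carries only a location name, while the GUID the target system needs lives in the separate LOCATION list. As a rough sketch, and assuming stop names are unique within a shipment, the name-to-GUID lookup could be held in a dictionary. The guid_by_name name is hypothetical and does not appear in the transformers.

```python
import json

# Trimmed copy of the LOCATION list from the shipment above.
details_json = """
{
    "LOCATION": [
        {"STOP_NAME": "Reno", "STOP_GUID": "571122A0A69348E557918DC1"},
        {"STOP_NAME": "Sacramento", "STOP_GUID": "56C7218004E2F4E14A208756"},
        {"STOP_NAME": "San Francisco", "STOP_GUID": "56F1D0C0DB742DFE1CCD4A58"}
    ]
}
"""

details = json.loads(details_json)

# Hypothetical helper structure: location name -> waypoint GUID.
guid_by_name = {loc["STOP_NAME"]: loc["STOP_GUID"] for loc in details["LOCATION"]}

print(guid_by_name.get("Sacramento"))  # 56C7218004E2F4E14A208756
print(guid_by_name.get("Unknown"))     # None, the name is not in the list
```

A dictionary avoids rescanning the list for every stop, although for a handful of locations the difference is negligible.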

Here is the code needed to perform the transformation in Python:

import json
from datetime import datetime, timedelta

d = json.loads(payload)

header = d["SHIPMENTS"]["SHIPMENTHEADER"]
details = d["SHIPMENTS"]["SHIPMENTDETAILS"]

asset = header["ASSET_GUID"]
shipNo = header["SHIPMENT_NUMBER"]
desc = header["TRANS_TYPE"]
# Parentheses allow the expression to continue onto the next line
transType = (header["TRANS_TYPE"] + ": "
             + header["MATERIAL"])
lob = "Finished Goods"
sid = "562a3595ee5fa18422e4092a"

stops = details["STOPS"]
locations = details["LOCATION"]

# jobs is the output document; job_list collects one dictionary per job
jobs = {}
job_list = []

def get_stop_guid(location_name):
    for location in locations:
        if location["STOP_NAME"] == location_name:
            return location["STOP_GUID"]

# A shipment with N stops yields N-1 jobs, one per consecutive pair of stops
for i in range(len(stops) - 1):
    # The job id will be the shipment number with "00" and the stop sequence number appended
    job = shipNo + "00" + stops[i]["STOP_SEQUENCE_NUMBER"]

    # Reformat yyyyMMddHHmmss as ISO 8601 with a trailing "Z".
    # timedelta(hours=0) is a no-op; change it if the times need an offset.
    stime = stops[i]["STOP_LEVEL_PLANNED_DEPARTURE_DATETIME"]
    startTime = (datetime.strptime(stime, "%Y%m%d%H%M%S")
                 + timedelta(hours=0)).strftime("%Y-%m-%dT%H:%M:%SZ")
    etime = stops[i+1]["STOP_LEVEL_PLANNED_ARRIVAL_DATETIME"]
    stopTime = (datetime.strptime(etime, "%Y%m%d%H%M%S")
                + timedelta(hours=0)).strftime("%Y-%m-%dT%H:%M:%SZ")

    # Resolve each stop's location name to its waypoint GUID
    startLocation = stops[i]["STOP_LOCATION_ID"]
    stopLocation = stops[i+1]["STOP_LOCATION_ID"]
    start_stop_location = get_stop_guid(startLocation)
    end_stop_location = get_stop_guid(stopLocation)
    status = "OPEN"

    cust_fields = {}
    cust_fields["type"] = header["TRANS_TYPE"]
    cust_fields["assetCompany"] = header["DOMAINNAME"]
    cust_fields["otmJobStartLocation"] = header["ORIGINAL_SOURCE_LOCATION"]
    cust_fields["otmJobEndLocation"] = header["FINAL_DESTINATION_LOCATION"]
    cust_fields["startStopNo"] = stops[i]["STOP_SEQUENCE_NUMBER"]
    cust_fields["endStopNo"] = stops[i+1]["STOP_SEQUENCE_NUMBER"]

    j = { "name" : job, "description" : desc, "assetId" : asset,
          "plannedStartTime" : startTime, "plannedEndTime" : stopTime,
          "startWaypointId" : start_stop_location,
          "endWaypointId" : end_stop_location, "status" : status,
          "customerFields" : cust_fields }

    job_list.append(j)

jobs["shipments"] = job_list

result = json.dumps(jobs)
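The timestamp handling in the loop above can be checked in isolation. The sketch below pulls it into a hypothetical to_iso_utc helper. It assumes, as the script does implicitly by appending "Z" without applying an offset, that the source values are already in UTC.

```python
from datetime import datetime


def to_iso_utc(raw):
    """Convert a 'YYYYMMDDHHMMSS' string to 'YYYY-MM-DDTHH:MM:SSZ'.

    Assumption: the incoming value is already UTC, so no offset is applied.
    """
    parsed = datetime.strptime(raw, "%Y%m%d%H%M%S")
    return parsed.strftime("%Y-%m-%dT%H:%M:%SZ")


print(to_iso_utc("20160708103830"))  # 2016-07-08T10:38:30Z
```

If the source system sends local times instead, the offset would need to be applied before formatting, otherwise the "Z" suffix would be misleading.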

The important concept to keep in mind is what happens to the Mule payload as it is processed by the Python transformer. Note that the first action following the library imports is to load the payload into a Python dictionary: d = json.loads(payload). We were able to load the payload this way because a previous XSLT transformation had already converted it to JSON. Finally, at the end, we pass the payload on to the next Mule component by assigning the new payload, as a JSON string, to result: result = json.dumps(jobs).
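One way to exercise this string-in, string-out contract outside of Mule is to wrap the script body in a function and feed it a JSON string. The sketch below is a simplified stand-in rather than the transformer itself: the transform function and the sample document are hypothetical, and it assumes the payload arrives as a JSON string as it does in the script above.

```python
import json


def transform(payload):
    """Stand-in for a transformer script body: JSON string in, JSON string out."""
    d = json.loads(payload)  # JSON text -> Python dictionary
    stops = d["SHIPMENTS"]["SHIPMENTDETAILS"]["STOPS"]
    jobs = {"shipments": [
        {"startStopNo": a["STOP_SEQUENCE_NUMBER"],
         "endStopNo": b["STOP_SEQUENCE_NUMBER"]}
        for a, b in zip(stops, stops[1:])
    ]}
    return json.dumps(jobs)  # Python dictionary -> JSON text


# Hypothetical, heavily trimmed payload with three stops.
sample = json.dumps({"SHIPMENTS": {"SHIPMENTDETAILS": {"STOPS": [
    {"STOP_SEQUENCE_NUMBER": "1"},
    {"STOP_SEQUENCE_NUMBER": "2"},
    {"STOP_SEQUENCE_NUMBER": "3"},
]}}})

result = transform(sample)
print(result)
```

Keeping both ends of the function as plain strings means the same logic can be unit tested locally and then pasted into the flow with few changes.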

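The following transformation using MuleSoft's DataWeave component produces the same job structure. Note one difference in the code as written: it fills otmJobStartLocation and otmJobEndLocation from each leg's own stops, whereas the Python version takes them from the shipment header's original source and final destination: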

%dw 1.0 
%output application/json 
%type local = :localdatetime  {format: "yyyyMMddHHmmss"}

%var header = payload.SHIPMENTS.SHIPMENTHEADER
%var shipmentId = header.SHIPMENT_NUMBER

%var desc = header.TRANS_TYPE

%var sid = p('sid')
%var stops = payload.SHIPMENTS.SHIPMENTDETAILS.STOPS as :array
%var asset = header.ASSET_GUID
%var domainName = header.DOMAINNAME

%function getId(sequenceNumber) shipmentId ++ "00" ++ sequenceNumber

%function formatDate(incomingDate) (
    (incomingDate) as :local as :datetime
)

%var locArray = payload.SHIPMENTS.SHIPMENTDETAILS.LOCATION map ({
    name: $.STOP_NAME,
    guid: $.STOP_GUID
})
---
{
shipments :
    (stops map using ( 
        index = $$ + 1,
        id = getId($.STOP_SEQUENCE_NUMBER), 
        name = $.STOP_LOCATION_ID,
        nextName = stops[$$+1]['STOP_LOCATION_ID'])
    ( {
        name: id,
        description: desc,
        assetId: asset,
        plannedStartTime : formatDate( $.STOP_LEVEL_PLANNED_DEPARTURE_DATETIME),
        plannedEndTime : formatDate( stops[index]['STOP_LEVEL_PLANNED_ARRIVAL_DATETIME']),
        startWaypointId : ((locArray filter $.name == name )).guid[0],
        endWaypointId : ((locArray filter $.name == nextName )).guid[0],
        status : 'OPEN',
        customerFields : {
            type : header.TRANS_TYPE,
            assetCompany : domainName,
            otmJobStartLocation : $.STOP_LOCATION_ID,
            otmJobEndLocation : stops[index]['STOP_LOCATION_ID'],
            startStopNo: $.STOP_SEQUENCE_NUMBER,
            endStopNo: stops[index]['STOP_SEQUENCE_NUMBER']
            }
        } when (sizeOf stops)-1 > $$ otherwise null ) filter $ != null
    )
}

The DataWeave expression language admittedly looks fairly cryptic, but my experience is that it is far easier to train programmers to become proficient with DataWeave than to have them learn several different languages. MuleSoft also provides a visual data mapper, but it is primarily useful for mappings that are fairly straightforward (i.e. not very complex). Below are some important takeaways from the DataWeave transformation:

o Payload output: In the header we declared the output as %output application/json, but we could just as easily have specified Java or some other format.
o Delimiter between the DataWeave variables/functions and the actual transformation: Note the --- in the DataWeave code. The three dashes look innocuous, but they are very important because they tell the DataWeave engine that everything that follows is part of the transformation.
o The $ and $$ symbols have special significance within DataWeave. Inside an operator such as map, $ refers to the current object and $$ refers to that object's index within the array.
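For readers coming from Python, a loose analogy for $ and $$ is the item and index that enumerate yields. The sketch below mirrors the shape of the DataWeave map, including its "skip the last stop" guard. The stop list is hypothetical sample data, and the analogy is approximate rather than an exact semantic match.

```python
stops = ["Reno", "Sacramento", "San Francisco"]

# enumerate yields (index, item): roughly DataWeave's $$ and $ inside map.
legs = [
    {"from": stop, "to": stops[index + 1]}
    for index, stop in enumerate(stops)
    if index < len(stops) - 1  # same guard as "when (sizeOf stops)-1 > $$"
]

print(legs)
# [{'from': 'Reno', 'to': 'Sacramento'}, {'from': 'Sacramento', 'to': 'San Francisco'}]
```

The comprehension's if clause drops the last stop before the lookup runs, which plays the role of the "otherwise null" branch followed by the filter in the DataWeave version.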

The purpose of this post was to highlight the capabilities MuleSoft offers to transform data, both with custom transformations built in a programming language (Python in this case) and with MuleSoft's DataWeave template engine. Other important concepts came up along the way. For example, JSON is one of the leading content types on the web; another well-known standard is eXtensible Markup Language (XML). Competent and effective "full stack" developers should be familiar with multiple languages (e.g. Python, Java, JavaScript, etc.), frameworks (e.g. Django, Angular, etc.), and protocols (e.g. HTTP/S, FTP, SMTP, etc.). It is important for any integration professional who uses MuleSoft's tools to add DataWeave to that list.
