Paul Edwards

Mar 1, 2021

Getting started with API based data ingestion

Developers frequently want to submit data programmatically into the Bloomreach Intelligent Index. This post explores communicating with the Bloomreach Data Ingestion APIs using a Node.js sample application. It is worth noting that the more traditional tab- and comma-separated data formats are still supported.


tl;dr: you can watch this in video format here

tl;dr: the repo is here

High Level Overview

Data is ingested into the Bloomreach Intelligent Index in a two-phase approach, where each phase is known as a ‘job’:

► ingesting the data

► updating the index

These jobs are decoupled because different departments could be asking the platform to ingest data simultaneously.

Upon submitting a job, the platform responds with a jobId, which you can use to check the status of that job.

Data is sent to the API endpoint in a request whose body is a JSON Patch document (the format defined in RFC 6902). The endpoint supports two HTTP verbs: ‘PUT’ and ‘PATCH’. A ‘PUT’ replaces the index entirely (use with caution!), whilst a ‘PATCH’ modifies what is already there.

Each operation in a JSON Patch request is either ‘add’ or ‘remove’. (Note: add and remove can apply at the product, variant, or view level.)
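To make the two operations concrete, here is a small sketch that builds them for a single product. The helper names are hypothetical (they are not part of the sample repo or any Bloomreach SDK), and "pid-456" is an invented id. The escaping follows the JSON Pointer rules (RFC 6901) that JSON Patch paths use; it is worth confirming against the API docs that product ids containing "/" or "~" are handled this way.

```javascript
// Hypothetical helpers that build JSON Patch operations for one product.

// JSON Pointer (RFC 6901) escaping: "~" becomes "~0" and "/" becomes "~1",
// so a product id containing "/" is not mistaken for a deeper path.
const escapePointerToken = (token) =>
  String(token).replace(/~/g, "~0").replace(/\//g, "~1")

// An "add" operation carries the product attributes as its value.
const addProductOp = (pid, attributes) => ({
  op: "add",
  path: "/products/" + escapePointerToken(pid),
  value: { attributes },
})

// A JSON Patch "remove" operation has no value, only the path to delete.
const removeProductOp = (pid) => ({
  op: "remove",
  path: "/products/" + escapePointerToken(pid),
})

// Example patch: add one product and remove another (ids are illustrative).
const patch = [
  addProductOp("pid-123", { title: "Self stirring mug", price: 10 }),
  removeProductOp("pid-456"),
]
console.log(JSON.stringify(patch, null, 2))
```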

Sample Application Workflow

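The sample application follows a simple workflow:

► load the patch data from disk

► submit the patch data request

► check the job status until it completes

► request an index update

► check the job status until it completes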

Loading Data

As the Bloomreach API expects a JSON Patch, a great place to start is simply to store these patches as JSON files on disk:

[
 {
   "op": "add",
   "path": "/products/pid-123",
   "value": {
     "attributes": {
       "url": "https://wonderfulengineering.com/wp-content/uploads/2013/11/self-stirring-mug.jpg",
       "price": 10,
       "title": "Self stirring mug",
       "thumb_image": "https://wonderfulengineering.com/wp-content/uploads/2013/11/self-stirring-mug.jpg",
       "description": "The most amazing product that you never knew you needed...",
       "tags": ["", ""],
       "language": "en"
     }
   }
 }
]

As a side note, JSON Lines (one JSON object per line) was picked because it lends itself to high-volume applications, where it makes sense to write each line out to a file rather than holding the whole JSON object in memory before writing it. It also makes splitting the data across files much easier. More on this in a future article.
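A minimal sketch of the JSON Lines idea, assuming one JSON Patch operation per line. The function names are hypothetical, and whether the ingestion API accepts JSON Lines directly is not covered here. The writer accepts any iterable (for example a generator), which is where the memory saving comes from, and it waits for the stream to drain so that lines are not all buffered in memory anyway.

```javascript
const fs = require("fs")
const { once } = require("events")

// Serialise an array of patch operations as JSON Lines: one object per line.
const toJsonLines = (ops) => ops.map((op) => JSON.stringify(op)).join("\n") + "\n"

// Parse JSON Lines back into an array, skipping blank lines.
const fromJsonLines = (text) =>
  text
    .split("\n")
    .filter((line) => line.trim() !== "")
    .map((line) => JSON.parse(line))

// Stream operations to disk one line at a time.
async function writeJsonLines(filePath, ops) {
  const out = fs.createWriteStream(filePath)
  for (const op of ops) {
    // write() returns false when the internal buffer is full; wait for "drain"
    if (!out.write(JSON.stringify(op) + "\n")) {
      await once(out, "drain")
    }
  }
  out.end()
  await once(out, "finish")
}
```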

Our Load Data Function

This function takes the first argument passed after the script name (process.argv[2], the third element of the command that launched the app) and uses it as the path to the data we want to load. This keeps the code flexible: it can be reused for many experiments without being modified:

const fs = require("fs")

// Reads the patch file whose path is given on the command line,
// e.g. `node index.js ./data/patch.json`.
const loadData = () => {
  return new Promise((resolve, reject) => {
    try {
      // process.argv[0] is node, [1] is the script, [2] is our file path
      const targetFile = process.argv[2]
      logger.log("loading data from: " + targetFile)
      const patchData = fs.readFileSync(targetFile)
      resolve(patchData)
    } catch (err) {
      reject("Data load error. This could be because the path to your patch file is invalid: " + err)
    }
  })
}

Note: You’ll notice it calls logger, a simple logging module I wrote so we can see what’s going on during data ingestion.
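Because a job built from poorly formed JSON only shows up later as failed, it can be worth checking the file locally before submitting it. The following is a hypothetical pre-flight check (validatePatchDocument is not part of the sample app or any Bloomreach library). It only checks the shape described in this post, so the API may still reject payloads that pass.

```javascript
// Hypothetical pre-flight check for a JSON Patch file.
// Returns a list of problems; an empty list means none were found.
const ALLOWED_OPS = new Set(["add", "remove"])

function validatePatchDocument(text) {
  let doc
  try {
    doc = JSON.parse(text)
  } catch (err) {
    return ["not valid JSON: " + err.message]
  }
  if (!Array.isArray(doc)) {
    return ["top level must be an array of operations"]
  }
  const problems = []
  doc.forEach((op, i) => {
    if (op === null || typeof op !== "object") {
      problems.push(`operation ${i} is not an object`)
      return
    }
    if (!ALLOWED_OPS.has(op.op)) {
      problems.push(`operation ${i} has unsupported op "${op.op}"`)
    }
    if (typeof op.path !== "string" || !op.path.startsWith("/")) {
      problems.push(`operation ${i} needs a path starting with "/"`)
    }
    // An "add" needs something to add
    if (op.op === "add" && !("value" in op)) {
      problems.push(`operation ${i} is an add without a value`)
    }
  })
  return problems
}
```

In the sample app this could be called as `validatePatchDocument(patchData.toString())` before the patch is submitted.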

Submitting a Patch Data Request

Our function that submits a patch data request is relatively simple. It accepts the patch data as an argument and makes an HTTPS call to the data ingestion API using the ‘PATCH’ method. It waits until the response has been fully received before resolving.

let options = {
  hostname: "api-staging.connect.bloomreach.com",
  method: "PATCH",
  path:
    "/dataconnect/api/v1/accounts/" +
    settings.BRSM_ACCOUNT_ID +
    "/catalogs/" +
    settings.BRSM_CATALOG_NAME +
    "/products",
  port: 443,
  headers: {
    "Content-Type": "application/json-patch+json",
    Authorization: settings.BEARER_API_KEY,
  },
}

► path is /dataconnect/api/v1/accounts/<your account id>/catalogs/<your catalog name>/products, which simply indicates the account, the catalog and the fact that we are going to put data into products.

► The method used here is ‘PATCH’: we aren’t going to replace the whole index, simply update whatever is there.

► In the headers, you will see that the content type is application/json-patch+json, which indicates that we are sending the patch file payload directly in the body of this API request.

► Also in the headers you will see Authorization: <your bearer API key>, which will be issued to you during implementation.

The rest of the code block simply makes the request and handles the response coming back:

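const req = https
  .request(options, (resp) => {
    let data = ""
    // A chunk of data has been received.
    resp.on("data", (chunk) => {
      data += chunk
    })

    // The whole response has been received.
    resp.on("end", () => {
      try {
        resolve(JSON.parse(data))
      } catch (err) {
        // The body was not valid JSON (e.g. an HTML error page)
        reject("Could not parse response: " + data)
      }
    })
  })
  .on("error", (err) => {
    reject(err.message)
  })

// Send the patch payload as the request body, then finish the request.
req.write(patchData)
req.end()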
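The code above resolves with whatever JSON comes back, even for a 401 or 500 response. Below is a hedged sketch of a reusable helper that also treats non-2xx status codes and non-JSON bodies as errors. sendJson and parseJsonResponse are hypothetical names, and treating an empty body as {} is an assumption.

```javascript
const https = require("https")

// Turn a status code and raw body into a parsed result, or throw.
// Assumption: an empty body is treated as an empty object.
function parseJsonResponse(statusCode, rawBody) {
  let body
  try {
    body = rawBody === "" ? {} : JSON.parse(rawBody)
  } catch (err) {
    throw new Error(`HTTP ${statusCode}: response was not JSON: ${rawBody.slice(0, 200)}`)
  }
  if (statusCode < 200 || statusCode >= 300) {
    throw new Error(`HTTP ${statusCode}: ${JSON.stringify(body)}`)
  }
  return body
}

// Send an optional body with https.request and resolve with the parsed JSON.
function sendJson(options, body) {
  return new Promise((resolve, reject) => {
    const req = https.request(options, (resp) => {
      let data = ""
      // Decode as UTF-8 so multi-byte characters split across chunks survive
      resp.setEncoding("utf8")
      resp.on("data", (chunk) => {
        data += chunk
      })
      resp.on("end", () => {
        try {
          resolve(parseJsonResponse(resp.statusCode, data))
        } catch (err) {
          reject(err)
        }
      })
    })
    req.on("error", reject)
    // The index-update POST sends no body, so writing is optional
    if (body !== undefined) {
      req.write(body)
    }
    req.end()
  })
}
```

With this in place, submitting a patch could reduce to `sendJson(options, patchData)`, and the resolved value would carry the jobId.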

Requesting the Index to Update

Requesting the index to update is done via a POST request to the /indexes endpoint:

     let options = {
       hostname: "api-staging.connect.bloomreach.com",
       method: "POST",
       path:
         "/dataconnect/api/v1/accounts/" +
         settings.BRSM_ACCOUNT_ID +
         "/catalogs/" +
         settings.BRSM_CATALOG_NAME +
         "/indexes",
       port: 443,
       headers: {
         Authorization: settings.BEARER_API_KEY
       },
     }

► Path is to /indexes rather than /products

► Method is ‘POST’

The rest of the function is essentially a repeat of the above so I have omitted it for the sake of brevity.
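Because the two option objects differ only in method, resource and headers, a small builder can remove the duplication. This is a sketch: buildCatalogRequestOptions is a hypothetical helper, the settings values below are placeholders, and URL-encoding the account id and catalog name is a precaution rather than something the API is known to require.

```javascript
const API_HOST = "api-staging.connect.bloomreach.com"

// Hypothetical helper: builds https.request options for a catalog-level
// resource such as "products" or "indexes".
function buildCatalogRequestOptions(settings, method, resource, extraHeaders = {}) {
  return {
    hostname: API_HOST,
    method,
    // encodeURIComponent guards against ids or names containing "/" or spaces
    path:
      "/dataconnect/api/v1/accounts/" +
      encodeURIComponent(settings.BRSM_ACCOUNT_ID) +
      "/catalogs/" +
      encodeURIComponent(settings.BRSM_CATALOG_NAME) +
      "/" +
      resource,
    port: 443,
    headers: { Authorization: settings.BEARER_API_KEY, ...extraHeaders },
  }
}

// Placeholder settings; use the values issued to you during implementation.
const settings = {
  BRSM_ACCOUNT_ID: "1234",
  BRSM_CATALOG_NAME: "my_catalog",
  BEARER_API_KEY: "<your bearer API key>",
}

const patchOptions = buildCatalogRequestOptions(settings, "PATCH", "products", {
  "Content-Type": "application/json-patch+json",
})
const indexOptions = buildCatalogRequestOptions(settings, "POST", "indexes")
```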

Get Job Status

Our function to get the status of a job simply accepts a jobId, makes a ‘GET’ request to the /dataconnect/api/v1/jobs/<jobId> endpoint and resolves with the parsed response:

const https = require("https")

const getJobStatus = (jobId) => {
  return new Promise((resolve, reject) => {
    const options = {
      hostname: "api-staging.connect.bloomreach.com",
      method: "GET",
      path: "/dataconnect/api/v1/jobs/" + jobId,
      port: 443,
      headers: {
        Authorization: settings.BEARER_API_KEY,
      },
    }

    const req = https
      .request(options, (resp) => {
        let data = ""
        // A chunk of data has been received.
        resp.on("data", (chunk) => {
          data += chunk
        })

        // The whole response has been received.
        resp.on("end", () => {
          try {
            resolve(JSON.parse(data))
          } catch (err) {
            reject("Could not parse job status response: " + data)
          }
        })
      })
      .on("error", (err) => {
        reject(err.message)
      })
    // A GET has no body, so we end the request straight away.
    req.end()
  })
}

Get Job Status Until We Have an Outcome

As jobs can go through a number of statuses (creating, queued, running, success, failed, skipped, killed), our code needs to be able to react to them before proceeding. Typically, you will see running, success or, if you have poorly formed JSON, failed.

In the sample app, I wrote an iterator which recursively calls the status endpoint every 10 seconds until we reach a conclusion:

const checkJobStatusUntilComplete = (jobId) => {
  return new Promise((resolve, reject) => {
    checkJobStatusUntilCompleteIterator(
      jobId,
      function () {
        resolve()
      },
      function (err) {
        reject(err)
      }
    )
  })
}

const checkJobStatusUntilCompleteIterator = (jobId, callback, errCB) => {
  getJobStatus(jobId)
    .then((statusMessage) => {
      // failed, killed and skipped are treated as non-successful terminal
      // outcomes so that polling always stops
      if (["failed", "killed", "skipped"].includes(statusMessage.status)) {
        errCB(statusMessage)
      } else if (statusMessage.status == "success") {
        logger.log("\u2705 success")
        callback()
      } else {
        // creating, queued or running: check again in 10 seconds
        logger.log("status: " + statusMessage.status, { sameLineLogging: true })
        setTimeout(function () {
          checkJobStatusUntilCompleteIterator(jobId, callback, errCB)
        }, 10000)
      }
    })
    .catch((err) => {
      // reject is not in scope here, so report through the error callback
      errCB(err)
    })
}

Note: You’ll notice I use a callback pattern for the iterator with a success and failure callback which just helps me to wrap the iterator into a promise whilst still maintaining the readability of the code. It’s a stylistic choice rather than anything important. I hope it doesn’t offend.
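If you prefer async/await to callbacks, the same polling loop could be sketched as below. pollUntilComplete and its maxAttempts cap are hypothetical additions; the cap (360 checks at 10 seconds, roughly an hour) simply stops a job stuck in running from being polled forever. Treating killed and skipped as failures is an assumption.

```javascript
// Statuses assumed to mean the job has finished without succeeding.
const TERMINAL_FAILURES = new Set(["failed", "killed", "skipped"])

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

// getStatus is any function returning a promise of { status, ... }.
async function pollUntilComplete(getStatus, { intervalMs = 10000, maxAttempts = 360 } = {}) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const statusMessage = await getStatus()
    if (statusMessage.status === "success") {
      return statusMessage
    }
    if (TERMINAL_FAILURES.has(statusMessage.status)) {
      throw new Error("job ended with status: " + statusMessage.status)
    }
    // creating, queued or running: wait before asking again
    await sleep(intervalMs)
  }
  throw new Error("gave up after " + maxAttempts + " status checks")
}
```

In the sample app it would be called as `pollUntilComplete(() => getJobStatus(jobId))`.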

Building the Promise Chain

As I’ve built all of the functions out as promises, it’s easy to chain them together and to handle any errors:

const submitJobAndMonitorStatus = () => {
  logger.log("Running patching process.")
  // Each step returns a promise, so the chain runs the steps in order and
  // any rejection skips straight to the caller's .catch
  return loadData()
    .then((patchData) => {
      logger.log("Patch data loaded.")
      return submitPatchDataRequest(patchData)
    })
    .then((response) => {
      logger.log("Patch job submitted with id: " + response.jobId)
      return checkJobStatusUntilComplete(response.jobId)
    })
    .then(() => {
      logger.log("About to update the index")
      return requestIndexUpdate()
    })
    .then((response) => {
      logger.log("Index update job submitted with id: " + response.jobId)
      return checkJobStatusUntilComplete(response.jobId)
    })
}

And to launch the whole process:

logger.log("", { noTime: true })
submitJobAndMonitorStatus()
 .then(() => {
   logger.log("\u2705 all done ")
 })
 .catch((err) => {
   logger.log("\u274c error submitting job: " + err + "\n")
 })
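The same workflow reads naturally with async/await. In this sketch, runIngestion is a hypothetical function and the steps are passed in as an object, so the flow can be exercised with stand-ins; in the sample app they would be loadData, submitPatchDataRequest, checkJobStatusUntilComplete and requestIndexUpdate.

```javascript
// Runs the ingest-then-index workflow; any thrown error or rejection
// propagates to the caller, just like the .catch in the promise chain.
async function runIngestion(steps, log = console.log) {
  const patchData = await steps.loadData()
  log("Patch data loaded.")

  const patchJob = await steps.submitPatch(patchData)
  log("Patch job submitted with id: " + patchJob.jobId)
  await steps.waitForJob(patchJob.jobId)

  log("About to update the index")
  const indexJob = await steps.requestIndexUpdate()
  log("Index update job submitted with id: " + indexJob.jobId)
  await steps.waitForJob(indexJob.jobId)
}
```

Wired into the sample app, this might look like `runIngestion({ loadData, submitPatch: submitPatchDataRequest, waitForJob: checkJobStatusUntilComplete, requestIndexUpdate }, (msg) => logger.log(msg))`.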

Summary and Closing Thoughts

The demo application above is suitable for payloads of up to 5MB. Beyond that, the platform asks that you upload your patch data via SFTP and then use the API endpoints above to ask the platform to ingest it and rebuild the index. The APIs are rate-limited to up to 1 ingestion request per minute and up to 1 index update request per hour.
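Both limits can be respected on the client side. The sketch below is hypothetical: it reads 5MB as 5 × 1024 × 1024 bytes (an assumption worth checking), and the interval gate only tracks calls made by this one process, so it cannot see requests from other teams sharing the account.

```javascript
// Assumption: the 5MB API limit is interpreted as 5 * 1024 * 1024 bytes.
const MAX_API_PAYLOAD_BYTES = 5 * 1024 * 1024

// Decide whether a payload can go straight to the API or needs SFTP.
function submissionRoute(payload, limit = MAX_API_PAYLOAD_BYTES) {
  // Buffer.byteLength counts bytes, not characters, which matters for non-ASCII text
  return Buffer.byteLength(payload) <= limit ? "api" : "sftp"
}

// Minimal gate that spaces calls at least intervalMs apart,
// e.g. 60000 ms for ingestion and 3600000 ms for index updates.
function createIntervalGate(intervalMs, now = Date.now) {
  let lastCall = -Infinity
  return {
    // Milliseconds to wait before the next call is allowed (0 if allowed now)
    waitTime() {
      return Math.max(0, lastCall + intervalMs - now())
    },
    // Record that a call has just been made
    markCalled() {
      lastCall = now()
    },
  }
}
```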

As stated in the opening paragraph, you can find a video of this code in action here, and the codebase itself is available here to help you iterate in your own projects. It took me approximately two hours to go from an empty IDE to something that was indexing data in the Bloomreach Intelligent Index. With this fast start, you can do it more quickly.

As always, feel free to reach out to me with feedback or comments, either via LinkedIn or by email.