Workflow using Temporal.io
Remember that in the KuFlow App you can generate a sample Worker template for your Workflow with an implementation in the language of your choice, avoiding having to implement this code manually.
If you want to integrate with KuFlow through the implementation of a Worker using the supported languages and our Temporal integration, in this section we present a quick guide on how to do it. If you have not read the Architecture section and the Authentication section, we recommend you to do it before continuing. In this document we will review the most important aspects.
Supported SDKs
The following languages are supported, and others are coming.
- Java
- TypeScript / JavaScript
- Python
Workers
In KuFlow, Temporal is used as the recommended technology for the implementation of Workers. A worker is nothing more than a service that implements your business logic. From the Temporal perspective, is composed of Workflows and Activities that model the business processes you want to implement. Therefore, in this guide we will use Temporal concepts, so you should familiarize yourself with them. The Temporal documentation is a good starting point.
Workflows
Workflows, as you can read in the Temporal documentation, is the code that defines the constraints of a Workflow execution, see Workflow Definition
- Java
- TypeScript / JavaScript
- Python
To implement a Workflow for KuFlow, we must create an annotated interface with @WorkflowInterface
and define an annotated method with @WorkflowMethod
.
The method signature annotated with @WorkflowMethod
is always the same in KuFlow. It contains a single parameter WorkflowRequest
and returns
a WorkflowResponse
object. The name used in the @WorkflowMethod
annotation will be used later in the process definition in the KuFlow application:
@WorkflowInterface
public interface SampleWorkflow {
@WorkflowMethod(name = "WorkerExampleType")
WorkflowResponse runWorkflow(WorkflowRequest request);
}
To implement a Workflow for KuFlow, we must create an async function. Then we will implement it as we would any Workflow implemented in Temporal.
The method signature is always the same in KuFlow. It contains a single parameter WorkflowRequest
and returns a WorkflowResponse
object.
The function name will be used later in the process definition in the KuFlow application.
export async function TemplateJsExample(
workflowRequest: WorkflowRequest
): Promise<WorkflowResponse> {}
To implement a Workflow for KuFlow, we must create a decorated class with @workflow.defn
and define a decorated method with @workflow.run
.
Then we will implement it as we would any Workflow implemented in Temporal. Remember that you can only have one method decorated with @workflow.run
per class.
The method signature is always the same in KuFlow. It contains a single parameter WorkflowRequest
and returns a WorkflowResponse
object.
The function name will be used later in the process definition in the KuFlow application.
@workflow.run
async def run(
self, request: models_temporal.WorkflowRequest
) -> models_temporal.WorkflowResponse:
"""Complete method with your code"""
pass
In the same way, it is necessary to define the contract for a Temporal Signal. This signal is emitted when a task is completed in KuFlow. The definition of this signal is useful for orchestrating tasks of an asynchronous nature, e.g. tasks of a human nature. Such tasks that are done by humans have an intrinsic asynchrony component since humans may perform the task at times other than the Workflow execution. With this signal, what we do is that we pause our Workflow when we want to wait for a particular task to be completed. Once the signal is received, our Workflow continues where we paused it.
- Java
- TypeScript / JavaScript
- Python
To define the signal, we add this method signature in the Workflow interface.
import com.kuflow.temporal.activity.kuflow.util.Constants;
import com.kuflow.temporal.workflow.kuflow.model.WorkflowRequest;
import com.kuflow.temporal.workflow.kuflow.model.WorkflowResponse;
import com.kuflow.temporal.workflow.kuflow.KuFlowEngineSignalProcessItem;
import com.kuflow.temporal.workflow.kuflow.model.SignalProcessItem;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
@WorkflowInterface
public interface SampleWorkflow {
@WorkflowMethod(name = "WorkerExampleType")
WorkflowResponse runWorkflow(WorkflowRequest request);
@SignalMethod(name = KuFlowEngineSignalProcessItem.SIGNAL_NAME)
void kuFlowEngineSignalProcessItem(SignalProcessItem signal);
}
A possible implementation of this method would be the following:
private Set<UUID> kuFlowCompletedTaskIds = new HashSet<>();
@Override
public void kuFlowEngineSignalProcessItem(SignalProcessItem signal) {
this.kuFlowCompletedTaskIds.add(signal.getId());
}
So that you can pause the workflow as follows:
Workflow.await(() -> this.kuFlowCompletedTaskIds.contains(task.getId()));
Then we will implement it as we would any Workflow implemented in Temporal.
To define the signal, we create a const like:
import {
KUFLOW_ENGINE_SIGNAL_PROCESS_ITEM,
type SignalProcessItem,
} from '@kuflow/kuflow-temporal-workflow-kuflow'
const kuFlowEngineSignalProcessItem = defineSignal<[SignalProcessItem]>(KUFLOW_ENGINE_SIGNAL_PROCESS_ITEM)
A possible implementation of this signal would be the following:
const kuFlowCompletedTaskIds: string[] = []
setHandler(kuFlowEngineSignalProcessItem, (signal: SignalProcessItem) => {
kuFlowCompletedTaskIds.push(signal.id)
}
So that you can pause the workflow as follows:
await condition(() => kuFlowCompletedTaskIds.includes(taskId))
Then we will implement it as we would any Workflow implemented in Temporal.
To define the signal, we add this method in the Workflow class.
with workflow.unsafe.imports_passed_through():
from kuflow_temporal_workflow_kuflow import models as models_workflow
...
def __init__(self) -> None:
self._kuflow_completed_task_ids: List[str] = []
@workflow.signal(name=models_workflow.KUFLOW_ENGINE_SIGNAL_PROCESS_ITEM)
async def kuflow_engine_completed_task(self, signal: models_workflow.SignalProcessItem) -> None:
self._kuflow_completed_task_ids.append(signal.id)
So that you can pause the workflow as follows:
await workflow.wait_condition(lambda: task.id in self._kuflow_completed_task_ids)
Then we will implement it as we would any Workflow implemented in Temporal.
The object WorkflowRequest
passed as a parameter in the main Workflow method contains the identifier of the process that has been started in KuFlow. This identifier will be our starting point.
The response object of the method contains a "message" field of optional use and whose purpose is merely informative for the developer. The structure of a Workflow is composed of different executions of Temporal activities orchestrated according to the business logic to be implemented.
The business block will mix activities that make up your business, with activity executions that have an associated behavior in KuFlow. For example,
if you want to create a Task in KuFlow, you will have an execution of the KuFlow_Engine_createTask
activity that creates a KuFlow Task that has
direct representation in the application. In the Examples section, you will find concrete examples of this.
- Java
- TypeScript / JavaScript
- Python
To facilitate the integration of KuFlow's own activities, the KuFlowAsyncActivities
and KuFlowSyncActivities
java interfaces has been implemented, which we will see later.
In the same way, the codes we provide use the Temporal Java SDK.
To facilitate the integration of KuFlow's own activities, the createKuFlowSyncActivities
and createKuFlowAsyncActivities
javascript object
has been implemented, which we will see later.
In the same way, the codes we provide use the Temporal JavaScript SDK.
To facilitate the integration of KuFlow's own activities, the KuFlowAsyncActivities
and KuFlowSyncActivities
python classes has been implemented, which we will see later.
In the same way, the codes we provide use the Temporal Python SDK.
Workflows recommendations
Temporal's general recommendations for the implementation of Workflows are fully valid and we advise following them to achieve a successful integration.
Timeouts
It is important that you properly set the timeouts you want to operate with Temporal, for this and for simplicity we recommend to always set two of them: StartToCloseTimeout and ScheduleToCloseTimeout.
-
StartToCloseTimeout
: is the maximum time allowed for a single Activity Task execution -
ScheduleToCloseTimeout
: is the maximum amount of time allowed for the execution of an activity until it reaches the closed state. It covers the whole set of retries that could have the execution of the activity.
The values they will contain will depend on the execution times of your activities.
Retries
In the real world, services are never available 100% of the time and are never fully reliable. Therefore an appropriate retry logic must be configured in the activity execution options. Normally it will be sufficient to include the default policy whenever you do not want a specific behavior.
Activities
To make things easier for the integrator, KuFlow has developed a series of Activities that simplify the management of the most common aspects that the integrator will have to deal with in the integration. These activities and other utilities that we make available to you are open source and the code can be obtained from our repositories.
- Java
- TypeScript / JavaScript
- Python
The implementation of these activities in turn contains third-party dependencies that must be used transitively. Of course, the use of this catalog of libraries that we make available to you is not mandatory. However, we recommend reading this article together with the exploration of the libraries in our code repositories.
Activities catalog
Our activities catalog is constantly updated, so it is recommended that you visit this documentation periodically.
- KuFlow Task Activities
Set of activities that interact with the KuFlow API Rest and that involve the most important operations that a Worker has to deal with. It includes operations to indicate the start and end of a Process, as well as the operations needed to interact with the Tasks.
- Email Activities
Activity that performs the sending of emails, configuring its own sending service. It uses Thymeleaf as template libraries for its correct use.
- S3 Activities
Currently, it provides a feature that allows copying documents from a KuFlow task to an S3 repository.
- UI.Vision Activities
Activity that allows the execution of a UI.Vision robot (RPA).
The implementation of these activities in turn contains third-party dependencies that must be used transitively. Of course, the use of this catalog of libraries that we make available to you is not mandatory. However, we recommend reading this article together with the exploration of the libraries in our code repositories.
Activities catalog
Our activities catalog is constantly updated, so it is recommended that you visit this documentation periodically.
- KuFlow Task Activities
Set of activities that interact with the KuFlow API Rest and that involve the most important operations that a Worker has to deal with. It includes operations to indicate the start and end of a Process, as well as the operations needed to interact with the Tasks.
The implementation of these activities in turn contains third-party dependencies that must be used transitively. Of course, the use of this catalog of libraries that we make available to you is not mandatory. However, we recommend reading this article together with the exploration of the libraries in our code repositories.
Activities catalog
Our activities catalog is constantly updated, so it is recommended that you visit this documentation periodically.
- KuFlow Task Activities
Set of activities that interact with the KuFlow API Rest and that involve the most important operations that a Worker has to deal with. It includes operations to indicate the start and end of a Process, as well as the operations needed to interact with the Tasks.
Connect the Worker to KuFlow
To connect the worker to our infrastructure you must follow the next steps:
Add KuFlow dependency to your project
In order to add KuFlow to your project you must do the following:
- Java
- TypeScript / JavaScript
- Python
Add KuFlow dependencies to your pom.xml:
<dependency>
<groupId>com.kuflow</groupId>
<artifactId>kuflow-temporal-activity-kuflow</artifactId>
<version>X.Y.Z</version>
</dependency>
<dependency>
<groupId>com.kuflow</groupId>
<artifactId>kuflow-temporal-workflow-kuflow</artifactId>
<version>X.Y.Z</version>
</dependency>
<dependency>
<groupId>com.kuflow</groupId>
<artifactId>kuflow-temporal-worker</artifactId>
<version>X.Y.Z</version>
</dependency>
or to build.gradle:
compile group: 'com.kuflow', name: 'kuflow-temporal-activity-kuflow', version: 'X.Y.Z'
compile group: 'com.kuflow', name: 'kuflow-temporal-workflow-kuflow', version: 'X.Y.Z'
compile group: 'com.kuflow', name: 'kuflow-temporal-worker', version: 'X.Y.Z'
Other dependencies ara available from Maven Central.
Add KuFlow dependencies to your package.json:
"dependencies": {
"@kuflow/kuflow-temporal-activity-kuflow": "X.Y.Z",
"@kuflow/kuflow-temporal-workflow-kuflow": "X.Y.Z",
"@kuflow/kuflow-temporal-worker": "X.Y.Z",
}
Other packages are available from Npm.
Add kuflow-temporal-activity-kuflow as a dependency to your pyproject or other dependencies mechanism:
kuflow-temporal-activity-kuflow = "^X.Y.Z"
kuflow-temporal-workflow-kuflow = "^X.Y.Z"
kuflow-temporal-worker = "^X.Y.Z"
Other packages are available from PyPI.
Create the Rest Client
As detailed in the documentation referred to Authentication, you need to get the Application
credentials that can be obtained through the KuFlow backoffice.
With these credentials you can have access to our REST API.
- Java
- TypeScript / JavaScript
- Python
To create a rest client you can follow the next code:
private static KuFlowRestClient kuFlowRestClient() {
KuFlowRestClientBuilder builder = new KuFlowRestClientBuilder()
.clientId("CLIENT_ID") // In our case this is the Application ID
.clientSecret("CLIENT_SECRET"); // In our case this is the Application token
return builder.buildClient();
}
But if you use Spring Boot, you're in luck. You could skip this step by simply using the auto-configuration we provide. For example, if you use Maven:
<dependency>
<groupId>com.kuflow</groupId>
<artifactId>kuflow-spring-boot-autoconfigure</artifactId>
</dependency>
To create a rest client you can follow the next code:
const kuFlowRestClient = new KuFlowRestClient(
{
clientId: 'CLIENT_ID', // In our case this is the Application ID
clientSecret: 'CLIENT_SECRET', // In our case this is the Application token
},
)
To create a rest client you can follow the next code:
# Rest client for the KuFlow API
kuflow_client = KuFlowRestClient(
client_id="CLIENT_ID",
client_secret="CLIENT_SECRET",
)
Connect to KuFlow Temporal
As previously mentioned, to have access to our temporal cluster you need to establish a Mutual TLS (mTLS) connection and present a JWT token in each request, along with other requirements that we explain later.
To simplify the use of the platform and facilitate integration, our SDK handles certificate and token management transparently to you. You simply have to configure the "KuFlow Temporal Connection" correctly:
- Java
- TypeScript / JavaScript
- Python
Create an instance of the KuFlowTemporalConnection class, as shown below:
@Bean
public KuFlowTemporalConnection kuFlowTemporalConnection() {
TemporalProperties temporalProperties = this.applicationProperties.getTemporal();
return KuFlowTemporalConnection
.instance(this.kuFlowRestClient)
.configureWorkflowServiceStubs(builder -> builder.setTarget(temporalProperties.getTarget()));
}
Initialize a worker, as shown below:
private void startWorkers() {
this.kuFlowTemporalConnection.configureWorker(builder ->
builder
.withTaskQueue(this.applicationProperties.getTemporal().getKuflowQueue())
.withWorkflowImplementationTypes(SampleWorkflowImpl.class)
.withActivitiesImplementations(this.kuFlowActivities)
.withActivitiesImplementations(this.sampleActivities)
);
this.kuFlowTemporalConnection.start();
}
Create an instance of the KuFlowTemporalConnection class and initialize a worker, as shown below:
const kuFlowTemporalConnection = await KuFlowTemporalConnection.instance({
kuflow: {
restClient: kuFlowRestClient,
},
temporalio: {
connection: {
address: workerProperties.temporal.target,
},
worker: {
taskQueue: 'WORKER_TASK_QUEUE', // Worker namespace to use
workflowsPath: require.resolve('./workflows'), // Register the workflow path needed
activities: {
...createKuFlowActivities(kuFlowRestClient),
...customActivities, // Register the activities needed
},
},
},
})
await kuFlowTemporalConnection.runWorker()
Create an instance of the KuFlowTemporalConnection class and initialize a worker, as shown below:
kuflow_temporal_connection = KuFlowTemporalConnection(
kuflow=KuFlowConfig(rest_client=kuflow_rest_client),
temporal=TemporalConfig(
client=TemporalClientConfig(
target_host=configuration.temporal_host,
),
worker=TemporalWorkerConfig(
task_queue=configuration.temporal_queue,
workflows=[SampleWorkflow],
activities=kuflow_activities.activities,
),
),
)
# Start temporal worker
await kuflow_temporal_connection.run_worker()
External Systems
In KuFlow we recommend that you integrate with our systems using Temporal Workers given the facilities it offers in terms of resilience of your business processes. However, you may want to use our REST API without having a Worker or as a complement to it. For example, let's imagine a scenario where you have an application that is responsible for initiating processes in KuFlow instead of using our web application.
In that case, your application would only need to make a request to our REST API to start the process and from there it would be a Worker that would be in charge of processing the Workflow. We call this application External system in KuFlow terminology. In this case, from the External system point of view, it is only necessary to configure the security to authenticate in the REST API.
- Java
- TypeScript / JavaScript
- Python
To facilitate the integration, you can use the kuflow-rest
module which contains client interfaces using AutoRest technology with which you can connect to our API.
private static KuFlowRestClient kuFlowRestClient() {
return new KuFlowRestClientBuilder();
.clientId("CLIENT_ID") // In our case this is the Application ID
.clientSecret("CLIENT_SECRET") // In our case this is the Application token
.buildClient();
}
To facilitate the integration, you can use the @kuflow/kuflow-rest
module which contains client interfaces using AutoRest technology with which you can connect to our API.
const kuFlowRestClient = new KuFlowRestClient(
{
clientId: 'CLIENT_ID', // In our case this is the Application ID
clientSecret: 'CLIENT_SECRET', // In our case this is the Application token
}
)
To facilitate the integration, you can use the kuflow-rest
package (available in Pypi and Github) which contains client interfaces using AutoRest technology with which you can connect to our API.
kuflow_client = KuFlowRestClient(
client_id="CLIENT_ID",
client_secret="CLIENT_SECRET",
)
In our repository you can examine the complete implementation of the module as well as examples of its use.
Other languages
KuFlow is a platform agnostic of the programming language of customers, so you can use your favorite programming language. KuFlow provides SDKs, examples and tutorials that can help you in your integration. If there is no documentation is available for your language yet, we recommend that you take a look at any other integration guide as the basic concepts will be similar.
If you want to integrate into KuFlow using Temporal.io, you should use one of the SDKs provided by KuFlow because they greatly facilitate the work of configuring the different authentication mechanisms. In addition, it provides features that are only available under these KuFlow SDKs and will always include the Temporal SDKs.
However, if you do not want to use Temporal or there is simply no support for your integration language, you can opt for the Rest/WebHooks integration provided by KuFlow. Since our WebHooks guarantee delivery and order, orchestrating a flow of calls to our Rest API, you can implement any business process.
Ask us any question in our forums or Discord, we are happy to help you.