Skip to content

Workflow Engine 활용사례

들어가기 전에

루니버스는 마이크로 서비스로 구성되어 있으며 여러 서비스에 걸쳐 실행되는 프로세스에 대해서 conductor를 도입해서 사용하고 있다. 이 글에서는 conductor가 무엇이고 어느 경우에 적용하면 좋은지 알아보겠다. 마지막으로, 루니버스에 적용된 사례에 대해서 살펴보겠다.

Conductor란?

Netflix에서 개발하고 오픈소스로 제공하는 conductor는 클라우드에서 실행되는 Workflow Orchestration Engine으로 다음 기능을 제공한다.

  • 프로세스를 일시 중지, 재개 및 다시 시작하는 기능
  • workflow 추적 및 관리
  • 프로세스 흐름을 시각화하기 위한 사용자 인터페이스
  • HTTP, gRPC와 같은 여러 프로토콜 전송 기능

장단점

장점

  • 프로세스 흐름을 시각화하기 위한 사용자 인터페이스 제공
  • workflow 추적 및 관리에 용이
  • 별도의 UI에서 프로세스를 중지, 재개, 재시작과 같은 편의 기능을 제공

단점

  • 사용하기까지 일정량의 학습시간이 필요 (learning curve)
  • 관리 포인트 증가 (conductor server)

언제 Conductor를 도입하면 좋을까?

다음과 같은 상황에서 conductor 사용을 고려해볼 수 있다.

  1. 여러 마이크로 서비스에 걸쳐 작업 수행이 필요한 경우
  2. 마이크로 서비스의 처리 결과를 다른 마이크로 서비스에서 사용하는 경우
  3. 복잡한 프로세스에 대한 추적(모니터링)이 필요할 때

콘셉트

Conductor는 비즈니스 프로세스를 실행하기 위해서 사전에 전체 작업 순서가 정의된 workflow와 개별 작업이 정의된 task를 conductor 서버에 REST API 방식으로 등록해서 사용한다.

task는 하나의 마이크로 서비스가 처리하는 작업 단위로, 처리할 작업을 사용자가 마이크로 서비스(Micro Service)에 구현해야 한다. (각 마이크로 서비스는 단일 기능에 대한 책임을 갖는다)

Workflow

JSON 형식으로 정의된 비즈니스의 전체 프로세스에 대한 메타데이터(Meta Data)로 개별 프로세스의 실행 순서가 정의된 tasks와 name, description과 같은 필드로 구성되어 있다.

예시
{
  "name": "name",
  "description": "description",
  "version": 1,
  "tasks": [
    {
      "name": "name",
      ...
    }
  ],
  "outputParameters": {
    "param1": "${workflow.output.param1}"
  },
  "failureWorkflow": "failureWorkflow",
  "restartable": true,
  "workflowStatusListenerEnabled": true,
  "schemaVersion": 2,
  "ownerEmail": "email"
}

필드 설명

Task

JSON 형식으로 정의된 개별 프로세스에 대한 메타데이터로 여러 옵션을 정의할 수 있다.

예시
{
  "name": "name",
  "retryCount": 3,

  "timeoutSeconds": 1200,
  "pollTimeoutSeconds": 3600,
  "inputKeys": [
    "key1",
    "key2"
  ],
  "outputKeys": [
    "key1",
    "key2"
  ],
  "timeoutPolicy": "TIME_OUT_WF",
  "retryLogic": "FIXED",
  "retryDelaySeconds": 600,
  "responseTimeoutSeconds": 3600,
  "concurrentExecLimit": 100,
  "rateLimitFrequencyInSeconds": 60,
  "rateLimitPerFrequency": 50,
  "ownerEmail": "email"
}

필드설명

동작방식

  1. 사용자는 미리 workflow와 task 메타데이터를 정의해서 conductor server에 등록한다.
  2. worker는 task가 구현되어 있는 마이크로 서비스로 주기적으로 conductor server 호출을 통해 task queues를 polling 하면서 자신이 처리해야 할 task가 schedule이 등록되어 있는지 확인한다.
  3. 사용자가 workflow를 실행하면 conductor server는 workflow를 구성하는 task에 대한 schedule을 task queues에 등록한다.
  4. schedule이 등록되면 worker는 task 정보를 가져온 후 conductor server에게 ack 메시지를 전송한다. (polling 후 ack 전송에 실패하면 conductor는 해당 task를 다시 queue에 schedule 시킨다.)
  5. worker가 task 수행을 완료하면 실행 결과를 conductor server에 전송하고 conductor server는 해당 task의 상태를 update 한다. 지정된 시간 동안(timeout) 작업 완료 메시지를 받지 못하면 conductor는 workflow 또는 task에 정의된 규칙에 따라 실패 처리되거나 재시도한다.
출처: https://netflixtechblog.com/netflix-conductor-a-microservices-orchestrator-2e8d4771bf40

적용 사례 (루니버스)

루니버스에서는 conductor를 통해 여러 프로세스를 처리하고 있고, 그중 Token 생성 프로세스에서 conductor를 도입한 사례에 대해서 알아보겠다.

conductor 공통 라이브러

마이크로 서비스에서 conductor 기능을 사용하기 위해, workflow 시작, task polling, task 처리와 같은 작업들을 수행하는 공통 라이브러리를 구현하고, workflow를 사용하는 마이크로 서비스 dependency에 추가해서 사용하고 있다.

Task 처리 로직 구현

처리할 task를 마이크로 서비스에 구현한다.

export class CreateSideTokenIdTask extends AbstractTaskWorker {
  protected readonly taskType: string = "CreateSideTokenId";

  constructor(conductorEndpoint: string) {
    super({
      apiEndpoint: conductorEndpoint,
      workerid: os.hostname(),
    });
  }

  async handle(input) {
		// polling 후 수행할 내용
    ...생략
  }
}

Task 등록

task를 구현한 마이크로 서비스 bootstraping 시 task polling 및 처리에 대한 함수를 호출한다.

...
private initCreateSideTokenIdTask(conductorEndpoint: string) {
  this.createSideTokenIdTask = new CreateSideTokenIdTask(conductorEndpoint);
  this.createSideTokenIdTask.start();
}

Task 정의

구현한 task 처리 로직에 대한 메타데이터를 정의한다.

[
	{ 
		"name": "name",
		"email": "email",
		"retryCount": 3,
		"retryLogic": "FIXED",
		"retryDelaySeconds": 10,
		"timeoutSeconds": 120,
		"timeoutPolicy": "TIME_OUT_WF",
		"responseTimeoutSeconds": 30
	}
]

Workflow 정의

Side Token 생성에 대한 workflow를 정의한다.

{
  "ownerEmail": "email",
  "name": "name",
  "description": "description",
  "version": 1,
  "tasks": [
    {
      "name": "ChangeTokenStatus",
      "taskReferenceName": "ChangeTokenStatusDeploying",
      "description": "change token status to deploying ",
      "type": "SIMPLE",
      "inputParameters": {
        "id": "${workflow.input.id}",
        "status": "DEPLOYING"
      }
    },
	  ... 생략
  ],
  "restartable": true,
  "workflowStatusListenerEnabled": true,
  "failureWorkflow": "DeployTokenFailedFlow",
  "schemaVersion": 2
}

input 파라미터 값의 존재여부에 따라서 실행할 task를 분리하기 위해 conductor 에서 제공하는 system task를 사용했다.

{
  "name": "decision",
  "taskReferenceName": "decisionRef",
  "type": "DECISION",
  "inputParameters": {
    "address": "${workflow.input.address}"
  },
  "caseExpression": "$.address ? 'SKIP' : 'DEPLOY'",
  "decisionCases": {
    "SKIP": [
      ...
    ],
    "DEPLOY": [
			...
		]
	}
}
Decision

조건에 따라서 프로세스를 분기해 주는 task

caseValueParam : 비교할 문자열

decisionCases : 분기 처리 프로세스를 정의

defaultCase : 매칭되는 값이 없을 경우 실행시킬 프로세스를 정의

caseExpression : 자바스크립트 표현식으로 생성한 문자열, caseValueParam 보다 다양한 경우를 커버할 수 있다.

workflow 실행 결과값에 따라 프로세스가 분기되는 모습을 확인할 수 있다.

DEPLOY 프로세스
SKIP 프로세스

Wiring Inputs and outputs

conductor는 workflow 실행 시 전달하는 input과 수행된 task의 output에 대해서 ${SOURCE.input/output.JSONPath}의 패턴으로 데이터를 매핑한다. 해당 패턴을 사용하면 task의 output을 다른 task의 input으로 사용할 수 있다.

SOURCE : workflow 또는 taskReferenceName

input/output : source의 input 또는 output

JSONPath : source의 input 또는 output에 대한 JSON path 표현식

failureWorkflow 정의

여러 task 중 하나라도 에러가 발생하면 token의 최종 상태는 FAILED가 되어야 한다. 이러한 보상 트랜잭션을 처리하기 위해 각 task는 에러가 발생하면 뱉도록 구현하고, workflow 상태가 fail이 되었을 때, token의 상태를 FAILED로 업데이트하는 failureWorkflow를 등록했다.

workflow 실패 시 failureWorkflow가 실행되는 것을 확인할 수 있다.

Workflow 실패

FailureWorkflow 실행

Workflow 실행

workflow를 실행할 마이크로 서비스에서 workflow typeinputParam을 파라미터로 호출한다.
...생략
public async startWorkFlow(type: WorkFlowType, inputParam: BaseObject) {
  let workFlowId: string = await this.workflowManager.startWorkflow({
    name: type,
    input: inputParam,
  });

  return workFlowId;
}

마무리

지금까지 conductor의 기능과 구현 방법에 대해서 알아보았다.

conductor는 여러 마이크로 서비스에 걸쳐 실행되는 프로세스를 개발할 때 conductor server에 실행 책임을 넘겨 서비스 간 결합도를 줄이고, 별도의 UI를 통해 실행 이력, 추적 등 다양한 편의 기능을 제공한다는 점에서 많은 장점을 가져다준다고 생각한다.

다만, 초기 도입 시 일정량의 학습시간이 필요하고, conductor server에 대한 관리 포인트가 증가한다는 점에서 현재 개발 환경과 비즈니스 workflow를 고려해서 도입을 검토하면 좋겠다는 생각으로 글을 마친다.

참고

Netflix Conductor: A microservices orchestrator

Share your blockchain-related digital insights with your friends

Facebook
Twitter
LinkedIn

Get more insights

NFTfi는 무엇일까?(ERC 4907)

Danksharding is an improved version of Ethereum’s sharding technology, which is one of the techniques that greatly increase transaction capacity and reduce gas fees in Ethreum 2.0 upgrade. To help you understand what Danksharding is, let’s first take a look at Ethereum's scalability strategy, which aims to increase network performance and ensure scalability.

루니버스 NOVA 업데이트 – Web3 개발자를 위한 멀티체인 API 오픈!

Danksharding is an improved version of Ethereum’s sharding technology, which is one of the techniques that greatly increase transaction capacity and reduce gas fees in Ethreum 2.0 upgrade. To help you understand what Danksharding is, let’s first take a look at Ethereum's scalability strategy, which aims to increase network performance and ensure scalability.

이더리움 확장성 솔루션, 댕크샤딩 (Danksharding)은 무엇일까? #1

댕크샤딩(Danksharding)은 이더리움 네트워크의 샤딩 기술을 개선한 것으로, 이더리움 2.0 업그레이드에서 채택된 기술 중 하나입니다. 댕크샤딩은 이더리움의 트랜잭션 처리량을 크게 향상시키고, 수수료를 낮추는 데 중점을 두고 있습니다.

람다256, 체인링크와 채널 파트너십 체결.. 웹3 생태계 확장

람다256(대표 박재현)이 글로벌 블록체인 네트워크 체인링크를 운영하는 체인링크 랩스(Chainlink Labs)와 채널 파트너십을 체결했습니다.  람다256은 체인링크의 탈중앙화 오라클 네트워크 솔루션을 통해 자사 블록체인 서비스의 기술력을 강화할 예정입니다. 체인링크는 분산금융(DeFi), 보험, 게임,