AI 에이전트의 멀티 태스킹 스케줄링과 우선순위 관리: 동시성 제어와 작업 최적화의 완벽 가이드
목차
- 머리말: 복잡한 작업 환경의 도전
- 멀티 태스킹 아키텍처의 기본
- 우선순위 큐(Priority Queue) 구현
- 스케줄링 알고리즘 심층 분석
- 리소스 경합(Resource Contention) 해결
- 실시간 작업 모니터링 및 조정
- 프로덕션 배포 및 튜닝
- 결론 및 실무 권장사항
- Tags
1. 머리말: 복잡한 작업 환경의 도전
Modern AI agents are increasingly tasked with managing multiple concurrent operations, from customer service inquiries to data processing pipelines. The challenge isn’t just handling multiple tasks—it’s handling them intelligently, with awareness of their priority, resource constraints, and deadline pressures.
현대의 AI 에이전트는 고객 서비스 문의부터 데이터 처리 파이프라인까지 여러 동시 작업을 관리해야 합니다. 단순히 여러 작업을 처리하는 것뿐 아니라, 우선순위, 리소스 제약, 데드라인 압박을 고려한 지능형 관리가 필수입니다.
전통적인 FIFO(First-In-First-Out) 큐 방식은 AI 에이전트 환경에서는 부족합니다. 긴급한 고객 문의가 반복적인 배치 작업 뒤에서 대기하는 상황을 방지해야 하기 때문입니다. 이 글에서는 멀티 태스킹 스케줄링의 고급 기법과 실제 구현 방법을 다룹니다.

2. 멀티 태스킹 아키텍처의 기본
2.1 Task 객체 설계
먼저 작업을 나타내는 기본 데이터 구조를 정의합니다.
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Any
from datetime import datetime, timedelta
class TaskPriority(Enum):
CRITICAL = 0 # 시스템 안정성 관련
HIGH = 1 # 사용자 요청
NORMAL = 2 # 정기 작업
LOW = 3 # 백그라운드 작업
@dataclass
class Task:
id: str
name: str
priority: TaskPriority
duration_estimate: float # 초 단위
deadline: datetime
dependencies: list = None # 선행 작업 ID
retry_count: int = 0
max_retries: int = 3
callback: Callable = None
resource_requirements: dict = None # GPU, 메모리 등
def is_overdue(self) -> bool:
return datetime.now() > self.deadline
def get_urgency_score(self) -> float:
"""데드라인까지 남은 시간과 우선순위를 조합한 긴급도 점수"""
time_remaining = (self.deadline - datetime.now()).total_seconds()
priority_weight = (3 - self.priority.value) * 1000
return priority_weight / max(time_remaining, 1)
이 구조는 다음을 포함합니다:
- 우선순위 열거형: 4단계로 작업을 분류
- 데드라인 관리: 시간 제약을 나타냄
- 의존성 추적: 작업 간 선행 관계 정의
- 리소스 요구사항: GPU, 메모리 등 제약 조건
- 재시도 로직: 실패한 작업의 자동 복구
2.2 스케줄러 아키텍처
from heapq import heappush, heappop
from collections import defaultdict
from threading import Lock
import logging
class TaskScheduler:
def __init__(self, max_workers: int = 5, max_memory_mb: int = 2048):
self.max_workers = max_workers
self.max_memory_mb = max_memory_mb
self.task_queue = [] # Priority queue
self.active_tasks = {} # 현재 실행 중인 작업
self.completed_tasks = defaultdict(list)
self.lock = Lock()
self.logger = logging.getLogger("TaskScheduler")
self.resource_monitor = ResourceMonitor()
def submit_task(self, task: Task) -> str:
"""작업을 큐에 추가"""
with self.lock:
# 우선순위 + 긴급도 점수를 기반으로 정렬
priority_score = (
task.priority.value, # 우선순위 (낮은 값이 높음)
-task.get_urgency_score() # 긴급도 (높을수록 먼저)
)
heappush(self.task_queue, (priority_score, task.id, task))
self.logger.info(f"Task {task.id} submitted with priority {task.priority.name}")
return task.id
def get_next_task(self) -> Task:
"""실행 가능한 다음 작업 선택"""
with self.lock:
while self.task_queue:
_, task_id, task = heappop(self.task_queue)
# 의존성 확인
if task.dependencies and not self._are_dependencies_met(task.dependencies):
# 의존성이 충족되지 않으면 다시 큐에 추가 (뒤로)
heappush(self.task_queue, (
(task.priority.value, -task.get_urgency_score()),
task.id,
task
))
continue
# 리소스 확인
if not self.resource_monitor.can_allocate(task.resource_requirements):
heappush(self.task_queue, (
(task.priority.value, -task.get_urgency_score()),
task.id,
task
))
continue
# 데드라인 확인
if task.is_overdue():
self.logger.warning(f"Task {task.id} is overdue")
return task
return None
def _are_dependencies_met(self, dependency_ids: list) -> bool:
"""모든 의존 작업이 완료되었는지 확인"""
for dep_id in dependency_ids:
if dep_id not in self.completed_tasks:
return False
return True
3. 우선순위 큐(Priority Queue) 구현
3.1 다단계 우선순위 시스템
단순한 우선순위만으로는 부족합니다. 실제 시스템에서는 다음을 고려해야 합니다:
class AdvancedPriorityCalculator:
"""복합적 우선순위 계산"""
def __init__(self):
self.base_priority_weight = 40 # 기본 우선순위 가중치
self.urgency_weight = 35 # 데드라인 긴급도
self.sla_weight = 15 # SLA 위반 위험
self.fairness_weight = 10 # 공정성 (starvation 방지)
def calculate_score(self, task: Task, queue_wait_time: float) -> float:
"""종합 점수 계산 (높을수록 우선)"""
# 1) 기본 우선순위 점수 (0-100)
base_score = (100 - (task.priority.value * 25))
# 2) 긴급도 점수: 데드라인까지 남은 시간
time_remaining = (task.deadline - datetime.now()).total_seconds()
if time_remaining <= 0:
urgency_score = 100 # 오버데드라인
else:
# 남은 시간이 적을수록 높은 점수
urgency_score = max(0, 100 - (time_remaining / 3600) * 10)
# 3) SLA 위반 위험도
sla_score = min(100, (queue_wait_time / 300) * 100) # 5분 이상 대기 시 증가
# 4) 공정성 점수: 오래 기다린 작업 우선
fairness_score = min(100, (queue_wait_time / 600) * 100) # 10분 기준
# 가중치 적용
total_score = (
(base_score * self.base_priority_weight) +
(urgency_score * self.urgency_weight) +
(sla_score * self.sla_weight) +
(fairness_score * self.fairness_weight)
) / 100
return total_score
3.2 예제: 우선순위 큐 동작
# 작업 생성
task1 = Task(
id="user_request_1",
name="Customer Support Query",
priority=TaskPriority.HIGH,
duration_estimate=5,
deadline=datetime.now() + timedelta(minutes=15),
resource_requirements={"gpu": False, "memory_mb": 128}
)
task2 = Task(
id="batch_process_1",
name="Daily Data Aggregation",
priority=TaskPriority.NORMAL,
duration_estimate=120,
deadline=datetime.now() + timedelta(hours=4),
resource_requirements={"gpu": True, "memory_mb": 1024}
)
task3 = Task(
id="critical_alert_1",
name="System Health Check",
priority=TaskPriority.CRITICAL,
duration_estimate=2,
deadline=datetime.now() + timedelta(minutes=5),
resource_requirements={"gpu": False, "memory_mb": 64}
)
# 스케줄러에 추가 (순서대로)
scheduler = TaskScheduler(max_workers=2)
scheduler.submit_task(task1) # HIGH
scheduler.submit_task(task2) # NORMAL
scheduler.submit_task(task3) # CRITICAL
# 실행 순서: task3 → task1 → task2
# (CRITICAL이 먼저, 그 다음 HIGH, 마지막 NORMAL)
4. 스케줄링 알고리즘 심층 분석
4.1 선점형(Preemptive) vs 비선점형(Non-Preemptive) 스케줄링
class PreemptiveScheduler(TaskScheduler):
"""현재 실행 중인 작업을 중단하고 더 높은 우선순위 작업 실행"""
def check_and_preempt(self):
"""더 높은 우선순위 작업이 있으면 현재 작업 중단"""
if not self.task_queue:
return
with self.lock:
# 큐에서 가장 높은 우선순위 작업
next_priority, _, _ = self.task_queue[0]
# 현재 실행 중인 작업 중 낮은 우선순위 작업 찾기
for task_id, running_task in list(self.active_tasks.items()):
current_priority = (running_task.priority.value, -running_task.get_urgency_score())
if current_priority > next_priority:
self.logger.info(f"Preempting {task_id}, new task has higher priority")
# 작업 일시 중단 (또는 중단)
self._preempt_task(task_id)
return
def _preempt_task(self, task_id: str):
"""작업을 일시 중단하고 재큐"""
task = self.active_tasks.pop(task_id)
# 작업 상태 저장 (나중에 재개)
heappush(self.task_queue, (
(task.priority.value, -task.get_urgency_score()),
task.id,
task
))
주의: 선점형 스케줄링은 강력하지만, overhead가 클 수 있습니다. LLM 작업의 경우, 중단 후 재개 비용이 높으므로 신중하게 사용해야 합니다.
4.2 SJF(Shortest Job First) 변형
스케줄링 알고리즘 최적화를 통해 throughput을 30-50% 향상시킬 수 있습니다. 우선순위와 작업 길이를 함께 고려하면 더욱 효율적입니다.

5. 리소스 경합(Resource Contention) 해결
5.1 리소스 모니터링
import psutil
from typing import Dict
class ResourceMonitor:
def __init__(self, alert_threshold: float = 0.9):
self.alert_threshold = alert_threshold
self.allocated_resources = {}
def get_available_resources(self) -> Dict[str, float]:
"""사용 가능한 리소스 조회"""
return {
"cpu_percent": 100 - psutil.cpu_percent(interval=1),
"memory_mb": psutil.virtual_memory().available / (1024 ** 2),
"gpu_memory_mb": self._get_gpu_memory()
}
def can_allocate(self, requirements: Dict[str, float]) -> bool:
"""리소스 할당 가능 여부 확인"""
available = self.get_available_resources()
if requirements.get("memory_mb", 0) > available["memory_mb"]:
return False
if requirements.get("cpu_percent", 0) > available["cpu_percent"]:
return False
return True
6. 실시간 작업 모니터링 및 조정
6.1 동적 우선순위 조정
프로덕션 환경에서 실제 메트릭을 기반으로 우선순위를 동적으로 조정합니다. P99 latency를 모니터링하고 SLA 위반을 예방합니다.
7. 프로덕션 배포 및 튜닝
7.1 설정 파일 예제
# scheduler_config.yaml
scheduler:
max_workers: 5
max_memory_mb: 2048
priority_weights:
base_priority: 40
urgency: 35
sla: 15
fairness: 10
preemption:
enabled: false # LLM 작업의 경우 false 권장
resource_limits:
max_cpu_percent: 80
max_memory_percent: 85
max_gpu_memory_percent: 90
monitoring:
collect_metrics: true
log_level: INFO
alert_threshold: 0.9
8. 결론 및 실무 권장사항
주요 학습 포인트
-
우선순위 시스템은 단순하지 않습니다: 기본 우선순위 + 긴급도 + SLA + 공정성을 균형 있게 고려해야 합니다.
-
리소스 제약이 핵심입니다: GPU, 메모리, CPU 등 실제 리소스를 모니터링하고 할당하는 것이 성능을 결정합니다.
-
선점형 스케줄링은 신중하게: LLM 작업은 중단-재개 비용이 높으므로, 대부분의 경우 비선점형이 낫습니다.
-
메트릭 수집이 필수: p99 latency, average wait time 등을 지속적으로 모니터링해야 튜닝 방향을 알 수 있습니다.
실무 권장사항
- 개발 단계: 단순한 우선순위 큐로 시작, 필요에 따라 고도화
- 테스트: 다양한 부하와 작업 패턴에서 테스트 (높은 부하, 낮은 부하, 공정성 요구)
- 모니터링: 프로덕션 배포 후 3개월은 매일 메트릭 검토
- 튜닝: 실제 사용 패턴에 맞게 가중치와 threshold 조정
Modern AI systems require sophisticated scheduling to balance competing demands. By implementing priority queues with dynamic adjustment, resource monitoring, and fairness mechanisms, you create a system that serves both urgent requests and background tasks effectively.