如何在这段代码中正确实现异步?


有一个实体“任务”。 它具有以下属性:
– 执行器上的负载 – int (从 1 到 5);
– 执行时间(秒) – 整数
这些任务由分发器(负载均衡器)接收。 还有执行器从负载均衡器接收任务并执行它们。 执行器具有以下参数:
– 同时执行任务的最大数量 – int;
– 最大总负载 – 整数。

执行器执行任务的时间为“执行时间(秒)”,该时间在任务属性中进行了描述。 负载均衡器从队列中获取任务并将其分配给执行器。 任务只能分配给那些具有必要的容量储备和任务空闲槽位储备的执行者。 分布应尽可能均匀。 不应该是一名表演者满负荷而其他人在休息。

– 应该是控制台中的可视化视图,显示每个执行器的工作负载(执行的任务数量、占用/空闲的资源数量)以及队列中的任务数量;
– 必须异步实现(所有执行器必须并行工作);
– 在程序开始时,应创建一个任务队列,平衡器将进一步读取该任务队列并将其发送给执行器

我的问题是异步使用不正确。 执行者应该并行工作。 如何正确实施?

我尝试过的:

C#
public class TaskEntity
{
    public int Load { get; set; }
    public int ExecutionTime { get; set; }

    public TaskEntity(int load, int executionTime)
    {
        Load = load;
        ExecutionTime = executionTime;
    }
}

public class Executor
{
    public int MaxTasks { get; set; }
    public int MaxLoad { get; set; }
    public int CurrentLoad { get; set; }
    public List<TaskEntity> CurrentTasks { get; set; }
    public Executor(int maxTasks, int maxLoad)
    {
        MaxTasks = maxTasks;
        MaxLoad = maxLoad;
        CurrentLoad = 0;
        CurrentTasks = new List<TaskEntity>();
    }
}

public class LoadBalancer
{
    private readonly List<Executor> Executors;
    private readonly Queue<TaskEntity> TaskQueue;

    public LoadBalancer(List<Executor> executors)
    {
        Executors = executors;
        TaskQueue = new Queue<TaskEntity>();
    }

    public async Task StartAsync()
    {
        while (true)
        {
            if (TaskQueue.Count > 0)
            {
                var task = TaskQueue.Dequeue();
                var availableExecutor = GetAvailableExecutor(task);

                if (availableExecutor != null)
                {
                    await ExecuteTaskAsync(task, availableExecutor);
                }
            }

            DisplayLoadStatus();
            await Task.Delay(1000); 
        }
    }

    public void EnqueueTask(TaskEntity task)
    {
        TaskQueue.Enqueue(task);
    }

    private Executor GetAvailableExecutor(TaskEntity task)
    {
        return Executors.Find(e => e.CurrentLoad + task.Load <= e.MaxLoad &&
                                     e.CurrentTasks.Count < e.MaxTasks);
    }

    private async Task ExecuteTaskAsync(TaskEntity task, Executor executor)
    {
        executor.CurrentLoad += task.Load;
        executor.CurrentTasks.Add(task);

        await Task.Delay(task.ExecutionTime * 1000);

        executor.CurrentLoad -= task.Load;
        if (executor.CurrentTasks.Contains(task))
        {
            executor.CurrentTasks.Remove(task);
        }
    }

    private void DisplayLoadStatus()
    {
        Console.Clear();
        foreach (var executor in Executors)
        {
            Console.WriteLine($"Executor {Executors.IndexOf(executor) + 1}:");
            Console.WriteLine($"  - Current Workload: {executor.CurrentLoad}/{executor.MaxLoad}");
            Console.WriteLine($"  - Concurrent Tasks: {executor.CurrentTasks.Count}/{executor.MaxTasks}");
            Console.WriteLine();
        }

        Console.WriteLine($"Task Queue Size: {TaskQueue.Count}");
    }



static async Task Main()
{
    var executorCount = 3; 
    var executors = new List<Executor>();

    for (int i = 0; i < executorCount; i++)
    {
        executors.Add(new Executor(maxTasks: 2, maxLoad: 10));
    }

    var loadBalancer = new LoadBalancer(executors);

    var loadBalancerTask = loadBalancer.StartAsync();

    var random = new Random();
    for (int i = 0; i < 10; i++)
    {
        var task = new TaskEntity(load: random.Next(1, 6), executionTime: random.Next(1, 6));

        loadBalancer.EnqueueTask(task);
        await Task.Delay(1000); 
    }

    await loadBalancerTask;
}

解决方案1

当您希望内联运行多个任务时,您需要保留对每个任务的引用而不是等待它。 等待每个任务将导致它们按顺序运行。

这是内联运行作业的简单示例。

1. 工作内容:

C#
class Job
{
    public string Id { get; }
    public int Runtime { get; }

    public Job(string id, int runtime)
    {
        Id = id;
        Runtime = runtime;
    }

    public async Task<Job> ExecuteAsync()
    {
        await Task.Yield();

        int elapsed = 0;
        TimeSpan delay = TimeSpan.FromSeconds(1);

        while (elapsed < Runtime)
        {
            Console.WriteLine($"{Id} ping {elapsed++} of {Runtime}");
            await Task.Delay(delay);
        }

        Console.WriteLine($"{Id} ping {elapsed++} of {Runtime}");
        return this;
    }
}

管理者:

C#
var tasks = new List<Task<Job>>();

var rand = new Random();

for (int i = 0; i < 10; i++)
{
    Job job = new Job($"Task {i}", rand.Next(3, 6));

    tasks.Add(job.ExecuteAsync());
}

while (true)
{
    Task.WaitAny(tasks.ToArray());

    foreach (Task<Job> task in tasks.Where(
        x => x.IsCompleted || x.IsCanceled || x.IsFaulted).ToList())
    {
        var completedJob = await task;
        Console.WriteLine($"!! Job{completedJob.Id} completed");
        tasks.Remove(task);
    }

    if (tasks.Count == 0)
        break;
}

这应该为您提供帮助您更新代码的概念。

コメント

タイトルとURLをコピーしました