Gallery/Gallery.Share/Util/ParallelTask.cs
2021-08-11 14:09:03 +08:00

149 lines
4.5 KiB
C#

using System;
using System.Threading;
namespace Gallery.Util
{
public class ParallelTask : IDisposable
{
public static ParallelTask Start(string tag, int from, int toExclusive, int maxCount, Predicate<int> action, int tagIndex = -1, WaitCallback complete = null)
{
if (toExclusive <= from)
{
if (complete != null)
{
ThreadPool.QueueUserWorkItem(complete);
}
return null;
}
var task = new ParallelTask(tag, from, toExclusive, maxCount, action, tagIndex, complete);
task.Start();
return task;
}
private readonly object sync = new();
private int count;
private bool disposed;
public int TagIndex { get; private set; }
private readonly string tag;
private readonly int max;
private readonly int from;
private readonly int to;
private readonly Predicate<int> action;
private readonly WaitCallback complete;
private ParallelTask(string tag, int from, int to, int maxCount, Predicate<int> action, int tagIndex, WaitCallback complete)
{
if (maxCount <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxCount));
}
max = maxCount;
if (from >= to)
{
throw new ArgumentOutOfRangeException(nameof(from));
}
TagIndex = tagIndex;
this.tag = tag;
this.from = from;
this.to = to;
this.action = action;
this.complete = complete;
}
public void Start()
{
ThreadPool.QueueUserWorkItem(DoStart);
}
public void Dispose()
{
disposed = true;
}
private void DoStart(object state)
{
#if DEBUG
const long TIMEOUT = 60000L;
var sw = new System.Diagnostics.Stopwatch();
long lastElapsed = 0;
sw.Start();
#endif
for (int i = from; i < to; i++)
{
var index = i;
while (count >= max)
{
#if DEBUG
var elapsed = sw.ElapsedMilliseconds;
if (elapsed - lastElapsed > TIMEOUT)
{
lastElapsed = elapsed;
Log.Print($"WARNING: parallel task ({tag}), {count} tasks in queue, cost too much time ({elapsed:n0}ms)");
}
#endif
if (disposed)
{
#if DEBUG
sw.Stop();
Log.Print($"parallel task determinate, disposed ({tag}), cost time ({elapsed:n0}ms)");
#endif
return;
}
Thread.Sleep(16);
}
lock (sync)
{
count++;
}
ThreadPool.QueueUserWorkItem(o =>
{
try
{
if (!action(index))
{
disposed = true;
}
}
catch (Exception ex)
{
Log.Error($"parallel.start ({tag})", $"failed to run action, index: {index}, error: {ex}");
}
finally
{
lock (sync)
{
count--;
}
}
});
}
while (count > 0)
{
#if DEBUG
var elapsed = sw.ElapsedMilliseconds;
if (elapsed - lastElapsed > TIMEOUT)
{
lastElapsed = elapsed;
Log.Print($"WARNING: parallel task ({tag}), {count} ending tasks in queue, cost too much time ({elapsed:n0}ms)");
}
#endif
if (disposed)
{
#if DEBUG
sw.Stop();
Log.Print($"parallel task determinate, disposed ({tag}), ending cost time ({elapsed:n0}ms)");
#endif
return;
}
Thread.Sleep(16);
}
#if DEBUG
sw.Stop();
Log.Print($"parallel task done ({tag}), cost time ({sw.ElapsedMilliseconds:n0}ms)");
#endif
complete?.Invoke(null);
}
}
}