RSE/Backend/Handler/ThreadHandler.cs

210 lines
6.4 KiB
C#

using System.Collections.Concurrent;
using Models.Handler;
using Models.Model.Backend;
namespace Backend.Handler;
public class ThreadHandler
{
private readonly DbHandler _dbHandler;
private readonly IpScanner _ipScanner;
private readonly ContentFilter _contentFilter;
private readonly IpFilterHandler _ipFilterHandler;
private bool _ipScannerStopped;
private bool _contentFilterStopped;
private bool _ipFilterStopped;
private bool _stage1;
private bool _stage2 = true;
private bool _stage3;
ConcurrentQueue<Filtered> filteredQueue = new();
ConcurrentQueue<Discarded> discardedQueue = new();
ConcurrentQueue<UnfilteredQueueItem> unfilteredQueue = new();
ConcurrentQueue<ScannerResumeObject> scannerResumeQueue = new();
ConcurrentQueue<FilterQueueItem> preFilteredQueue = new();
public ThreadHandler(string path)
{
_dbHandler = new(filteredQueue, discardedQueue, unfilteredQueue, scannerResumeQueue, preFilteredQueue, path);
_ipScanner = new(discardedQueue, scannerResumeQueue, _dbHandler, preFilteredQueue);
_contentFilter = new(filteredQueue, unfilteredQueue, _dbHandler, path, this);
_ipFilterHandler = new(discardedQueue, unfilteredQueue, preFilteredQueue, _dbHandler, this);
}
public void Start()
{
Thread scanner = new(StartScanner);
Thread ipFilter = new(StartIpFilter);
Thread indexer = new(StartContentFilter);
Thread database = new(StartDbHandler);
Thread discarded = new(StartDiscardedDbHandler);
Thread filtered = new(StartFilteredDbHandler);
Thread resume = new(StartResumeDbHandler);
Thread ipFilterAutoScaler = new(StartIpFilterAutoScaler);
Thread contentFilterThread = new(StartContentFilterThread);
Thread prefilterDb = new(StartPreFilterDbHandler);
Thread fillIpFilterQueue = new(StartFillIpFilterQueue);
//Thread check = new(CheckQueue);
if (_stage1)
{
discarded.Start(); // de-queues from discardedQueue
prefilterDb.Start(); // de-queues from preFilteredQueue
scanner.Start(); // en-queues to discardedQueue and preFilteredQueue
resume.Start(); // de-queues from resumeQueue
discarded.Join();
prefilterDb.Join();
scanner.Join();
resume.Join();
}
if (_stage2)
{
database.Start(); // de-queues from unfilteredQueue
discarded.Start(); // de-queues from discardedQueue
ipFilter.Start(); // en-queues to discardedQueue and unfilteredQueue
ipFilterAutoScaler.Start(); // de-queues from preFilteredQueue, en-queues to discardedQueue and unfilteredQueue
fillIpFilterQueue.Start(); // reads from preFiltered database, en-queues to preFilteredQueue
database.Join();
discarded.Join();
ipFilter.Join();
ipFilterAutoScaler.Join();
fillIpFilterQueue.Join();
}
if (_stage3)
{
filtered.Start(); // de-queues from filteredQueue
database.Start(); // de-queues from unfilteredQueue
indexer.Start(); // en-queues to unfilteredQueue and contentQueue
contentFilterThread.Start(); // de-queues from contentQueue, en-queues to filteredQueue
contentFilterThread.Join();
filtered.Join();
database.Join();
indexer.Join();
}
}
private void CheckQueue()
{
while (true)
{
Console.Clear();
Console.WriteLine($"filteredQueue.Count: {filteredQueue.Count}");
Console.WriteLine($"discardedQueue.Count: {discardedQueue.Count}");
Console.WriteLine($"unfilteredQueue.Count: {unfilteredQueue.Count}");
Console.WriteLine($"scannerResumeQueue.Count: {scannerResumeQueue.Count}");
Console.WriteLine($"preFilteredQueue.Count: {preFilteredQueue.Count}");
Thread.Sleep(5);
}
}
private void StartScanner()
{
Thread.Sleep(15000); // Let the database handler instantiate and warm up first.
List<WaitHandle[]> wait = _ipScanner.Start(256);
for (int i = 0; i < wait.Count; i++)
{
WaitHandle.WaitAll(wait[i]);
}
Console.WriteLine("Scanner finished");
}
private void StartContentFilter()
{
Thread.Sleep(5000);
WaitHandle[] wait = _contentFilter.Start();
WaitHandle.WaitAll(wait);
Console.WriteLine("Content filter finished");
_contentFilterStopped = true;
}
private void StartContentFilterThread()
{
WaitHandle[] wait = _contentFilter.StartFilterThread(64);
WaitHandle.WaitAll(wait);
}
private void StartIpFilterAutoScaler()
{
_ipFilterHandler.AutoScaler();
}
private void StartFillIpFilterQueue()
{
_ipFilterHandler.FillFilterQueue();
}
private void StartIpFilter()
{
Thread.Sleep(1000);
List<WaitHandle[]> wait = _ipFilterHandler.Start(256);
for (int i = 0; i < wait.Count; i++)
{
WaitHandle.WaitAll(wait[i]);
}
Console.WriteLine("Ip filter finished");
_ipFilterStopped = true;
}
private void StartDbHandler()
{
_dbHandler.UnfilteredDbHandler();
}
private void StartFilteredDbHandler()
{
_dbHandler.FilteredDbHandler();
}
private void StartPreFilterDbHandler()
{
_dbHandler.PrefilteredDbHandler();
}
private void StartResumeDbHandler()
{
_dbHandler.ResumeDbHandler();
}
private void StartDiscardedDbHandler()
{
WaitHandle[] wait = _dbHandler.Start(4);
WaitHandle.WaitAll(wait);
Console.WriteLine("Discarded DbHandler finished");
}
public void Stop()
{
Console.WriteLine("Stopping...");
_ipScanner.Stop();
_contentFilter.Stop();
_ipFilterHandler.Stop();
Console.WriteLine("Stopping Extra...");
Thread.Sleep(30_000);
Console.WriteLine("Stopping Super Extra...");
_dbHandler.Stop();
Console.WriteLine("Stopped.");
}
}