Merge pull request 'Reworked the queue items.' (#21) from SplitUpQueueItem into main
Reviewed-on: #21
This commit is contained in:
commit
407bbc1556
@ -8,7 +8,8 @@ namespace Backend.Handler;
|
|||||||
|
|
||||||
public class ContentFilter
|
public class ContentFilter
|
||||||
{
|
{
|
||||||
private readonly ConcurrentQueue<QueueItem> _queue;
|
private readonly ConcurrentQueue<Filtered> _queue;
|
||||||
|
private readonly ConcurrentQueue<UnfilteredQueueItem> _unfilteredQueue;
|
||||||
private readonly DbHandler _dbHandler;
|
private readonly DbHandler _dbHandler;
|
||||||
private readonly string _getDomainPort80;
|
private readonly string _getDomainPort80;
|
||||||
private readonly string _getDomainPort443;
|
private readonly string _getDomainPort443;
|
||||||
@ -16,12 +17,13 @@ public class ContentFilter
|
|||||||
private int _timeOut;
|
private int _timeOut;
|
||||||
private readonly string _basePath;
|
private readonly string _basePath;
|
||||||
|
|
||||||
public ContentFilter(ConcurrentQueue<QueueItem> queue, DbHandler dbHandler, string basePath)
|
public ContentFilter(ConcurrentQueue<Filtered> queue, ConcurrentQueue<UnfilteredQueueItem> unfilteredQueue, DbHandler dbHandler, string basePath)
|
||||||
{
|
{
|
||||||
_queue = queue;
|
_queue = queue;
|
||||||
_dbHandler = dbHandler;
|
_dbHandler = dbHandler;
|
||||||
_basePath = basePath;
|
_basePath = basePath;
|
||||||
|
_unfilteredQueue = unfilteredQueue;
|
||||||
|
|
||||||
_getDomainPort80 = $"{basePath}/Backend/Scripts/GetDomainNamePort80.sh";
|
_getDomainPort80 = $"{basePath}/Backend/Scripts/GetDomainNamePort80.sh";
|
||||||
_getDomainPort443 = $"{basePath}/Backend/Scripts/GetDomainNamePort443.sh";
|
_getDomainPort443 = $"{basePath}/Backend/Scripts/GetDomainNamePort443.sh";
|
||||||
|
|
||||||
@ -63,14 +65,13 @@ public class ContentFilter
|
|||||||
|
|
||||||
unfiltered.Filtered = true;
|
unfiltered.Filtered = true;
|
||||||
|
|
||||||
QueueItem superUnfilteredObject = new()
|
UnfilteredQueueItem superUnfilteredObject = new()
|
||||||
{
|
{
|
||||||
Unfiltered = unfiltered,
|
Unfiltered = unfiltered,
|
||||||
Operations = Operations.Update,
|
Operations = Operations.Update
|
||||||
DbType = DbType.Unfiltered
|
|
||||||
};
|
};
|
||||||
|
|
||||||
_queue.Enqueue(superUnfilteredObject);
|
_unfilteredQueue.Enqueue(superUnfilteredObject);
|
||||||
|
|
||||||
if (_dbHandler.FilteredIpExists(unfiltered.Ip))
|
if (_dbHandler.FilteredIpExists(unfiltered.Ip))
|
||||||
{
|
{
|
||||||
@ -82,14 +83,7 @@ public class ContentFilter
|
|||||||
filtered.Port1 = unfiltered.Port1;
|
filtered.Port1 = unfiltered.Port1;
|
||||||
filtered.Port2 = unfiltered.Port2;
|
filtered.Port2 = unfiltered.Port2;
|
||||||
|
|
||||||
QueueItem superFilteredObject = new()
|
_queue.Enqueue(filtered);
|
||||||
{
|
|
||||||
Filtered = filtered,
|
|
||||||
Operations = Operations.Insert,
|
|
||||||
DbType = DbType.Filtered
|
|
||||||
};
|
|
||||||
|
|
||||||
_queue.Enqueue(superFilteredObject);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Thread.Sleep(_timeOut);
|
Thread.Sleep(_timeOut);
|
||||||
|
@ -17,18 +17,22 @@ public class ScanSettings
|
|||||||
|
|
||||||
public class IpScanner
|
public class IpScanner
|
||||||
{
|
{
|
||||||
private readonly ConcurrentQueue<QueueItem> _queue;
|
|
||||||
private readonly ConcurrentQueue<Discarded> _discardedQueue;
|
private readonly ConcurrentQueue<Discarded> _discardedQueue;
|
||||||
|
private readonly ConcurrentQueue<UnfilteredQueueItem> _unfilteredQueue;
|
||||||
|
private readonly ConcurrentQueue<ScannerResumeObject> _resumeQueue;
|
||||||
private readonly DbHandler _dbHandler;
|
private readonly DbHandler _dbHandler;
|
||||||
private bool _stop;
|
private bool _stop;
|
||||||
private int _timeout;
|
private int _timeout;
|
||||||
|
|
||||||
public IpScanner(ConcurrentQueue<QueueItem> queue, DbHandler dbHandler, ConcurrentQueue<Discarded> discardedQueue)
|
public IpScanner(ConcurrentQueue<UnfilteredQueueItem> unfilteredQueue, ConcurrentQueue<Discarded> discardedQueue,
|
||||||
|
ConcurrentQueue<ScannerResumeObject> resumeQueue, DbHandler dbHandler
|
||||||
|
)
|
||||||
{
|
{
|
||||||
_queue = queue;
|
|
||||||
_dbHandler = dbHandler;
|
_dbHandler = dbHandler;
|
||||||
_discardedQueue = discardedQueue;
|
_discardedQueue = discardedQueue;
|
||||||
|
_unfilteredQueue = unfilteredQueue;
|
||||||
|
_resumeQueue = resumeQueue;
|
||||||
|
|
||||||
SetTimeout(128);
|
SetTimeout(128);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -167,7 +171,7 @@ public class IpScanner
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
_queue.Enqueue(CreateUnfilteredQueueItem(ip, ports));
|
_unfilteredQueue.Enqueue(CreateUnfilteredQueueItem(ip, ports));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_stop)
|
if (_stop)
|
||||||
@ -192,13 +196,7 @@ public class IpScanner
|
|||||||
//Console.WriteLine($"Thread ({scanSettings.ThreadNumber}) is at index ({i}) out of ({scanSettings.End}). Remaining ({scanSettings.End - i})");
|
//Console.WriteLine($"Thread ({scanSettings.ThreadNumber}) is at index ({i}) out of ({scanSettings.End}). Remaining ({scanSettings.End - i})");
|
||||||
}
|
}
|
||||||
|
|
||||||
QueueItem resume = new()
|
_resumeQueue.Enqueue(resumeObject);
|
||||||
{
|
|
||||||
ResumeObject = resumeObject,
|
|
||||||
Operations = Operations.Insert
|
|
||||||
};
|
|
||||||
|
|
||||||
_queue.Enqueue(resume);
|
|
||||||
|
|
||||||
scanSettings.Handle!.Set();
|
scanSettings.Handle!.Set();
|
||||||
}
|
}
|
||||||
@ -212,7 +210,7 @@ public class IpScanner
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private static QueueItem CreateUnfilteredQueueItem(Ip ip, (int, int) ports)
|
private static UnfilteredQueueItem CreateUnfilteredQueueItem(Ip ip, (int, int) ports)
|
||||||
{
|
{
|
||||||
Unfiltered unfiltered = new()
|
Unfiltered unfiltered = new()
|
||||||
{
|
{
|
||||||
@ -225,8 +223,7 @@ public class IpScanner
|
|||||||
return new()
|
return new()
|
||||||
{
|
{
|
||||||
Unfiltered = unfiltered,
|
Unfiltered = unfiltered,
|
||||||
Operations = Operations.Insert,
|
Operations = Operations.Insert
|
||||||
DbType = DbType.Unfiltered
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,33 +17,41 @@ public class ThreadHandler
|
|||||||
|
|
||||||
public ThreadHandler(string path)
|
public ThreadHandler(string path)
|
||||||
{
|
{
|
||||||
ConcurrentQueue<QueueItem> contentQueue = new();
|
ConcurrentQueue<Filtered> filteredQueue = new();
|
||||||
ConcurrentQueue<Discarded> discardedQueue = new();
|
ConcurrentQueue<Discarded> discardedQueue = new();
|
||||||
|
ConcurrentQueue<UnfilteredQueueItem> unfilteredQueue = new();
|
||||||
|
ConcurrentQueue<ScannerResumeObject> scannerResumeQueue = new();
|
||||||
|
|
||||||
_dbHandler = new(contentQueue, discardedQueue, path);
|
_dbHandler = new(filteredQueue, discardedQueue, unfilteredQueue, scannerResumeQueue, path);
|
||||||
_ipScanner = new(contentQueue, _dbHandler, discardedQueue);
|
_ipScanner = new(unfilteredQueue, discardedQueue, scannerResumeQueue, _dbHandler);
|
||||||
_contentFilter = new(contentQueue, _dbHandler, path);
|
_contentFilter = new(filteredQueue, unfilteredQueue, _dbHandler, path);
|
||||||
_communication = new(_dbHandler, this, _ipScanner, _contentFilter, path);
|
_communication = new(_dbHandler, this, _ipScanner, _contentFilter, path);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Start()
|
public void Start()
|
||||||
{
|
{
|
||||||
//Thread scanner = new(StartScanner);
|
Thread scanner = new(StartScanner);
|
||||||
Thread indexer = new(StartContentFilter);
|
Thread indexer = new(StartContentFilter);
|
||||||
Thread database = new(StartDbHandler);
|
Thread database = new(StartDbHandler);
|
||||||
//Thread discarded = new(StartDiscardedDbHandler);
|
Thread discarded = new(StartDiscardedDbHandler);
|
||||||
|
Thread filtered = new(StartFilteredDbHandler);
|
||||||
|
Thread resume = new(StartResumeDbHandler);
|
||||||
Thread communication = new(StartCommunicationHandler);
|
Thread communication = new(StartCommunicationHandler);
|
||||||
|
|
||||||
//scanner.Start();
|
scanner.Start();
|
||||||
indexer.Start();
|
indexer.Start();
|
||||||
database.Start();
|
database.Start();
|
||||||
//discarded.Start();
|
discarded.Start();
|
||||||
|
filtered.Start();
|
||||||
|
resume.Start();
|
||||||
communication.Start();
|
communication.Start();
|
||||||
|
|
||||||
//scanner.Join();
|
scanner.Join();
|
||||||
indexer.Join();
|
indexer.Join();
|
||||||
database.Join();
|
database.Join();
|
||||||
//discarded.Join();
|
discarded.Join();
|
||||||
|
filtered.Join();
|
||||||
|
resume.Join();
|
||||||
communication.Join();
|
communication.Join();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,7 +81,17 @@ public class ThreadHandler
|
|||||||
|
|
||||||
private void StartDbHandler()
|
private void StartDbHandler()
|
||||||
{
|
{
|
||||||
_dbHandler.StartContent();
|
_dbHandler.UnfilteredDbHandler();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void StartFilteredDbHandler()
|
||||||
|
{
|
||||||
|
_dbHandler.FilteredDbHandler();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void StartResumeDbHandler()
|
||||||
|
{
|
||||||
|
_dbHandler.ResumeDbHandler();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void StartDiscardedDbHandler()
|
private void StartDiscardedDbHandler()
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
using System.Collections;
|
|
||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using Microsoft.Data.Sqlite;
|
using Microsoft.Data.Sqlite;
|
||||||
using Models.Model.Backend;
|
using Models.Model.Backend;
|
||||||
@ -8,8 +7,10 @@ namespace Models.Handler;
|
|||||||
|
|
||||||
public class DbHandler
|
public class DbHandler
|
||||||
{
|
{
|
||||||
private readonly ConcurrentQueue<QueueItem> _contentQueue;
|
private readonly ConcurrentQueue<Filtered> _filteredQueue;
|
||||||
|
private readonly ConcurrentQueue<UnfilteredQueueItem> _unfilteredQueue;
|
||||||
private readonly ConcurrentQueue<Discarded> _discardedQueue;
|
private readonly ConcurrentQueue<Discarded> _discardedQueue;
|
||||||
|
private readonly ConcurrentQueue<ScannerResumeObject> _resumeQueue;
|
||||||
|
|
||||||
private readonly string _unfilteredConnectionString;
|
private readonly string _unfilteredConnectionString;
|
||||||
private readonly string _discardedConnectionString;
|
private readonly string _discardedConnectionString;
|
||||||
@ -17,10 +18,34 @@ public class DbHandler
|
|||||||
private readonly string _resumeConnectionString;
|
private readonly string _resumeConnectionString;
|
||||||
private readonly List<string> _discardedConnectionStrings = [];
|
private readonly List<string> _discardedConnectionStrings = [];
|
||||||
|
|
||||||
private const string InsertStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off; INSERT INTO Unfiltered (Ip1, Ip2, Ip3, Ip4, Port1, Port2, Filtered) VALUES (@ip1, @ip2, @ip3, @ip4, @port1, @port2, @filtered)";
|
private const string InsertStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" +
|
||||||
private const string InsertIntoFiltered = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off; INSERT INTO Filtered (Ip1, Ip2, Ip3, Ip4, Port1, Port2, Title1, Title2, Description1, Description2, Url1, Url2, ServerType1, ServerType2, RobotsTXT1, RobotsTXT2, HttpVersion1, HttpVersion2, CertificateIssuerCountry, CertificateOrganizationName, IpV6, TlsVersion, CipherSuite, KeyExchangeAlgorithm, PublicKeyType1, PublicKeyType2, PublicKeyType3, AcceptEncoding1, AcceptEncoding2, ALPN, Connection1, Connection2) VALUES (@ip1, @ip2, @ip3, @ip4, @port1, @port2, @title1, @title2, @description1, @description2, @url1, @url2, @serverType1, @serverType2, @robotsTXT1, @robotsTXT2, @httpVersion1, @httpVersion2, @certificateIssuerCountry, @certificateOrganizationName, @ipV6, @tlsVersion, @cipherSuite, @keyExchangeAlgorithm, @publicKeyType1, @publicKeyType2, @publicKeyType3, @acceptEncoding1, @acceptEncoding2, @aLPN, @connection1, @connection2)";
|
" PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" +
|
||||||
private const string InsertIntoDiscarded = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off; INSERT INTO Discarded (Ip1, Ip2, Ip3, Ip4, ResponseCode) VALUES (@ip1, @ip2, @ip3, @ip4, @responseCode)";
|
" INSERT INTO Unfiltered (Ip1, Ip2, Ip3, Ip4, Port1, Port2, Filtered)" +
|
||||||
private const string InsertIntoResume = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off; INSERT INTO Resume (ThreadNumber, StartRange, EndRange, FirstByte, SecondByte, ThirdByte, FourthByte) VALUES (@threadNumber, @startRange, @endRange, @firstByte, @secondByte, @thirdByte, @fourthByte);";
|
" VALUES (@ip1, @ip2, @ip3, @ip4, @port1, @port2, @filtered)";
|
||||||
|
|
||||||
|
private const string InsertIntoFiltered = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" +
|
||||||
|
" PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" +
|
||||||
|
" INSERT INTO Filtered (Ip1, Ip2, Ip3, Ip4, Port1, Port2, Title1, Title2," +
|
||||||
|
" Description1, Description2, Url1, Url2, ServerType1, ServerType2," +
|
||||||
|
" RobotsTXT1, RobotsTXT2, HttpVersion1, HttpVersion2, CertificateIssuerCountry," +
|
||||||
|
" CertificateOrganizationName, IpV6, TlsVersion, CipherSuite, KeyExchangeAlgorithm," +
|
||||||
|
" PublicKeyType1, PublicKeyType2, PublicKeyType3, AcceptEncoding1, AcceptEncoding2," +
|
||||||
|
" ALPN, Connection1, Connection2) VALUES (@ip1, @ip2, @ip3, @ip4, @port1, @port2," +
|
||||||
|
" @title1, @title2, @description1, @description2, @url1, @url2, @serverType1," +
|
||||||
|
" @serverType2, @robotsTXT1, @robotsTXT2, @httpVersion1, @httpVersion2," +
|
||||||
|
" @certificateIssuerCountry, @certificateOrganizationName, @ipV6, @tlsVersion," +
|
||||||
|
" @cipherSuite, @keyExchangeAlgorithm, @publicKeyType1, @publicKeyType2," +
|
||||||
|
" @publicKeyType3, @acceptEncoding1, @acceptEncoding2, @aLPN, @connection1, @connection2)";
|
||||||
|
|
||||||
|
private const string InsertIntoDiscarded = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" +
|
||||||
|
" PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" +
|
||||||
|
" INSERT INTO Discarded (Ip1, Ip2, Ip3, Ip4, ResponseCode)" +
|
||||||
|
" VALUES (@ip1, @ip2, @ip3, @ip4, @responseCode)";
|
||||||
|
|
||||||
|
private const string InsertIntoResume = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" +
|
||||||
|
" PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" +
|
||||||
|
" INSERT INTO Resume (ThreadNumber, StartRange, EndRange, FirstByte, SecondByte, ThirdByte, FourthByte)" +
|
||||||
|
" VALUES (@threadNumber, @startRange, @endRange, @firstByte, @secondByte, @thirdByte, @fourthByte);";
|
||||||
|
|
||||||
private const string ReadUnfilteredStatement = "SELECT * FROM Unfiltered WHERE Id = @id;";
|
private const string ReadUnfilteredStatement = "SELECT * FROM Unfiltered WHERE Id = @id;";
|
||||||
private const string ReadUnfilteredIdsStatement = "SELECT Id FROM Unfiltered WHERE Id != 0 ORDER BY Id DESC LIMIT 1;";
|
private const string ReadUnfilteredIdsStatement = "SELECT Id FROM Unfiltered WHERE Id != 0 ORDER BY Id DESC LIMIT 1;";
|
||||||
@ -48,12 +73,17 @@ public class DbHandler
|
|||||||
|
|
||||||
private readonly string _basePath;
|
private readonly string _basePath;
|
||||||
|
|
||||||
public DbHandler(ConcurrentQueue<QueueItem> contentQueue, ConcurrentQueue<Discarded> discardedQueue, string basePath)
|
public DbHandler(ConcurrentQueue<Filtered> filteredQueue,
|
||||||
|
ConcurrentQueue<Discarded> discardedQueue,
|
||||||
|
ConcurrentQueue<UnfilteredQueueItem> unfilteredQueue,
|
||||||
|
ConcurrentQueue<ScannerResumeObject> resumeQueue, string basePath)
|
||||||
{
|
{
|
||||||
_contentQueue = contentQueue;
|
_filteredQueue = filteredQueue;
|
||||||
_discardedQueue = discardedQueue;
|
_discardedQueue = discardedQueue;
|
||||||
|
_unfilteredQueue = unfilteredQueue;
|
||||||
SetContentWaitTime(10);
|
_resumeQueue = resumeQueue;
|
||||||
|
|
||||||
|
SetContentWaitTime(100);
|
||||||
SetDiscardedWaitTime(10);
|
SetDiscardedWaitTime(10);
|
||||||
|
|
||||||
_basePath = basePath;
|
_basePath = basePath;
|
||||||
@ -74,45 +104,78 @@ public class DbHandler
|
|||||||
_discardedWaitTime = waitTime;
|
_discardedWaitTime = waitTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void StartContent()
|
public void UnfilteredDbHandler()
|
||||||
{
|
{
|
||||||
Console.WriteLine("Content DbHandler started");
|
Console.WriteLine("Unfiltered DbHandler started");
|
||||||
|
|
||||||
while (!_stop)
|
while (!_stop)
|
||||||
{
|
{
|
||||||
if (_contentQueue.IsEmpty || _pause)
|
if (_unfilteredQueue.IsEmpty || _pause)
|
||||||
{
|
{
|
||||||
Thread.Sleep(_contentWaitTime);
|
Thread.Sleep(_contentWaitTime);
|
||||||
_paused = true;
|
_paused = true;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
_contentQueue.TryDequeue(out QueueItem? queueItem);
|
_unfilteredQueue.TryDequeue(out UnfilteredQueueItem queueItem);
|
||||||
|
|
||||||
if (queueItem is null) { continue; }
|
if (queueItem.Operations == Operations.Insert)
|
||||||
|
|
||||||
if (queueItem.Operations == Operations.Insert && queueItem.DbType == DbType.Unfiltered)
|
|
||||||
{
|
{
|
||||||
InsertUnfiltered(queueItem.Unfiltered);
|
InsertUnfiltered(queueItem.Unfiltered);
|
||||||
}
|
}
|
||||||
|
|
||||||
else if (queueItem.Operations == Operations.Insert && queueItem.DbType == DbType.Filtered)
|
else if (queueItem.Operations == Operations.Update)
|
||||||
{
|
|
||||||
InsertFiltered(queueItem.Filtered!);
|
|
||||||
}
|
|
||||||
|
|
||||||
else if (queueItem.Operations == Operations.Insert && queueItem.ResumeObject is not null)
|
|
||||||
{
|
|
||||||
InsertResumeObject(queueItem.ResumeObject);
|
|
||||||
}
|
|
||||||
|
|
||||||
else if (queueItem.Operations == Operations.Update && queueItem.DbType == DbType.Unfiltered)
|
|
||||||
{
|
{
|
||||||
UpdateUnfiltered(queueItem.Unfiltered);
|
UpdateUnfiltered(queueItem.Unfiltered);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Console.WriteLine("Content DbHandler stopped.");
|
Console.WriteLine("Unfiltered DbHandler stopped.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void FilteredDbHandler()
|
||||||
|
{
|
||||||
|
Console.WriteLine("Filtered DB handler started");
|
||||||
|
|
||||||
|
while (!_stop)
|
||||||
|
{
|
||||||
|
if (_filteredQueue.IsEmpty || _pause)
|
||||||
|
{
|
||||||
|
Thread.Sleep(_contentWaitTime);
|
||||||
|
_paused = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
_filteredQueue.TryDequeue(out Filtered? queueItem);
|
||||||
|
|
||||||
|
InsertFiltered(queueItem!);
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine("Filtered DbHandler stopped.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void ResumeDbHandler()
|
||||||
|
{
|
||||||
|
Console.WriteLine("Resume DB handler started");
|
||||||
|
|
||||||
|
while (!_stop)
|
||||||
|
{
|
||||||
|
if (_resumeQueue.IsEmpty || _pause)
|
||||||
|
{
|
||||||
|
Thread.Sleep(_contentWaitTime);
|
||||||
|
_paused = true;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
_resumeQueue.TryDequeue(out ScannerResumeObject? queueItem);
|
||||||
|
|
||||||
|
if (queueItem is not null)
|
||||||
|
{
|
||||||
|
InsertResumeObject(queueItem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Console.WriteLine("Resume DbHandler stopped.");
|
||||||
}
|
}
|
||||||
|
|
||||||
public WaitHandle[] Start(int threads)
|
public WaitHandle[] Start(int threads)
|
||||||
@ -131,7 +194,7 @@ public class DbHandler
|
|||||||
|
|
||||||
waitHandles[i] = handle;
|
waitHandles[i] = handle;
|
||||||
|
|
||||||
Thread f = new (RunDiscarded!);
|
Thread f = new (DiscardedDbHandler!);
|
||||||
f.Start(discardedDbHandlerSetting);
|
f.Start(discardedDbHandlerSetting);
|
||||||
|
|
||||||
Thread.Sleep(1000);
|
Thread.Sleep(1000);
|
||||||
@ -140,7 +203,7 @@ public class DbHandler
|
|||||||
return waitHandles;
|
return waitHandles;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void RunDiscarded(object obj)
|
private void DiscardedDbHandler(object obj)
|
||||||
{
|
{
|
||||||
DiscardedDbHandlerSetting discardedDbHandlerSetting = (DiscardedDbHandlerSetting)obj;
|
DiscardedDbHandlerSetting discardedDbHandlerSetting = (DiscardedDbHandlerSetting)obj;
|
||||||
Console.WriteLine($"Discarded DbHandler started with thread: ({discardedDbHandlerSetting.ThreadId})");
|
Console.WriteLine($"Discarded DbHandler started with thread: ({discardedDbHandlerSetting.ThreadId})");
|
||||||
@ -163,7 +226,7 @@ public class DbHandler
|
|||||||
|
|
||||||
discardedDbHandlerSetting.Handle!.Set();
|
discardedDbHandlerSetting.Handle!.Set();
|
||||||
|
|
||||||
Console.WriteLine("Content DbHandler stopped.");
|
Console.WriteLine("Discarded DbHandler stopped.");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void InsertUnfiltered(Unfiltered unfiltered)
|
private void InsertUnfiltered(Unfiltered unfiltered)
|
||||||
|
@ -1,7 +0,0 @@
|
|||||||
namespace Models.Model.Backend;
|
|
||||||
|
|
||||||
public enum DbType
|
|
||||||
{
|
|
||||||
Unfiltered,
|
|
||||||
Filtered,
|
|
||||||
}
|
|
@ -4,5 +4,4 @@ public enum Operations
|
|||||||
{
|
{
|
||||||
Insert,
|
Insert,
|
||||||
Update,
|
Update,
|
||||||
Optimize,
|
|
||||||
}
|
}
|
@ -1,10 +0,0 @@
|
|||||||
namespace Models.Model.Backend;
|
|
||||||
|
|
||||||
public class QueueItem
|
|
||||||
{
|
|
||||||
public Unfiltered Unfiltered { get; init; }
|
|
||||||
public Filtered? Filtered { get; init; }
|
|
||||||
public ScannerResumeObject? ResumeObject { get; init; }
|
|
||||||
public Operations Operations { get; init; }
|
|
||||||
public DbType DbType { get; init; }
|
|
||||||
}
|
|
7
Models/Model/Backend/UnfilteredQueueItem.cs
Normal file
7
Models/Model/Backend/UnfilteredQueueItem.cs
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
namespace Models.Model.Backend;
|
||||||
|
|
||||||
|
public struct UnfilteredQueueItem
|
||||||
|
{
|
||||||
|
public Unfiltered Unfiltered { get; init; }
|
||||||
|
public Operations Operations { get; init; }
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user