From f2ace6f571c7b0ec43109f3cf83e4fcf384f8191 Mon Sep 17 00:00:00 2001 From: Rasmus Date: Fri, 29 Nov 2024 12:59:07 +0100 Subject: [PATCH] Reworked the queue items. --- Backend/Handler/ContentFilter.cs | 24 ++-- Backend/Handler/IpScanner.cs | 27 ++--- Backend/Handler/ThreadHandler.cs | 40 ++++-- Models/Handler/DbHandler.cs | 127 +++++++++++++++----- Models/Model/Backend/DbType.cs | 7 -- Models/Model/Backend/Operations.cs | 1 - Models/Model/Backend/QueueItem.cs | 10 -- Models/Model/Backend/UnfilteredQueueItem.cs | 7 ++ 8 files changed, 152 insertions(+), 91 deletions(-) delete mode 100644 Models/Model/Backend/DbType.cs delete mode 100644 Models/Model/Backend/QueueItem.cs create mode 100644 Models/Model/Backend/UnfilteredQueueItem.cs diff --git a/Backend/Handler/ContentFilter.cs b/Backend/Handler/ContentFilter.cs index 85f09df..0476fdd 100644 --- a/Backend/Handler/ContentFilter.cs +++ b/Backend/Handler/ContentFilter.cs @@ -8,7 +8,8 @@ namespace Backend.Handler; public class ContentFilter { - private readonly ConcurrentQueue _queue; + private readonly ConcurrentQueue _queue; + private readonly ConcurrentQueue _unfilteredQueue; private readonly DbHandler _dbHandler; private readonly string _getDomainPort80; private readonly string _getDomainPort443; @@ -16,12 +17,13 @@ public class ContentFilter private int _timeOut; private readonly string _basePath; - public ContentFilter(ConcurrentQueue queue, DbHandler dbHandler, string basePath) + public ContentFilter(ConcurrentQueue queue, ConcurrentQueue unfilteredQueue, DbHandler dbHandler, string basePath) { _queue = queue; _dbHandler = dbHandler; _basePath = basePath; - + _unfilteredQueue = unfilteredQueue; + _getDomainPort80 = $"{basePath}/Backend/Scripts/GetDomainNamePort80.sh"; _getDomainPort443 = $"{basePath}/Backend/Scripts/GetDomainNamePort443.sh"; @@ -63,14 +65,13 @@ public class ContentFilter unfiltered.Filtered = true; - QueueItem superUnfilteredObject = new() + UnfilteredQueueItem superUnfilteredObject = new() { Unfiltered = unfiltered, - Operations = Operations.Update, - DbType = DbType.Unfiltered + Operations = Operations.Update }; - _queue.Enqueue(superUnfilteredObject); + _unfilteredQueue.Enqueue(superUnfilteredObject); if (_dbHandler.FilteredIpExists(unfiltered.Ip)) { @@ -82,14 +83,7 @@ public class ContentFilter filtered.Port1 = unfiltered.Port1; filtered.Port2 = unfiltered.Port2; - QueueItem superFilteredObject = new() - { - Filtered = filtered, - Operations = Operations.Insert, - DbType = DbType.Filtered - }; - - _queue.Enqueue(superFilteredObject); + _queue.Enqueue(filtered); } Thread.Sleep(_timeOut); diff --git a/Backend/Handler/IpScanner.cs b/Backend/Handler/IpScanner.cs index c470b39..fc69a71 100644 --- a/Backend/Handler/IpScanner.cs +++ b/Backend/Handler/IpScanner.cs @@ -17,18 +17,22 @@ public class ScanSettings public class IpScanner { - private readonly ConcurrentQueue _queue; private readonly ConcurrentQueue _discardedQueue; + private readonly ConcurrentQueue _unfilteredQueue; + private readonly ConcurrentQueue _resumeQueue; private readonly DbHandler _dbHandler; private bool _stop; private int _timeout; - public IpScanner(ConcurrentQueue queue, DbHandler dbHandler, ConcurrentQueue discardedQueue) + public IpScanner(ConcurrentQueue unfilteredQueue, ConcurrentQueue discardedQueue, + ConcurrentQueue resumeQueue, DbHandler dbHandler + ) { - _queue = queue; _dbHandler = dbHandler; _discardedQueue = discardedQueue; - + _unfilteredQueue = unfilteredQueue; + _resumeQueue = resumeQueue; + SetTimeout(128); } @@ -167,7 +171,7 @@ public class IpScanner continue; } - _queue.Enqueue(CreateUnfilteredQueueItem(ip, ports)); + _unfilteredQueue.Enqueue(CreateUnfilteredQueueItem(ip, ports)); } if (_stop) @@ -192,13 +196,7 @@ public class IpScanner //Console.WriteLine($"Thread ({scanSettings.ThreadNumber}) is at index ({i}) out of ({scanSettings.End}). Remaining ({scanSettings.End - i})"); } - QueueItem resume = new() - { - ResumeObject = resumeObject, - Operations = Operations.Insert - }; - - _queue.Enqueue(resume); + _resumeQueue.Enqueue(resumeObject); scanSettings.Handle!.Set(); } @@ -212,7 +210,7 @@ public class IpScanner }; } - private static QueueItem CreateUnfilteredQueueItem(Ip ip, (int, int) ports) + private static UnfilteredQueueItem CreateUnfilteredQueueItem(Ip ip, (int, int) ports) { Unfiltered unfiltered = new() { @@ -225,8 +223,7 @@ public class IpScanner return new() { Unfiltered = unfiltered, - Operations = Operations.Insert, - DbType = DbType.Unfiltered + Operations = Operations.Insert }; } diff --git a/Backend/Handler/ThreadHandler.cs b/Backend/Handler/ThreadHandler.cs index a16d2ef..47c9a70 100644 --- a/Backend/Handler/ThreadHandler.cs +++ b/Backend/Handler/ThreadHandler.cs @@ -17,33 +17,41 @@ public class ThreadHandler public ThreadHandler(string path) { - ConcurrentQueue contentQueue = new(); + ConcurrentQueue filteredQueue = new(); ConcurrentQueue discardedQueue = new(); + ConcurrentQueue unfilteredQueue = new(); + ConcurrentQueue scannerResumeQueue = new(); - _dbHandler = new(contentQueue, discardedQueue, path); - _ipScanner = new(contentQueue, _dbHandler, discardedQueue); - _contentFilter = new(contentQueue, _dbHandler, path); + _dbHandler = new(filteredQueue, discardedQueue, unfilteredQueue, scannerResumeQueue, path); + _ipScanner = new(unfilteredQueue, discardedQueue, scannerResumeQueue, _dbHandler); + _contentFilter = new(filteredQueue, unfilteredQueue, _dbHandler, path); _communication = new(_dbHandler, this, _ipScanner, _contentFilter, path); } public void Start() { - //Thread scanner = new(StartScanner); + Thread scanner = new(StartScanner); Thread indexer = new(StartContentFilter); Thread database = new(StartDbHandler); - //Thread discarded = new(StartDiscardedDbHandler); + Thread discarded = new(StartDiscardedDbHandler); + Thread filtered = new(StartFilteredDbHandler); + Thread resume = new(StartResumeDbHandler); Thread communication = new(StartCommunicationHandler); - //scanner.Start(); + scanner.Start(); indexer.Start(); database.Start(); - //discarded.Start(); + discarded.Start(); + filtered.Start(); + resume.Start(); communication.Start(); - //scanner.Join(); + scanner.Join(); indexer.Join(); database.Join(); - //discarded.Join(); + discarded.Join(); + filtered.Join(); + resume.Join(); communication.Join(); } @@ -73,7 +81,17 @@ public class ThreadHandler private void StartDbHandler() { - _dbHandler.StartContent(); + _dbHandler.UnfilteredDbHandler(); + } + + private void StartFilteredDbHandler() + { + _dbHandler.FilteredDbHandler(); + } + + private void StartResumeDbHandler() + { + _dbHandler.ResumeDbHandler(); } private void StartDiscardedDbHandler() diff --git a/Models/Handler/DbHandler.cs b/Models/Handler/DbHandler.cs index 63754ee..87e94b5 100644 --- a/Models/Handler/DbHandler.cs +++ b/Models/Handler/DbHandler.cs @@ -1,4 +1,3 @@ -using System.Collections; using System.Collections.Concurrent; using Microsoft.Data.Sqlite; using Models.Model.Backend; @@ -8,8 +7,10 @@ namespace Models.Handler; public class DbHandler { - private readonly ConcurrentQueue _contentQueue; + private readonly ConcurrentQueue _filteredQueue; + private readonly ConcurrentQueue _unfilteredQueue; private readonly ConcurrentQueue _discardedQueue; + private readonly ConcurrentQueue _resumeQueue; private readonly string _unfilteredConnectionString; private readonly string _discardedConnectionString; @@ -17,10 +18,34 @@ public class DbHandler private readonly string _resumeConnectionString; private readonly List _discardedConnectionStrings = []; - private const string InsertStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off; INSERT INTO Unfiltered (Ip1, Ip2, Ip3, Ip4, Port1, Port2, Filtered) VALUES (@ip1, @ip2, @ip3, @ip4, @port1, @port2, @filtered)"; - private const string InsertIntoFiltered = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off; INSERT INTO Filtered (Ip1, Ip2, Ip3, Ip4, Port1, Port2, Title1, Title2, Description1, Description2, Url1, Url2, ServerType1, ServerType2, RobotsTXT1, RobotsTXT2, HttpVersion1, HttpVersion2, CertificateIssuerCountry, CertificateOrganizationName, IpV6, TlsVersion, CipherSuite, KeyExchangeAlgorithm, PublicKeyType1, PublicKeyType2, PublicKeyType3, AcceptEncoding1, AcceptEncoding2, ALPN, Connection1, Connection2) VALUES (@ip1, @ip2, @ip3, @ip4, @port1, @port2, @title1, @title2, @description1, @description2, @url1, @url2, @serverType1, @serverType2, @robotsTXT1, @robotsTXT2, @httpVersion1, @httpVersion2, @certificateIssuerCountry, @certificateOrganizationName, @ipV6, @tlsVersion, @cipherSuite, @keyExchangeAlgorithm, @publicKeyType1, @publicKeyType2, @publicKeyType3, @acceptEncoding1, @acceptEncoding2, @aLPN, @connection1, @connection2)"; - private const string InsertIntoDiscarded = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off; INSERT INTO Discarded (Ip1, Ip2, Ip3, Ip4, ResponseCode) VALUES (@ip1, @ip2, @ip3, @ip4, @responseCode)"; - private const string InsertIntoResume = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off; INSERT INTO Resume (ThreadNumber, StartRange, EndRange, FirstByte, SecondByte, ThirdByte, FourthByte) VALUES (@threadNumber, @startRange, @endRange, @firstByte, @secondByte, @thirdByte, @fourthByte);"; + private const string InsertStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" + + " PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" + + " INSERT INTO Unfiltered (Ip1, Ip2, Ip3, Ip4, Port1, Port2, Filtered)" + + " VALUES (@ip1, @ip2, @ip3, @ip4, @port1, @port2, @filtered)"; + + private const string InsertIntoFiltered = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" + + " PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" + + " INSERT INTO Filtered (Ip1, Ip2, Ip3, Ip4, Port1, Port2, Title1, Title2," + + " Description1, Description2, Url1, Url2, ServerType1, ServerType2," + + " RobotsTXT1, RobotsTXT2, HttpVersion1, HttpVersion2, CertificateIssuerCountry," + + " CertificateOrganizationName, IpV6, TlsVersion, CipherSuite, KeyExchangeAlgorithm," + + " PublicKeyType1, PublicKeyType2, PublicKeyType3, AcceptEncoding1, AcceptEncoding2," + + " ALPN, Connection1, Connection2) VALUES (@ip1, @ip2, @ip3, @ip4, @port1, @port2," + + " @title1, @title2, @description1, @description2, @url1, @url2, @serverType1," + + " @serverType2, @robotsTXT1, @robotsTXT2, @httpVersion1, @httpVersion2," + + " @certificateIssuerCountry, @certificateOrganizationName, @ipV6, @tlsVersion," + + " @cipherSuite, @keyExchangeAlgorithm, @publicKeyType1, @publicKeyType2," + + " @publicKeyType3, @acceptEncoding1, @acceptEncoding2, @aLPN, @connection1, @connection2)"; + + private const string InsertIntoDiscarded = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" + + " PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" + + " INSERT INTO Discarded (Ip1, Ip2, Ip3, Ip4, ResponseCode)" + + " VALUES (@ip1, @ip2, @ip3, @ip4, @responseCode)"; + + private const string InsertIntoResume = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" + + " PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" + + " INSERT INTO Resume (ThreadNumber, StartRange, EndRange, FirstByte, SecondByte, ThirdByte, FourthByte)" + + " VALUES (@threadNumber, @startRange, @endRange, @firstByte, @secondByte, @thirdByte, @fourthByte);"; private const string ReadUnfilteredStatement = "SELECT * FROM Unfiltered WHERE Id = @id;"; private const string ReadUnfilteredIdsStatement = "SELECT Id FROM Unfiltered WHERE Id != 0 ORDER BY Id DESC LIMIT 1;"; @@ -48,12 +73,17 @@ public class DbHandler private readonly string _basePath; - public DbHandler(ConcurrentQueue contentQueue, ConcurrentQueue discardedQueue, string basePath) + public DbHandler(ConcurrentQueue filteredQueue, + ConcurrentQueue discardedQueue, + ConcurrentQueue unfilteredQueue, + ConcurrentQueue resumeQueue, string basePath) { - _contentQueue = contentQueue; + _filteredQueue = filteredQueue; _discardedQueue = discardedQueue; - - SetContentWaitTime(10); + _unfilteredQueue = unfilteredQueue; + _resumeQueue = resumeQueue; + + SetContentWaitTime(100); SetDiscardedWaitTime(10); _basePath = basePath; @@ -74,45 +104,78 @@ public class DbHandler _discardedWaitTime = waitTime; } - public void StartContent() + public void UnfilteredDbHandler() { - Console.WriteLine("Content DbHandler started"); + Console.WriteLine("Unfiltered DbHandler started"); while (!_stop) { - if (_contentQueue.IsEmpty || _pause) + if (_unfilteredQueue.IsEmpty || _pause) { Thread.Sleep(_contentWaitTime); _paused = true; continue; } - _contentQueue.TryDequeue(out QueueItem? queueItem); + _unfilteredQueue.TryDequeue(out UnfilteredQueueItem queueItem); - if (queueItem is null) { continue; } - - if (queueItem.Operations == Operations.Insert && queueItem.DbType == DbType.Unfiltered) + if (queueItem.Operations == Operations.Insert) { InsertUnfiltered(queueItem.Unfiltered); } - else if (queueItem.Operations == Operations.Insert && queueItem.DbType == DbType.Filtered) - { - InsertFiltered(queueItem.Filtered!); - } - - else if (queueItem.Operations == Operations.Insert && queueItem.ResumeObject is not null) - { - InsertResumeObject(queueItem.ResumeObject); - } - - else if (queueItem.Operations == Operations.Update && queueItem.DbType == DbType.Unfiltered) + else if (queueItem.Operations == Operations.Update) { UpdateUnfiltered(queueItem.Unfiltered); } } - Console.WriteLine("Content DbHandler stopped."); + Console.WriteLine("Unfiltered DbHandler stopped."); + } + + public void FilteredDbHandler() + { + Console.WriteLine("Filtered DB handler started"); + + while (!_stop) + { + if (_filteredQueue.IsEmpty || _pause) + { + Thread.Sleep(_contentWaitTime); + _paused = true; + continue; + } + + _filteredQueue.TryDequeue(out Filtered? queueItem); + + InsertFiltered(queueItem!); + } + + Console.WriteLine("Filtered DbHandler stopped."); + } + + public void ResumeDbHandler() + { + Console.WriteLine("Resume DB handler started"); + + while (!_stop) + { + if (_resumeQueue.IsEmpty || _pause) + { + Thread.Sleep(_contentWaitTime); + _paused = true; + continue; + } + + _resumeQueue.TryDequeue(out ScannerResumeObject? queueItem); + + if (queueItem is not null) + { + InsertResumeObject(queueItem); + } + } + + Console.WriteLine("Resume DbHandler stopped."); } public WaitHandle[] Start(int threads) @@ -131,7 +194,7 @@ public class DbHandler waitHandles[i] = handle; - Thread f = new (RunDiscarded!); + Thread f = new (DiscardedDbHandler!); f.Start(discardedDbHandlerSetting); Thread.Sleep(1000); @@ -140,7 +203,7 @@ public class DbHandler return waitHandles; } - private void RunDiscarded(object obj) + private void DiscardedDbHandler(object obj) { DiscardedDbHandlerSetting discardedDbHandlerSetting = (DiscardedDbHandlerSetting)obj; Console.WriteLine($"Discarded DbHandler started with thread: ({discardedDbHandlerSetting.ThreadId})"); @@ -163,7 +226,7 @@ public class DbHandler discardedDbHandlerSetting.Handle!.Set(); - Console.WriteLine("Content DbHandler stopped."); + Console.WriteLine("Discarded DbHandler stopped."); } private void InsertUnfiltered(Unfiltered unfiltered) diff --git a/Models/Model/Backend/DbType.cs b/Models/Model/Backend/DbType.cs deleted file mode 100644 index 29cdab8..0000000 --- a/Models/Model/Backend/DbType.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace Models.Model.Backend; - -public enum DbType -{ - Unfiltered, - Filtered, -} \ No newline at end of file diff --git a/Models/Model/Backend/Operations.cs b/Models/Model/Backend/Operations.cs index fb11b9b..55d0275 100644 --- a/Models/Model/Backend/Operations.cs +++ b/Models/Model/Backend/Operations.cs @@ -4,5 +4,4 @@ public enum Operations { Insert, Update, - Optimize, } \ No newline at end of file diff --git a/Models/Model/Backend/QueueItem.cs b/Models/Model/Backend/QueueItem.cs deleted file mode 100644 index 471e7ca..0000000 --- a/Models/Model/Backend/QueueItem.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace Models.Model.Backend; - -public class QueueItem -{ - public Unfiltered Unfiltered { get; init; } - public Filtered? Filtered { get; init; } - public ScannerResumeObject? ResumeObject { get; init; } - public Operations Operations { get; init; } - public DbType DbType { get; init; } -} \ No newline at end of file diff --git a/Models/Model/Backend/UnfilteredQueueItem.cs b/Models/Model/Backend/UnfilteredQueueItem.cs new file mode 100644 index 0000000..5014eef --- /dev/null +++ b/Models/Model/Backend/UnfilteredQueueItem.cs @@ -0,0 +1,7 @@ +namespace Models.Model.Backend; + +public struct UnfilteredQueueItem +{ + public Unfiltered Unfiltered { get; init; } + public Operations Operations { get; init; } +} \ No newline at end of file