diff --git a/Backend/Handler/IpFilterHandler.cs b/Backend/Handler/IpFilterHandler.cs index fe46813..dfc9dd3 100644 --- a/Backend/Handler/IpFilterHandler.cs +++ b/Backend/Handler/IpFilterHandler.cs @@ -1,7 +1,5 @@ using System.Collections.Concurrent; -using System.Diagnostics; using Backend.Helper; -using Models.Handler; using Models.Model.Backend; namespace Backend.Handler; @@ -10,23 +8,20 @@ public class IpFilterHandler { private readonly ConcurrentQueue _discardedQueue; private readonly ConcurrentQueue _unfilteredQueue; - private readonly ConcurrentQueue _preFilteredQueue; - private DbHandler _dbHandler; + private readonly ConcurrentQueue _preFilteredQueue; private ThreadHandler _threadHandler; private bool _stop; - private bool _fillerStop; + private bool _done; private bool _stopAutoscaledThreads; private int _timeout; public IpFilterHandler(ConcurrentQueue discardedQueue, ConcurrentQueue unfilteredQueue, - ConcurrentQueue preFilteredQueue, - DbHandler dbHandler, ThreadHandler threadHandler) + ConcurrentQueue preFilteredQueue, ThreadHandler threadHandler) { _discardedQueue = discardedQueue; _unfilteredQueue = unfilteredQueue; _preFilteredQueue = preFilteredQueue; - _dbHandler = dbHandler; _threadHandler = threadHandler; _timeout = 16; @@ -67,64 +62,15 @@ public class IpFilterHandler return waitHandles; } - - public void AutoScaler() - { - int i = 0; - int j = 0; - - while (!_stop) - { - if (_preFilteredQueue.Count >= 2000) - { - if (i == 10) - { - _stopAutoscaledThreads = false; - Console.WriteLine("Autoscaler started"); - - while (!_stopAutoscaledThreads) - { - if (_preFilteredQueue.Count <= 2000) - { - if (j == 1000) - { - _stopAutoscaledThreads = true; - } - - j++; - - Thread.Sleep(128); - } - else - { - EventWaitHandle handle = new(false, EventResetMode.ManualReset); - Thread f = new (Filter_AutoScaler!); - f.Start(handle); - Thread.Sleep(16); - } - } - } - - i++; - } - else - { - i = 0; - j = 0; - } - - Thread.Sleep(128); - } - } - + private void Filter(object obj) { int counter = 0; while (!_stop) { - if (_preFilteredQueue.IsEmpty && _fillerStop) + if (_preFilteredQueue.IsEmpty) { - if (counter == 100) + if (counter == 30_000) { _threadHandler.Stop(); _stop = true; @@ -132,63 +78,21 @@ public class IpFilterHandler counter++; Thread.Sleep(128); - } - - _preFilteredQueue.TryDequeue(out FilterQueueItem item); - - (int, int) ports = TcpClientHelper.CheckPort(item.Ip, 80, 443); - - if (ports is { Item1: 0, Item2: 0 }) - { - _discardedQueue.Enqueue(CreateDiscardedQueueItem(item.Ip, item.ResponseCode)); - continue; - } - - _unfilteredQueue.Enqueue(CreateUnfilteredQueueItem(item.Ip, ports)); - } - - ((EventWaitHandle) obj).Set(); - } - - public void FillFilterQueue() - { - Console.WriteLine("Fill FilterQueue started."); - while (!_stop) - { - if (_preFilteredQueue.Count > 500) continue; - - if (_dbHandler.GetPreFilterQueueItem(out FilterQueueItem item)) - { - _preFilteredQueue.Enqueue(item); - } - else - { - _fillerStop = true; - } - } - } - - private void Filter_AutoScaler(object obj) - { - while (!_stopAutoscaledThreads) - { - if (_preFilteredQueue.IsEmpty) - { - Thread.Sleep(_timeout); + continue; } - _preFilteredQueue.TryDequeue(out FilterQueueItem item); + _preFilteredQueue.TryDequeue(out Ip item); - (int, int) ports = TcpClientHelper.CheckPort(item.Ip, 80, 443); + (int, int) ports = TcpClientHelper.CheckPort(item, 80, 443); if (ports is { Item1: 0, Item2: 0 }) { - _discardedQueue.Enqueue(CreateDiscardedQueueItem(item.Ip, item.ResponseCode)); + _discardedQueue.Enqueue(CreateDiscardedQueueItem(item, item.ResponseCode)); continue; } - _unfilteredQueue.Enqueue(CreateUnfilteredQueueItem(item.Ip, ports)); + _unfilteredQueue.Enqueue(CreateUnfilteredQueueItem(item, ports)); } ((EventWaitHandle) obj).Set(); diff --git a/Backend/Handler/IpScanner.cs b/Backend/Handler/IpScanner.cs index 8afa19c..7fcd04e 100644 --- a/Backend/Handler/IpScanner.cs +++ b/Backend/Handler/IpScanner.cs @@ -22,7 +22,7 @@ public class ScanSettings public class IpScanner { private readonly ConcurrentQueue _discardedQueue; - private readonly ConcurrentQueue _preFilteredQueue; + private readonly ConcurrentQueue _preFilteredQueue; private readonly ConcurrentQueue _resumeQueue; private readonly DbHandler _dbHandler; private bool _stop; @@ -30,7 +30,7 @@ public class IpScanner public IpScanner(ConcurrentQueue discardedQueue, ConcurrentQueue resumeQueue, DbHandler dbHandler, - ConcurrentQueue preFilteredQueue) + ConcurrentQueue preFilteredQueue) { _dbHandler = dbHandler; _preFilteredQueue = preFilteredQueue; @@ -283,15 +283,11 @@ public class IpScanner }; } - private static FilterQueueItem CreateUnfilteredQueueItem(Ip ip, int responseCode) + private static Ip CreateUnfilteredQueueItem(Ip ip, int responseCode) { - FilterQueueItem filterQueueItem = new() - { - Ip = ip, - ResponseCode = responseCode - }; + ip.ResponseCode = responseCode; - return filterQueueItem; + return ip; } public void Stop() diff --git a/Backend/Handler/ThreadHandler.cs b/Backend/Handler/ThreadHandler.cs index e819f51..1cdee77 100644 --- a/Backend/Handler/ThreadHandler.cs +++ b/Backend/Handler/ThreadHandler.cs @@ -15,22 +15,22 @@ public class ThreadHandler private bool _contentFilterStopped; private bool _ipFilterStopped; - private bool _stage1 = true; - private bool _stage2; - private bool _stage3; + private bool _stage1 = false; + private bool _stage2 = true; + private bool _stage3 = false; ConcurrentQueue filteredQueue = new(); ConcurrentQueue discardedQueue = new(); ConcurrentQueue unfilteredQueue = new(); ConcurrentQueue scannerResumeQueue = new(); - ConcurrentQueue preFilteredQueue = new(); + ConcurrentQueue preFilteredQueue = new(); public ThreadHandler(string path) { _dbHandler = new(filteredQueue, discardedQueue, unfilteredQueue, scannerResumeQueue, preFilteredQueue, path); _ipScanner = new(discardedQueue, scannerResumeQueue, _dbHandler, preFilteredQueue); _contentFilter = new(filteredQueue, unfilteredQueue, _dbHandler, path, this); - _ipFilterHandler = new(discardedQueue, unfilteredQueue, preFilteredQueue, _dbHandler, this); + _ipFilterHandler = new(discardedQueue, unfilteredQueue, preFilteredQueue, this); } public void Start() @@ -42,19 +42,20 @@ public class ThreadHandler Thread discarded = new(StartDiscardedDbHandler); Thread filtered = new(StartFilteredDbHandler); Thread resume = new(StartResumeDbHandler); - Thread ipFilterAutoScaler = new(StartIpFilterAutoScaler); Thread contentFilterThread = new(StartContentFilterThread); Thread prefilterDb = new(StartPreFilterDbHandler); Thread fillIpFilterQueue = new(StartFillIpFilterQueue); //Thread check = new(CheckQueue); - + + //check.Start(); + if (_stage1) { discarded.Start(); // de-queues from discardedQueue prefilterDb.Start(); // de-queues from preFilteredQueue scanner.Start(); // en-queues to discardedQueue and preFilteredQueue resume.Start(); // de-queues from resumeQueue - + scanner.Join(); Stop(); @@ -62,22 +63,21 @@ public class ThreadHandler prefilterDb.Join(); resume.Join(); } - + if (_stage2) { database.Start(); // de-queues from unfilteredQueue discarded.Start(); // de-queues from discardedQueue ipFilter.Start(); // en-queues to discardedQueue and unfilteredQueue - ipFilterAutoScaler.Start(); // de-queues from preFilteredQueue, en-queues to discardedQueue and unfilteredQueue fillIpFilterQueue.Start(); // reads from preFiltered database, en-queues to preFilteredQueue - + + ipFilter.Join(); + Stop(); database.Join(); discarded.Join(); - ipFilter.Join(); - ipFilterAutoScaler.Join(); fillIpFilterQueue.Join(); } - + if (_stage3) { filtered.Start(); // de-queues from filteredQueue @@ -91,7 +91,7 @@ public class ThreadHandler indexer.Join(); } } - + private void CheckQueue() { while (true) @@ -105,13 +105,14 @@ public class ThreadHandler Thread.Sleep(5); } } + private void StartScanner() { Thread.Sleep(15000); // Let the database handler instantiate and warm up first. List wait = _ipScanner.Start(256); - + for (int i = 0; i < wait.Count; i++) { WaitHandle.WaitAll(wait[i]); @@ -135,26 +136,21 @@ public class ThreadHandler private void StartContentFilterThread() { - WaitHandle[] wait = _contentFilter.StartFilterThread(64); + WaitHandle[] wait = _contentFilter.StartFilterThread(8); WaitHandle.WaitAll(wait); } - private void StartIpFilterAutoScaler() - { - _ipFilterHandler.AutoScaler(); - } - private void StartFillIpFilterQueue() { - _ipFilterHandler.FillFilterQueue(); + _dbHandler.GetPreFilterQueueItem(); } private void StartIpFilter() { Thread.Sleep(1000); - List wait = _ipFilterHandler.Start(256); + List wait = _ipFilterHandler.Start(1024); for (int i = 0; i < wait.Count; i++) { diff --git a/Models/Handler/DbHandler.cs b/Models/Handler/DbHandler.cs index 6e4d3ff..2b700d3 100644 --- a/Models/Handler/DbHandler.cs +++ b/Models/Handler/DbHandler.cs @@ -5,7 +5,6 @@ using System.Text; using Microsoft.Data.Sqlite; using Models.Helper; using Models.Model.Backend; -using Models.Model.External; namespace Models.Handler; @@ -15,14 +14,13 @@ public class DbHandler private readonly ConcurrentQueue _unfilteredQueue; private readonly ConcurrentQueue _discardedQueue; private readonly ConcurrentQueue _resumeQueue; - private readonly ConcurrentQueue _preFilteredQueue; + private readonly ConcurrentQueue _preFilteredQueue; private readonly string _unfilteredConnectionString; private readonly string _preFilteredConnectionString; private readonly string _filteredConnectionString; private readonly string _resumeConnectionString; private readonly string _compressedConnectionString; - private readonly List _discardedConnectionStrings = []; private const string InsertStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" + " PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" + @@ -98,7 +96,7 @@ public class DbHandler private const string ReadResumeStatement = "SELECT * FROM Resume WHERE ThreadNumber == @threadNumber;"; private const string ReadCompressedDbRowsStatement = "SELECT Rows FROM CompressedDatabases;"; private const string ReadPreFilteredIdsStatement = "SELECT Id FROM PreFiltered WHERE Filtered == 0;"; - private const string ReadPreFilteredStatement = "SELECT Ip1, Ip2, Ip3, Ip4, ResponseCode, Id FROM PreFiltered WHERE Filtered == 0 ORDER BY Ip1 ASC LIMIT 1;"; + private const string ReadPreFilteredStatement = "SELECT Ip1, Ip2, Ip3, Ip4, ResponseCode, Id FROM PreFiltered WHERE Id == @id ORDER BY Id ASC LIMIT 1;"; private const string UpdatePreFilteredStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off; UPDATE PreFiltered SET Filtered = 1 WHERE Id == @id;"; private const string UpdateUnfilteredStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off; UPDATE Unfiltered SET Filtered = 1 WHERE Id == @id;"; @@ -109,7 +107,6 @@ public class DbHandler private const string VacuumDatabasesStatement = "VACUUM;"; - private readonly object _readFilteredLock = new(); private readonly object _readAndDeleteResumeLock = new(); private bool _stop; @@ -126,7 +123,7 @@ public class DbHandler ConcurrentQueue discardedQueue, ConcurrentQueue unfilteredQueue, ConcurrentQueue resumeQueue, - ConcurrentQueue preFilteredQueue, + ConcurrentQueue preFilteredQueue, string basePath) { _filteredQueue = filteredQueue; @@ -219,7 +216,7 @@ public class DbHandler continue; } - _preFilteredQueue.TryDequeue(out FilterQueueItem queueItem); + _preFilteredQueue.TryDequeue(out Ip queueItem); InsertPrefiltered(queueItem); } @@ -524,31 +521,17 @@ public class DbHandler command.Dispose(); } - private void InsertCompressedDatabase(int threadNumber, long rows) - { - using SqliteConnection connection = new(_compressedConnectionString); - connection.Open(); - - using SqliteCommand command = new(InsertIntoCompressedDbConnectionString, connection); - - command.Parameters.AddWithValue("@dbNumber", threadNumber); - command.Parameters.AddWithValue("@rows", rows); - - _ = command.ExecuteNonQuery(); - connection.Close(); - } - - private void InsertPrefiltered(FilterQueueItem filterQueueItem) + private void InsertPrefiltered(Ip filterQueueItem) { using SqliteConnection connection = new(_preFilteredConnectionString); connection.Open(); using SqliteCommand command = new(InsertPreFilteredStatement, connection); - command.Parameters.AddWithValue("@ip1", filterQueueItem.Ip.Ip1); - command.Parameters.AddWithValue("@ip2", filterQueueItem.Ip.Ip2); - command.Parameters.AddWithValue("@ip3", filterQueueItem.Ip.Ip3); - command.Parameters.AddWithValue("@ip4", filterQueueItem.Ip.Ip4); + command.Parameters.AddWithValue("@ip1", filterQueueItem.Ip1); + command.Parameters.AddWithValue("@ip2", filterQueueItem.Ip2); + command.Parameters.AddWithValue("@ip3", filterQueueItem.Ip3); + command.Parameters.AddWithValue("@ip4", filterQueueItem.Ip4); command.Parameters.AddWithValue("@responseCode", filterQueueItem.ResponseCode); command.Parameters.AddWithValue("@filtered", 0); @@ -624,42 +607,62 @@ public class DbHandler return ids; } - public bool GetPreFilterQueueItem(out FilterQueueItem filterQueueItem) + public void GetPreFilterQueueItem() { using SqliteConnection connection = new(_preFilteredConnectionString); connection.Open(); + + const string temp = "SELECT seq FROM sqlite_sequence;"; SqliteCommand command = new(ReadPreFilteredStatement, connection); - using SqliteDataReader reader = command.ExecuteReader(); - - filterQueueItem = new(); - Ip ip = new(); - long id = 0; - - if (!reader.HasRows) - { - return false; - } + SqliteCommand command2 = new(UpdatePreFilteredStatement, connection); + SqliteCommand command3 = new(temp, connection); + SqliteDataReader reader = command3.ExecuteReader(); + + long count = 0; while (reader.Read()) { - ip.Ip1 = reader.GetInt32(0); - ip.Ip2 = reader.GetInt32(1); - ip.Ip3 = reader.GetInt32(2); - ip.Ip4 = reader.GetInt32(3); - filterQueueItem.ResponseCode = reader.GetInt32(4); - id = reader.GetInt64(5); + count = reader.GetInt64(0); } - filterQueueItem.Ip = ip; + reader.Close(); - command = new(UpdatePreFilteredStatement, connection); - command.Parameters.AddWithValue("@id", id); + for (long i = 0; i < count; i++) + { + if (_preFilteredQueue.Count > 1000) + { + Thread.Sleep(5); + } + + command.Parameters.AddWithValue("@id", i); + reader = command.ExecuteReader(); + command.Parameters.Clear(); + + Ip ip = new(); - command.ExecuteNonQuery(); - command.Dispose(); - - return true; + if (!reader.HasRows) + { + Console.WriteLine("Uuhhh"); + } + + while (reader.Read()) + { + ip.Ip1 = reader.GetInt32(0); + ip.Ip2 = reader.GetInt32(1); + ip.Ip3 = reader.GetInt32(2); + ip.Ip4 = reader.GetInt32(3); + ip.ResponseCode = reader.GetInt32(4); + } + + reader.Close(); + + _preFilteredQueue.Enqueue(ip); + + command2.Parameters.AddWithValue("@id", i); + command2.ExecuteNonQuery(); + command2.Parameters.Clear(); + } } private static long GetDiscardedIndexesForSpecificDb(string connectionString) @@ -779,8 +782,6 @@ public class DbHandler const string createStatement = "CREATE TABLE IF NOT EXISTS Discarded (Id INTEGER NOT NULL, PackedData TEXT NOT NULL, PRIMARY KEY(Id AUTOINCREMENT))"; - _discardedConnectionStrings.Add(databaseName); - using SqliteConnection connection = new(databaseName); connection.Open(); diff --git a/Models/Model/Backend/IP.cs b/Models/Model/Backend/IP.cs index bf0d88a..c754e9b 100644 --- a/Models/Model/Backend/IP.cs +++ b/Models/Model/Backend/IP.cs @@ -10,6 +10,8 @@ public struct Ip public int Ip4 { get; set; } + public int ResponseCode { get; set; } + public override string ToString() { return $"{Ip1}.{Ip2}.{Ip3}.{Ip4}"; diff --git a/Models/Model/External/SearchResult.cs b/Models/Model/External/SearchResult.cs deleted file mode 100644 index 63fb4fe..0000000 --- a/Models/Model/External/SearchResult.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace Models.Model.External; - -public class SearchResult -{ - public string? Title { get; set; } = ""; - public string? Url { get; set; } = ""; - public string? Description { get; set; } = ""; -} \ No newline at end of file diff --git a/RSE.sln.DotSettings.user b/RSE.sln.DotSettings.user index f09b7a2..73a2b3a 100644 --- a/RSE.sln.DotSettings.user +++ b/RSE.sln.DotSettings.user @@ -26,14 +26,17 @@ ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded \ No newline at end of file