From c7fcc3297fc1523e5ff025d1a577f0de564f3ab4 Mon Sep 17 00:00:00 2001 From: Rasmus Date: Sat, 5 Apr 2025 11:28:54 +0200 Subject: [PATCH] Reduced the memory usage, as well as hopefully fixing the memory leak. Also implemented database compression across rows, including Brotli compression and bitshifting. --- Backend/Handler/IpScanner.cs | 24 ++-- Backend/Handler/ThreadHandler.cs | 9 +- Models/BackupDB/Filtered.db | Bin 118784 -> 110592 bytes Models/BackupDB/PreFiltered.db | Bin 16384 -> 0 bytes Models/BackupDB/ScannerResume.db | Bin 8192 -> 12288 bytes Models/Handler/DbHandler.cs | 181 ++++++++--------------------- Models/Helper/CompressionHelper.cs | 28 +++++ Models/Model/Backend/IP.cs | 5 + RSE.sln.DotSettings.user | 1 + 9 files changed, 103 insertions(+), 145 deletions(-) delete mode 100644 Models/BackupDB/PreFiltered.db diff --git a/Backend/Handler/IpScanner.cs b/Backend/Handler/IpScanner.cs index 6f9e2f8..8afa19c 100644 --- a/Backend/Handler/IpScanner.cs +++ b/Backend/Handler/IpScanner.cs @@ -145,12 +145,12 @@ public class IpScanner if (i == 192 && k == 2) continue; if (i == 192 && j == 88 && k == 99) continue; - if (_discardedQueue.Count >= 2000) + if (_discardedQueue.Count >= 20_000) { Thread.Sleep(1000); } - if (_preFilteredQueue.Count >= 2000) + if (_preFilteredQueue.Count >= 20_000) { Thread.Sleep(1000); } @@ -170,7 +170,9 @@ public class IpScanner resumeObject.FourthByte = l; break; } - + + + Ip ip = new() { Ip1 = i, @@ -186,19 +188,21 @@ public class IpScanner // Sometimes, if the pinger gets a Destination Unreachable Communication administratively prohibited response, the pinger will throw an exception. // https://en.wikipedia.org/wiki/Internet_Control_Message_Protocol?useskin=vector#Control_messages //_ = IPAddress.TryParse(ip.ToString(), out IPAddress? address); - - if (i % 2 == 0) + + responseCode = CustomPing.SendIcmpEchoRequestOverRawSocket(Parse(ip.ToString()), _timeout); + + /*if (l % 2 == 0) { responseCode = IPStatus.Success; } else { responseCode = IPStatus.TimedOut; - } + }*/ - //CustomPing.SendIcmpEchoRequestOverRawSocket(Parse(ip.ToString()), _timeout); - Thread.Sleep(16); + //responseCode = IPStatus.TimedOut; + //Thread.Sleep(0); } catch { @@ -233,10 +237,10 @@ public class IpScanner resumeObject.FirstByte = i; break; } - - Console.WriteLine($"Thread ({scanSettings.ThreadNumber}) is at index ({i}) out of ({scanSettings.End}). Remaining ({scanSettings.End - i})"); } + Console.WriteLine($"Thread ({scanSettings.ThreadNumber}) stopped."); + if (_stop) { resumeObject.Paused = true; diff --git a/Backend/Handler/ThreadHandler.cs b/Backend/Handler/ThreadHandler.cs index 0ac28b0..e819f51 100644 --- a/Backend/Handler/ThreadHandler.cs +++ b/Backend/Handler/ThreadHandler.cs @@ -15,8 +15,8 @@ public class ThreadHandler private bool _contentFilterStopped; private bool _ipFilterStopped; - private bool _stage1; - private bool _stage2 = true; + private bool _stage1 = true; + private bool _stage2; private bool _stage3; ConcurrentQueue filteredQueue = new(); @@ -54,10 +54,12 @@ public class ThreadHandler prefilterDb.Start(); // de-queues from preFilteredQueue scanner.Start(); // en-queues to discardedQueue and preFilteredQueue resume.Start(); // de-queues from resumeQueue + + scanner.Join(); + Stop(); discarded.Join(); prefilterDb.Join(); - scanner.Join(); resume.Join(); } @@ -205,6 +207,7 @@ public class ThreadHandler Console.WriteLine("Stopping Super Extra..."); _dbHandler.Stop(); + Console.WriteLine("Stopped."); } } \ No newline at end of file diff --git a/Models/BackupDB/Filtered.db b/Models/BackupDB/Filtered.db index ccb8ab2383d3860d6f1e6053719d5b8f77ad2acb..a1172a10b5f4a58021ac01492f6e3c0dcb57d763 100644 GIT binary patch delta 197 zcmZozz~1nHZGyBQHvgT!nH$SIBGfTjBmH@^-_L~$~1h%s@Ffu7{bTROsX6s^L*v_|rv4o8$Ge1w< zImp#f>D^;CAB!YD6^mhs0%E~ WgCqb}BDP)O1mgqt>8IB-ngRfst2l1} delta 356 zcmZp8z}B#UeS)+g4+8^(EEF>VX{Cue#-cn7dht@MKprFiZy?Da&HtPK+jfBoj9XYY zYp|W;1WB<N!*OgvK(%iIVGt@sVR#D3>GOU zU`3l0SQNIiOkiXZ;P}PBf12$VThn&F1&k$ZAUAP<>}6o!be!I|j4{v}DDXyHU!QR? zkntGASd?Fqn3R(Wv~QxHB)hn*EKCGuF^ra(zG@lcx$Q5OGfHqQ5)haez|P7DG*}Yo zVh;AeiJ3snAafaE#wEp#(>K%SA{ zr^$R?co&`4+p!o*rc!R)?y>rq-E?~n8?%ll43rGDXsXmKURPAAcrpr2MEbHhN}*hs{HAjcMcB^q|#eIeuq` z5J6m!Ps@uorI6&lW3Nw=I}p<&wVnlb*&>DRLBS?^J7Mt68fWZL{7m+@+D30wG^2WXpwQIh)%kBp0W?pS`h{ zRlH%>N$nl{P3^8zF&2;eS~6RQb<>*#`;leH61L_JKH;zYf#2~fe$G35k8krlPxE;$ zvkx|AV_Q?NS%<3<@kX#%*`m~WtL6e#lNv5hqD&&_gwP0#;0f+sRD6`74;L_SP>uX% zQ6a5oP>!`Q3LYg?Y=G57>Zu`rQ$R0fI}Sx_m9--m7HEwC delta 85 zcmZojXmFSyEx3h&fq@B#VL)M`jxqlh2EBMGUZ4;o-)08B&6^DcT=`_0ycpTVB_$bK a@=Fqva#Dj*i%WB%B-7^ee1?pR6a)a&3=|On diff --git a/Models/Handler/DbHandler.cs b/Models/Handler/DbHandler.cs index 6175f47..6e4d3ff 100644 --- a/Models/Handler/DbHandler.cs +++ b/Models/Handler/DbHandler.cs @@ -1,5 +1,7 @@ using System.Collections.Concurrent; using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Text; using Microsoft.Data.Sqlite; using Models.Helper; using Models.Model.Backend; @@ -75,8 +77,8 @@ public class DbHandler private const string InsertIntoDiscarded = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" + " PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" + - " INSERT INTO Discarded (Ip1, Ip2, Ip3, Ip4, ResponseCode)" + - " VALUES (@ip1, @ip2, @ip3, @ip4, @responseCode)"; + " INSERT INTO Discarded (PackedData)" + + " VALUES (@packedData)"; private const string InsertIntoResume = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" + " PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" + @@ -263,7 +265,7 @@ public class DbHandler Thread f = new (DiscardedDbHandler!); f.Start(discardedDbHandlerSetting); - Thread.Sleep(5000); + Thread.Sleep(150); } return waitHandles; @@ -278,7 +280,16 @@ public class DbHandler int i = 0; - while (!_stop) + SqliteConnection connection = new(connectionString); + connection.Open(); + + SqliteCommand command = new(InsertIntoDiscarded, connection); + + StringBuilder stringBuilder = new(); + + bool running = true; + + while (!_stop && running) { if (_discardedQueue.IsEmpty || _pause) { @@ -286,31 +297,46 @@ public class DbHandler _paused = true; continue; } - - if (i >= 5_000_000 && !_compressing) + + if (_stop && _discardedQueue.IsEmpty) { - _compressing = true; - - i = 0; - - InsertCompressedDatabase(discardedDbHandlerSetting.ThreadId, GetDiscardedIndexesForSpecificDb(connectionString)); - - int compressedDatabases = GetDatabasesHelper.GetTotalCompressedDatabases($"{_basePath}/Models"); - - CompressionHelper.CompressFile(absolutePath, $"{absolutePath}_{compressedDatabases}"); - - DropAndCreateDiscarded(discardedDbHandlerSetting.ThreadId); - - _compressing = false; + running = false; } - _discardedQueue.TryDequeue(out Discarded queueItem); + if (i == 2048) + { + command.Parameters.AddWithValue("@packedData", CompressionHelper.CompressString(stringBuilder.ToString())); + _ = command.ExecuteNonQuery(); + + // Re-use the command object. + command.Parameters.Clear(); + + stringBuilder.Clear(); + + i = 0; + } - InsertDiscarded(queueItem, connectionString); + if (_discardedQueue.TryDequeue(out Discarded queueItem)) + { + stringBuilder.Append(queueItem.Ip.PackIp()); + stringBuilder.Append(':'); + stringBuilder.Append(queueItem.ResponseCode); + stringBuilder.Append(','); + } i++; } + if (stringBuilder.Length != 0) + { + command.Parameters.AddWithValue("@packedData", CompressionHelper.CompressString(stringBuilder.ToString())); + _ = command.ExecuteNonQuery(); + } + + connection.Close(); + connection.Dispose(); + command.Dispose(); + discardedDbHandlerSetting.Handle!.Set(); Console.WriteLine("Discarded DbHandler stopped."); @@ -359,23 +385,6 @@ public class DbHandler connection.Close(); } - private static void InsertDiscarded(Discarded discarded, string dbConnectionString) - { - using SqliteConnection connection = new(dbConnectionString); - connection.Open(); - - using SqliteCommand command = new(InsertIntoDiscarded, connection); - - command.Parameters.AddWithValue("@ip1", discarded.Ip.Ip1); - command.Parameters.AddWithValue("@ip2", discarded.Ip.Ip2); - command.Parameters.AddWithValue("@ip3", discarded.Ip.Ip3); - command.Parameters.AddWithValue("@ip4", discarded.Ip.Ip4); - command.Parameters.AddWithValue("@responseCode", discarded.ResponseCode); - _ = command.ExecuteNonQuery(); - - connection.Close(); - } - private void InsertFiltered(Filtered filtered) { using SqliteConnection connection = new(_filteredConnectionString); @@ -544,6 +553,7 @@ public class DbHandler command.Parameters.AddWithValue("@filtered", 0); _ = command.ExecuteNonQuery(); + connection.Close(); } @@ -726,36 +736,6 @@ public class DbHandler return true; } - public List GetSearchResults() - { - lock (_readFilteredLock) - { - using SqliteConnection connection = new(_filteredConnectionString); - connection.Open(); - - using SqliteCommand command = new(ReadFilteredStatement, connection); - using SqliteDataReader reader = command.ExecuteReader(); - - if (!reader.HasRows) - { - return []; - } - - List results = []; - - while (reader.Read()) - { - SearchResult result = new(); - result.Title = reader.GetString(0); - result.Url = reader.GetString(1); - - results.Add(result); - } - - return results; - } - } - public ScannerResumeObject? GetResumeObject(int threadNumber) { lock (_readAndDeleteResumeLock) @@ -791,76 +771,13 @@ public class DbHandler return resumeObject; } } - - public void ReIndex() - { - _pause = true; - Thread.Sleep(5000); // Wait for 5 secs before doing anything with the db So we're sure that no db is open. - - if (!_paused) - { - Thread.Sleep(5000); // Just for safety. - } - - SqliteConnection connection = new(_filteredConnectionString); - connection.Open(); - - SqliteCommand command = new(ReIndexDatabasesStatement, connection); - _ = command.ExecuteNonQuery(); - connection.Close(); - - connection = new(_unfilteredConnectionString); - connection.Open(); - - command = new(ReIndexDatabasesStatement, connection); - _ = command.ExecuteNonQuery(); - connection.Close(); - - connection.Dispose(); - command.Dispose(); - - _pause = false; - _paused = false; - } - - public void Vacuum() - { - _pause = true; - - Thread.Sleep(5000); // Wait for 5 secs before doing anything with the db So we're sure that no db is open. - - if (!_paused) - { - Thread.Sleep(5000); // Just for safety. - } - - SqliteConnection connection = new(_filteredConnectionString); - connection.Open(); - - SqliteCommand command = new(VacuumDatabasesStatement, connection); - _ = command.ExecuteNonQuery(); - connection.Close(); - - connection = new(_unfilteredConnectionString); - connection.Open(); - - command = new(VacuumDatabasesStatement, connection); - _ = command.ExecuteNonQuery(); - connection.Close(); - - connection.Dispose(); - command.Dispose(); - - _pause = false; - _paused = false; - } private (string, string) CreateDiscardedDb(int threadNumber) { string absolutePath = $"{_basePath}/Models/Discarded{threadNumber}.db"; string databaseName = $"Data Source={_basePath}/Models/Discarded{threadNumber}.db"; - const string createStatement = "CREATE TABLE IF NOT EXISTS Discarded (Id INTEGER NOT NULL, Ip1 INTEGER NOT NULL, Ip2 INTEGER NOT NULL, Ip3 INTEGER NOT NULL, Ip4 INTEGER NOT NULL, ResponseCode INTEGER NOT NULL, PRIMARY KEY(Id AUTOINCREMENT))"; + const string createStatement = "CREATE TABLE IF NOT EXISTS Discarded (Id INTEGER NOT NULL, PackedData TEXT NOT NULL, PRIMARY KEY(Id AUTOINCREMENT))"; _discardedConnectionStrings.Add(databaseName); diff --git a/Models/Helper/CompressionHelper.cs b/Models/Helper/CompressionHelper.cs index 64b8a5c..88eada1 100644 --- a/Models/Helper/CompressionHelper.cs +++ b/Models/Helper/CompressionHelper.cs @@ -1,4 +1,5 @@ using System.IO.Compression; +using System.Text; namespace Models.Helper; @@ -19,4 +20,31 @@ public static class CompressionHelper using GZipStream decompressor = new(compressedFileStream, CompressionMode.Decompress); decompressor.CopyTo(decompressedFileStream); } + + public static string CompressString(string text) + { + byte[] byteArray = Encoding.UTF8.GetBytes(text); + + using MemoryStream memoryStream = new(); + + using (BrotliStream compressionStream = new(memoryStream, CompressionLevel.SmallestSize)) + { + compressionStream.Write(byteArray, 0, byteArray.Length); + } + + return Convert.ToBase64String(memoryStream.ToArray()); + } + + public static string DecompressString(string compressedText) + { + byte[] byteArray = Convert.FromBase64String(compressedText); + + using MemoryStream memoryStream = new(byteArray); + + using BrotliStream decompressionStream = new(memoryStream, CompressionMode.Decompress); + + using StreamReader reader = new(decompressionStream); + + return reader.ReadToEnd(); + } } \ No newline at end of file diff --git a/Models/Model/Backend/IP.cs b/Models/Model/Backend/IP.cs index 3b835e1..bf0d88a 100644 --- a/Models/Model/Backend/IP.cs +++ b/Models/Model/Backend/IP.cs @@ -14,4 +14,9 @@ public struct Ip { return $"{Ip1}.{Ip2}.{Ip3}.{Ip4}"; } + + public uint PackIp() + { + return (uint)(Ip1 << 24) | (uint)(Ip2 << 16) | (uint)(Ip3 << 8) | (uint)Ip4; + } } \ No newline at end of file diff --git a/RSE.sln.DotSettings.user b/RSE.sln.DotSettings.user index 92f946d..f09b7a2 100644 --- a/RSE.sln.DotSettings.user +++ b/RSE.sln.DotSettings.user @@ -16,6 +16,7 @@ ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded ForceIncluded