From a6804be9f1131b190bc4b44550c4295603c9778b Mon Sep 17 00:00:00 2001 From: Rasmus Date: Mon, 30 Dec 2024 18:36:04 +0100 Subject: [PATCH] Implement database compression --- Backend/Handler/Communication.cs | 3 +- Backend/Handler/IpScanner.cs | 9 +-- Backend/Handler/ThreadHandler.cs | 8 +-- Models/Handler/DbHandler.cs | 100 ++++++++++++++++++++----------- RSE.sln.DotSettings.user | 2 + 5 files changed, 79 insertions(+), 43 deletions(-) diff --git a/Backend/Handler/Communication.cs b/Backend/Handler/Communication.cs index 44da74b..f0aef9c 100644 --- a/Backend/Handler/Communication.cs +++ b/Backend/Handler/Communication.cs @@ -87,6 +87,7 @@ public class Communication } else { + // This is a workaround for the frontend not understanding a 0f as an actual float, so we use a very small float. status.PercentageOfIpv4Scanned = 0.0000000001f; } @@ -96,7 +97,7 @@ public class Communication status.MyDbSize = databaseSizes.MyDbSize; status.FilteredDbSize = databaseSizes.FilteredDbSize; status.DiscardedDbSize = databaseSizes.DiscardedDbSize; - + byte[] serializedResult = JsonSerializer.SerializeToUtf8Bytes(status); rep.SendFrame(serializedResult); diff --git a/Backend/Handler/IpScanner.cs b/Backend/Handler/IpScanner.cs index c78a0bf..3652ca3 100644 --- a/Backend/Handler/IpScanner.cs +++ b/Backend/Handler/IpScanner.cs @@ -48,10 +48,11 @@ public class IpScanner { threadsAmount = 256 / threads; } - + WaitHandle[] waitHandle1; WaitHandle[] waitHandle2; + // This is jank. But it was to get it production ready. if (threads <= 64) { waitHandle1 = new WaitHandle[threads]; @@ -77,7 +78,7 @@ public class IpScanner ThreadNumber = i, Handle = handle }; - + if (i < 64) { waitHandle1[counter] = handle; @@ -131,6 +132,7 @@ public class IpScanner fourthByte = resumeNow.FourthByte; } + // Empty buffer so we use the lowest abstracted ping.Send() method. byte[] buf = []; using Ping ping = new(); @@ -181,8 +183,7 @@ public class IpScanner _ = IPAddress.TryParse(ip.ToString(), out IPAddress? address); if (address is not null) { - responseCode = IPStatus.TimedOut; //ping.Send(address, _timeout, buf, null).Status; - Thread.Sleep(1); + responseCode = ping.Send(address, _timeout, buf, null).Status; } } catch diff --git a/Backend/Handler/ThreadHandler.cs b/Backend/Handler/ThreadHandler.cs index 78c8f2c..784c270 100644 --- a/Backend/Handler/ThreadHandler.cs +++ b/Backend/Handler/ThreadHandler.cs @@ -31,7 +31,7 @@ public class ThreadHandler public void Start() { Thread scanner = new(StartScanner); - //Thread indexer = new(StartContentFilter); + Thread indexer = new(StartContentFilter); Thread database = new(StartDbHandler); Thread discarded = new(StartDiscardedDbHandler); Thread filtered = new(StartFilteredDbHandler); @@ -39,7 +39,7 @@ public class ThreadHandler Thread communication = new(StartCommunicationHandler); scanner.Start(); - //indexer.Start(); + indexer.Start(); database.Start(); discarded.Start(); filtered.Start(); @@ -47,7 +47,7 @@ public class ThreadHandler communication.Start(); scanner.Join(); - //indexer.Join(); + indexer.Join(); database.Join(); discarded.Join(); filtered.Join(); @@ -101,7 +101,7 @@ public class ThreadHandler private void StartDiscardedDbHandler() { - WaitHandle[] wait = _dbHandler.Start(3); + WaitHandle[] wait = _dbHandler.Start(4); WaitHandle.WaitAll(wait); diff --git a/Models/Handler/DbHandler.cs b/Models/Handler/DbHandler.cs index 5e23d25..282eaf8 100644 --- a/Models/Handler/DbHandler.cs +++ b/Models/Handler/DbHandler.cs @@ -15,10 +15,9 @@ public class DbHandler private readonly ConcurrentQueue _resumeQueue; private readonly string _unfilteredConnectionString; - private readonly string _discardedConnectionString; private readonly string _filteredConnectionString; private readonly string _resumeConnectionString; - private readonly string _CompressedConnectionString; + private readonly string _compressedConnectionString; private readonly List _discardedConnectionStrings = []; private const string InsertStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" + @@ -88,6 +87,7 @@ public class DbHandler private const string ReadFilteredIpStatement = "SELECT Ip1, Ip2, Ip3, Ip4 FROM Filtered WHERE Ip1 == @ip1 AND Ip2 == @ip1 AND Ip3 == @ip1 AND Ip4 == @ip1 ORDER BY Ip1 DESC LIMIT 1;"; private const string ReadDiscardedSeqIdsStatement = "SELECT seq FROM sqlite_sequence;"; private const string ReadAndDeleteResumeStatement = "SELECT * FROM Resume WHERE ThreadNumber == @threadNumber; DELETE FROM RESUME WHERE ThreadNumber == @threadNumber;"; + private const string ReadCompressedDbRowsStatement = "SELECT Rows FROM CompressedDatabases;"; private const string UpdateUnfilteredStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off; UPDATE Unfiltered SET Filtered = 1 WHERE Id == @id;"; @@ -124,10 +124,9 @@ public class DbHandler _basePath = basePath; _unfilteredConnectionString = $"Data Source={basePath}/Models/mydb.db"; - _discardedConnectionString = $"Data Source={basePath}/Models/Discarded.db"; _filteredConnectionString = $"Data Source={basePath}/Models/Filtered.db"; _resumeConnectionString = $"Data Source={basePath}/Models/ScannerResume.db"; - _CompressedConnectionString = $"Data Source={basePath}/Models/CompressedDatabases.db"; + _compressedConnectionString = $"Data Source={basePath}/Models/CompressedDatabases.db"; } public void SetContentWaitTime(int waitTime) @@ -221,7 +220,7 @@ public class DbHandler for (int i = 0; i < threads; i++) { EventWaitHandle handle = new(false, EventResetMode.ManualReset); - + DiscardedDbHandlerSetting discardedDbHandlerSetting = new() { Handle = handle, @@ -233,7 +232,7 @@ public class DbHandler Thread f = new (DiscardedDbHandler!); f.Start(discardedDbHandlerSetting); - Thread.Sleep(1000); + Thread.Sleep(5000); } return waitHandles; @@ -257,19 +256,17 @@ public class DbHandler continue; } - if (i == Random.Shared.Next(1_000_000, 10_000_000) || i >= 10_000_000 && !_compressing) + if (i >= 50_000_000 && !_compressing) { _compressing = true; - + i = 0; - Console.WriteLine("Compressing"); - - InsertCompressedDatabase(discardedDbHandlerSetting.ThreadId, GetDiscardedIndexes()); + InsertCompressedDatabase(discardedDbHandlerSetting.ThreadId, GetDiscardedIndexesForSpecificDb(connectionString)); int compressedDatabases = GetDatabasesHelper.GetTotalCompressedDatabases($"{_basePath}/Models"); - CompressionHelper.CompressFile(absolutePath, $"{absolutePath}_{compressedDatabases}.gz"); + CompressionHelper.CompressFile(absolutePath, $"{absolutePath}_{compressedDatabases}"); DropAndCreateDiscarded(discardedDbHandlerSetting.ThreadId); @@ -287,7 +284,7 @@ public class DbHandler Console.WriteLine("Discarded DbHandler stopped."); } - + private void DropAndCreateDiscarded(int threadNumber) { string databaseName = $"Data Source={_basePath}/Models/Discarded{threadNumber}.db"; @@ -481,7 +478,7 @@ public class DbHandler private void InsertCompressedDatabase(int threadNumber, long rows) { - using SqliteConnection connection = new(_CompressedConnectionString); + using SqliteConnection connection = new(_compressedConnectionString); connection.Open(); using SqliteCommand command = new(InsertIntoCompressedDbConnectionString, connection); @@ -586,26 +583,75 @@ public class DbHandler public long GetDiscardedIndexes() { long rowId = 0; + + SqliteConnection connection; + SqliteCommand command; + SqliteDataReader reader; for (int i = 0; i < _discardedConnectionStrings.Count; i++) { - using SqliteConnection connection = new(_discardedConnectionStrings[i]); + connection = new(_discardedConnectionStrings[i]); connection.Open(); - using SqliteCommand command = new(ReadDiscardedSeqIdsStatement, connection); - using SqliteDataReader reader = command.ExecuteReader(); + command = new(ReadDiscardedSeqIdsStatement, connection); + reader = command.ExecuteReader(); if (!reader.HasRows) { - return 0; + return rowId; } while (reader.Read()) { rowId += reader.GetInt64(0); } + + connection.Close(); } + connection = new(_compressedConnectionString); + connection.Open(); + command = new(ReadCompressedDbRowsStatement, connection); + reader = command.ExecuteReader(); + + if (!reader.HasRows) + { + return rowId; + } + + while (reader.Read()) + { + rowId += reader.GetInt64(0); + } + + connection.Close(); + connection.Dispose(); + command.Dispose(); + reader.Dispose(); + + return rowId; + } + + private static long GetDiscardedIndexesForSpecificDb(string connectionString) + { + using SqliteConnection connection = new(connectionString); + connection.Open(); + + using SqliteCommand command = new(ReadDiscardedSeqIdsStatement, connection); + using SqliteDataReader reader = command.ExecuteReader(); + + long rowId = 0; + + if (!reader.HasRows) + { + return rowId; + } + + while (reader.Read()) + { + rowId += reader.GetInt64(0); + } + return rowId; } @@ -734,20 +780,13 @@ public class DbHandler Thread.Sleep(5000); // Just for safety. } - SqliteConnection connection = new(_discardedConnectionString); + SqliteConnection connection = new(_filteredConnectionString); connection.Open(); SqliteCommand command = new(ReIndexDatabasesStatement, connection); _ = command.ExecuteNonQuery(); connection.Close(); - connection = new(_filteredConnectionString); - connection.Open(); - - command = new(ReIndexDatabasesStatement, connection); - _ = command.ExecuteNonQuery(); - connection.Close(); - connection = new(_unfilteredConnectionString); connection.Open(); @@ -773,20 +812,13 @@ public class DbHandler Thread.Sleep(5000); // Just for safety. } - SqliteConnection connection = new(_discardedConnectionString); + SqliteConnection connection = new(_filteredConnectionString); connection.Open(); SqliteCommand command = new(VacuumDatabasesStatement, connection); _ = command.ExecuteNonQuery(); connection.Close(); - connection = new(_filteredConnectionString); - connection.Open(); - - command = new(VacuumDatabasesStatement, connection); - _ = command.ExecuteNonQuery(); - connection.Close(); - connection = new(_unfilteredConnectionString); connection.Open(); diff --git a/RSE.sln.DotSettings.user b/RSE.sln.DotSettings.user index 60eba06..a46370d 100644 --- a/RSE.sln.DotSettings.user +++ b/RSE.sln.DotSettings.user @@ -6,8 +6,10 @@ ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded ForceIncluded \ No newline at end of file