From 0cfbcea252e9bdf69dbd957d2ba70a933495ba51 Mon Sep 17 00:00:00 2001 From: Rasmus Date: Tue, 24 Dec 2024 14:23:23 +0100 Subject: [PATCH 1/2] Testing database compression. --- Backend/Handler/IpScanner.cs | 3 +- Backend/Handler/ThreadHandler.cs | 8 +- .../{Discarded.db => CompressedDatabases.db} | Bin 16384 -> 12288 bytes Models/Handler/DbHandler.cs | 76 +++++++++++++++++- Models/Helper/CompressionHelper.cs | 22 +++++ Models/Helper/GetDatabasesHelper.cs | 9 +++ RSE.sln.DotSettings.user | 2 + 7 files changed, 112 insertions(+), 8 deletions(-) rename Models/BackupDB/{Discarded.db => CompressedDatabases.db} (71%) create mode 100644 Models/Helper/CompressionHelper.cs create mode 100644 Models/Helper/GetDatabasesHelper.cs diff --git a/Backend/Handler/IpScanner.cs b/Backend/Handler/IpScanner.cs index c8d2943..c78a0bf 100644 --- a/Backend/Handler/IpScanner.cs +++ b/Backend/Handler/IpScanner.cs @@ -181,7 +181,8 @@ public class IpScanner _ = IPAddress.TryParse(ip.ToString(), out IPAddress? address); if (address is not null) { - responseCode = ping.Send(address, _timeout, buf, null).Status; + responseCode = IPStatus.TimedOut; //ping.Send(address, _timeout, buf, null).Status; + Thread.Sleep(1); } } catch diff --git a/Backend/Handler/ThreadHandler.cs b/Backend/Handler/ThreadHandler.cs index 784c270..78c8f2c 100644 --- a/Backend/Handler/ThreadHandler.cs +++ b/Backend/Handler/ThreadHandler.cs @@ -31,7 +31,7 @@ public class ThreadHandler public void Start() { Thread scanner = new(StartScanner); - Thread indexer = new(StartContentFilter); + //Thread indexer = new(StartContentFilter); Thread database = new(StartDbHandler); Thread discarded = new(StartDiscardedDbHandler); Thread filtered = new(StartFilteredDbHandler); @@ -39,7 +39,7 @@ public class ThreadHandler Thread communication = new(StartCommunicationHandler); scanner.Start(); - indexer.Start(); + //indexer.Start(); database.Start(); discarded.Start(); filtered.Start(); @@ -47,7 +47,7 @@ public class ThreadHandler communication.Start(); scanner.Join(); - indexer.Join(); + //indexer.Join(); database.Join(); discarded.Join(); filtered.Join(); @@ -101,7 +101,7 @@ public class ThreadHandler private void StartDiscardedDbHandler() { - WaitHandle[] wait = _dbHandler.Start(4); + WaitHandle[] wait = _dbHandler.Start(3); WaitHandle.WaitAll(wait); diff --git a/Models/BackupDB/Discarded.db b/Models/BackupDB/CompressedDatabases.db similarity index 71% rename from Models/BackupDB/Discarded.db rename to Models/BackupDB/CompressedDatabases.db index ddddc9979f78fde1e680425326563baed6d0730b..4a547900dda6905478f96200534182152e226a85 100644 GIT binary patch delta 191 zcmZo@U~EX3AkWLpfB?)Bbu77oTqeGU4E)A?4>vOk6!J~B;13FK^kQTeH#TN$vMxzX z%1L$3&n+lQEiO(?aY+OUBo?O@V-aF<4svx2aa9O$bnL+# zkrJn;Ux=%_YmkDUe~5x#sE>~h7pGECetGfaQ~Y|94fr`083;^LVA;%K@P{7&Z}~U3 delta 391 zcmZojXlP)ZAkWLfz`(!+#Ed}9F;T~om%odFiElCkzXji9-m@DEqj>}QyBM>@_4OH> zluHtma#D*6bCTjqQgaL9!Q6O53;`DBAXmo_SA`HqCm&Y@B@Fq=UHozqo&`#rA+8Z2 zI$WGeL8-+B`FX{u&iN^+lQ-~-PQK4Co5Ez!^-a9)pVmZIA`JS(2HJg>AE-z _discardedConnectionStrings = []; private const string InsertStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" + @@ -74,6 +76,10 @@ public class DbHandler " PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" + " INSERT INTO Resume (ThreadNumber, StartRange, EndRange, FirstByte, SecondByte, ThirdByte, FourthByte)" + " VALUES (@threadNumber, @startRange, @endRange, @firstByte, @secondByte, @thirdByte, @fourthByte);"; + + private const string InsertIntoCompressedDbConnectionString = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" + + " PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" + + " INSERT INTO CompressedDatabases (DbNumber, Rows) VALUES (@dbNumber, @rows)"; private const string ReadUnfilteredStatement = "SELECT * FROM Unfiltered WHERE Id = @id;"; private const string ReadUnfilteredIdsStatement = "SELECT Id FROM Unfiltered WHERE Filtered == 0;"; @@ -95,6 +101,7 @@ public class DbHandler private bool _stop; private bool _pause; private bool _paused; + private bool _compressing; private int _contentWaitTime; private int _discardedWaitTime; @@ -120,6 +127,7 @@ public class DbHandler _discardedConnectionString = $"Data Source={basePath}/Models/Discarded.db"; _filteredConnectionString = $"Data Source={basePath}/Models/Filtered.db"; _resumeConnectionString = $"Data Source={basePath}/Models/ScannerResume.db"; + _CompressedConnectionString = $"Data Source={basePath}/Models/CompressedDatabases.db"; } public void SetContentWaitTime(int waitTime) @@ -236,7 +244,9 @@ public class DbHandler DiscardedDbHandlerSetting discardedDbHandlerSetting = (DiscardedDbHandlerSetting)obj; Console.WriteLine($"Discarded DbHandler started with thread: ({discardedDbHandlerSetting.ThreadId})"); - string connectionString = CreateDiscardedDb(discardedDbHandlerSetting.ThreadId); + (string absolutePath, string connectionString) = CreateDiscardedDb(discardedDbHandlerSetting.ThreadId); + + int i = 0; while (!_stop) { @@ -247,15 +257,60 @@ public class DbHandler continue; } + if (i == Random.Shared.Next(1_000_000, 10_000_000) || i >= 10_000_000 && !_compressing) + { + _compressing = true; + + i = 0; + + Console.WriteLine("Compressing"); + + InsertCompressedDatabase(discardedDbHandlerSetting.ThreadId, GetDiscardedIndexes()); + + int compressedDatabases = GetDatabasesHelper.GetTotalCompressedDatabases($"{_basePath}/Models"); + + CompressionHelper.CompressFile(absolutePath, $"{absolutePath}_{compressedDatabases}.gz"); + + DropAndCreateDiscarded(discardedDbHandlerSetting.ThreadId); + + _compressing = false; + } + _discardedQueue.TryDequeue(out Discarded queueItem); InsertDiscarded(queueItem, connectionString); + + i++; } discardedDbHandlerSetting.Handle!.Set(); Console.WriteLine("Discarded DbHandler stopped."); } + + private void DropAndCreateDiscarded(int threadNumber) + { + string databaseName = $"Data Source={_basePath}/Models/Discarded{threadNumber}.db"; + + const string createStatement = "CREATE TABLE IF NOT EXISTS Discarded (Id INTEGER NOT NULL, Ip1 INTEGER NOT NULL, Ip2 INTEGER NOT NULL, Ip3 INTEGER NOT NULL, Ip4 INTEGER NOT NULL, ResponseCode INTEGER NOT NULL, PRIMARY KEY(Id AUTOINCREMENT))"; + const string dropStatement = "DROP TABLE Discarded;"; + const string vacuum = "VACUUM;"; + + using SqliteConnection connection = new(databaseName); + connection.Open(); + + SqliteCommand command = new(dropStatement, connection); + command.ExecuteNonQuery(); + + command = new(vacuum, connection); + command.ExecuteNonQuery(); + + command = new(createStatement, connection); + command.ExecuteNonQuery(); + command.Dispose(); + + connection.Close(); + } private void InsertUnfiltered(Unfiltered unfiltered) { @@ -423,6 +478,20 @@ public class DbHandler _ = command.ExecuteNonQuery(); connection.Close(); } + + private void InsertCompressedDatabase(int threadNumber, long rows) + { + using SqliteConnection connection = new(_CompressedConnectionString); + connection.Open(); + + using SqliteCommand command = new(InsertIntoCompressedDbConnectionString, connection); + + command.Parameters.AddWithValue("@dbNumber", threadNumber); + command.Parameters.AddWithValue("@rows", rows); + + _ = command.ExecuteNonQuery(); + connection.Close(); + } private void UpdateUnfiltered(Unfiltered unfiltered) { @@ -732,8 +801,9 @@ public class DbHandler _paused = false; } - private string CreateDiscardedDb(int threadNumber) + private (string, string) CreateDiscardedDb(int threadNumber) { + string absolutePath = $"{_basePath}/Models/Discarded{threadNumber}.db"; string databaseName = $"Data Source={_basePath}/Models/Discarded{threadNumber}.db"; const string createStatement = "CREATE TABLE IF NOT EXISTS Discarded (Id INTEGER NOT NULL, Ip1 INTEGER NOT NULL, Ip2 INTEGER NOT NULL, Ip3 INTEGER NOT NULL, Ip4 INTEGER NOT NULL, ResponseCode INTEGER NOT NULL, PRIMARY KEY(Id AUTOINCREMENT))"; @@ -746,7 +816,7 @@ public class DbHandler using SqliteCommand command = new(createStatement, connection); command.ExecuteNonQuery(); - return databaseName; + return (absolutePath, databaseName); } public void Stop() diff --git a/Models/Helper/CompressionHelper.cs b/Models/Helper/CompressionHelper.cs new file mode 100644 index 0000000..319b9ff --- /dev/null +++ b/Models/Helper/CompressionHelper.cs @@ -0,0 +1,22 @@ +using System.IO.Compression; + +namespace Models.Helper; + +public static class CompressionHelper +{ + public static void CompressFile(string sourceFile, string targetFile) + { + using FileStream originalFileStream = new(sourceFile, FileMode.Open); + using FileStream compressedFileStream = File.Create($"{targetFile}.gz"); + using GZipStream compressor = new(compressedFileStream, CompressionLevel.Fastest); + originalFileStream.CopyTo(compressor); + } + + public static void DecompressFile(string sourceFile, string targetFile) + { + using FileStream compressedFileStream = new(sourceFile, FileMode.Open); + using FileStream decompressedFileStream = File.Create($"{targetFile}.gz"); + using GZipStream decompressor = new(compressedFileStream, CompressionMode.Decompress); + decompressor.CopyTo(decompressedFileStream); + } +} \ No newline at end of file diff --git a/Models/Helper/GetDatabasesHelper.cs b/Models/Helper/GetDatabasesHelper.cs new file mode 100644 index 0000000..10051ab --- /dev/null +++ b/Models/Helper/GetDatabasesHelper.cs @@ -0,0 +1,9 @@ +namespace Models.Helper; + +public static class GetDatabasesHelper +{ + public static int GetTotalCompressedDatabases(string path) + { + return Directory.GetFiles(path, "*.gz").Length; + } +} \ No newline at end of file diff --git a/RSE.sln.DotSettings.user b/RSE.sln.DotSettings.user index 37aa8f6..60eba06 100644 --- a/RSE.sln.DotSettings.user +++ b/RSE.sln.DotSettings.user @@ -4,8 +4,10 @@ ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded ForceIncluded \ No newline at end of file -- 2.43.0 From a6804be9f1131b190bc4b44550c4295603c9778b Mon Sep 17 00:00:00 2001 From: Rasmus Date: Mon, 30 Dec 2024 18:36:04 +0100 Subject: [PATCH 2/2] Implement database compression --- Backend/Handler/Communication.cs | 3 +- Backend/Handler/IpScanner.cs | 9 +-- Backend/Handler/ThreadHandler.cs | 8 +-- Models/Handler/DbHandler.cs | 100 ++++++++++++++++++++----------- RSE.sln.DotSettings.user | 2 + 5 files changed, 79 insertions(+), 43 deletions(-) diff --git a/Backend/Handler/Communication.cs b/Backend/Handler/Communication.cs index 44da74b..f0aef9c 100644 --- a/Backend/Handler/Communication.cs +++ b/Backend/Handler/Communication.cs @@ -87,6 +87,7 @@ public class Communication } else { + // This is a workaround for the frontend not understanding a 0f as an actual float, so we use a very small float. status.PercentageOfIpv4Scanned = 0.0000000001f; } @@ -96,7 +97,7 @@ public class Communication status.MyDbSize = databaseSizes.MyDbSize; status.FilteredDbSize = databaseSizes.FilteredDbSize; status.DiscardedDbSize = databaseSizes.DiscardedDbSize; - + byte[] serializedResult = JsonSerializer.SerializeToUtf8Bytes(status); rep.SendFrame(serializedResult); diff --git a/Backend/Handler/IpScanner.cs b/Backend/Handler/IpScanner.cs index c78a0bf..3652ca3 100644 --- a/Backend/Handler/IpScanner.cs +++ b/Backend/Handler/IpScanner.cs @@ -48,10 +48,11 @@ public class IpScanner { threadsAmount = 256 / threads; } - + WaitHandle[] waitHandle1; WaitHandle[] waitHandle2; + // This is jank. But it was to get it production ready. if (threads <= 64) { waitHandle1 = new WaitHandle[threads]; @@ -77,7 +78,7 @@ public class IpScanner ThreadNumber = i, Handle = handle }; - + if (i < 64) { waitHandle1[counter] = handle; @@ -131,6 +132,7 @@ public class IpScanner fourthByte = resumeNow.FourthByte; } + // Empty buffer so we use the lowest abstracted ping.Send() method. byte[] buf = []; using Ping ping = new(); @@ -181,8 +183,7 @@ public class IpScanner _ = IPAddress.TryParse(ip.ToString(), out IPAddress? address); if (address is not null) { - responseCode = IPStatus.TimedOut; //ping.Send(address, _timeout, buf, null).Status; - Thread.Sleep(1); + responseCode = ping.Send(address, _timeout, buf, null).Status; } } catch diff --git a/Backend/Handler/ThreadHandler.cs b/Backend/Handler/ThreadHandler.cs index 78c8f2c..784c270 100644 --- a/Backend/Handler/ThreadHandler.cs +++ b/Backend/Handler/ThreadHandler.cs @@ -31,7 +31,7 @@ public class ThreadHandler public void Start() { Thread scanner = new(StartScanner); - //Thread indexer = new(StartContentFilter); + Thread indexer = new(StartContentFilter); Thread database = new(StartDbHandler); Thread discarded = new(StartDiscardedDbHandler); Thread filtered = new(StartFilteredDbHandler); @@ -39,7 +39,7 @@ public class ThreadHandler Thread communication = new(StartCommunicationHandler); scanner.Start(); - //indexer.Start(); + indexer.Start(); database.Start(); discarded.Start(); filtered.Start(); @@ -47,7 +47,7 @@ public class ThreadHandler communication.Start(); scanner.Join(); - //indexer.Join(); + indexer.Join(); database.Join(); discarded.Join(); filtered.Join(); @@ -101,7 +101,7 @@ public class ThreadHandler private void StartDiscardedDbHandler() { - WaitHandle[] wait = _dbHandler.Start(3); + WaitHandle[] wait = _dbHandler.Start(4); WaitHandle.WaitAll(wait); diff --git a/Models/Handler/DbHandler.cs b/Models/Handler/DbHandler.cs index 5e23d25..282eaf8 100644 --- a/Models/Handler/DbHandler.cs +++ b/Models/Handler/DbHandler.cs @@ -15,10 +15,9 @@ public class DbHandler private readonly ConcurrentQueue _resumeQueue; private readonly string _unfilteredConnectionString; - private readonly string _discardedConnectionString; private readonly string _filteredConnectionString; private readonly string _resumeConnectionString; - private readonly string _CompressedConnectionString; + private readonly string _compressedConnectionString; private readonly List _discardedConnectionStrings = []; private const string InsertStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" + @@ -88,6 +87,7 @@ public class DbHandler private const string ReadFilteredIpStatement = "SELECT Ip1, Ip2, Ip3, Ip4 FROM Filtered WHERE Ip1 == @ip1 AND Ip2 == @ip1 AND Ip3 == @ip1 AND Ip4 == @ip1 ORDER BY Ip1 DESC LIMIT 1;"; private const string ReadDiscardedSeqIdsStatement = "SELECT seq FROM sqlite_sequence;"; private const string ReadAndDeleteResumeStatement = "SELECT * FROM Resume WHERE ThreadNumber == @threadNumber; DELETE FROM RESUME WHERE ThreadNumber == @threadNumber;"; + private const string ReadCompressedDbRowsStatement = "SELECT Rows FROM CompressedDatabases;"; private const string UpdateUnfilteredStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off; UPDATE Unfiltered SET Filtered = 1 WHERE Id == @id;"; @@ -124,10 +124,9 @@ public class DbHandler _basePath = basePath; _unfilteredConnectionString = $"Data Source={basePath}/Models/mydb.db"; - _discardedConnectionString = $"Data Source={basePath}/Models/Discarded.db"; _filteredConnectionString = $"Data Source={basePath}/Models/Filtered.db"; _resumeConnectionString = $"Data Source={basePath}/Models/ScannerResume.db"; - _CompressedConnectionString = $"Data Source={basePath}/Models/CompressedDatabases.db"; + _compressedConnectionString = $"Data Source={basePath}/Models/CompressedDatabases.db"; } public void SetContentWaitTime(int waitTime) @@ -221,7 +220,7 @@ public class DbHandler for (int i = 0; i < threads; i++) { EventWaitHandle handle = new(false, EventResetMode.ManualReset); - + DiscardedDbHandlerSetting discardedDbHandlerSetting = new() { Handle = handle, @@ -233,7 +232,7 @@ public class DbHandler Thread f = new (DiscardedDbHandler!); f.Start(discardedDbHandlerSetting); - Thread.Sleep(1000); + Thread.Sleep(5000); } return waitHandles; @@ -257,19 +256,17 @@ public class DbHandler continue; } - if (i == Random.Shared.Next(1_000_000, 10_000_000) || i >= 10_000_000 && !_compressing) + if (i >= 50_000_000 && !_compressing) { _compressing = true; - + i = 0; - Console.WriteLine("Compressing"); - - InsertCompressedDatabase(discardedDbHandlerSetting.ThreadId, GetDiscardedIndexes()); + InsertCompressedDatabase(discardedDbHandlerSetting.ThreadId, GetDiscardedIndexesForSpecificDb(connectionString)); int compressedDatabases = GetDatabasesHelper.GetTotalCompressedDatabases($"{_basePath}/Models"); - CompressionHelper.CompressFile(absolutePath, $"{absolutePath}_{compressedDatabases}.gz"); + CompressionHelper.CompressFile(absolutePath, $"{absolutePath}_{compressedDatabases}"); DropAndCreateDiscarded(discardedDbHandlerSetting.ThreadId); @@ -287,7 +284,7 @@ public class DbHandler Console.WriteLine("Discarded DbHandler stopped."); } - + private void DropAndCreateDiscarded(int threadNumber) { string databaseName = $"Data Source={_basePath}/Models/Discarded{threadNumber}.db"; @@ -481,7 +478,7 @@ public class DbHandler private void InsertCompressedDatabase(int threadNumber, long rows) { - using SqliteConnection connection = new(_CompressedConnectionString); + using SqliteConnection connection = new(_compressedConnectionString); connection.Open(); using SqliteCommand command = new(InsertIntoCompressedDbConnectionString, connection); @@ -586,26 +583,75 @@ public class DbHandler public long GetDiscardedIndexes() { long rowId = 0; + + SqliteConnection connection; + SqliteCommand command; + SqliteDataReader reader; for (int i = 0; i < _discardedConnectionStrings.Count; i++) { - using SqliteConnection connection = new(_discardedConnectionStrings[i]); + connection = new(_discardedConnectionStrings[i]); connection.Open(); - using SqliteCommand command = new(ReadDiscardedSeqIdsStatement, connection); - using SqliteDataReader reader = command.ExecuteReader(); + command = new(ReadDiscardedSeqIdsStatement, connection); + reader = command.ExecuteReader(); if (!reader.HasRows) { - return 0; + return rowId; } while (reader.Read()) { rowId += reader.GetInt64(0); } + + connection.Close(); } + connection = new(_compressedConnectionString); + connection.Open(); + command = new(ReadCompressedDbRowsStatement, connection); + reader = command.ExecuteReader(); + + if (!reader.HasRows) + { + return rowId; + } + + while (reader.Read()) + { + rowId += reader.GetInt64(0); + } + + connection.Close(); + connection.Dispose(); + command.Dispose(); + reader.Dispose(); + + return rowId; + } + + private static long GetDiscardedIndexesForSpecificDb(string connectionString) + { + using SqliteConnection connection = new(connectionString); + connection.Open(); + + using SqliteCommand command = new(ReadDiscardedSeqIdsStatement, connection); + using SqliteDataReader reader = command.ExecuteReader(); + + long rowId = 0; + + if (!reader.HasRows) + { + return rowId; + } + + while (reader.Read()) + { + rowId += reader.GetInt64(0); + } + return rowId; } @@ -734,20 +780,13 @@ public class DbHandler Thread.Sleep(5000); // Just for safety. } - SqliteConnection connection = new(_discardedConnectionString); + SqliteConnection connection = new(_filteredConnectionString); connection.Open(); SqliteCommand command = new(ReIndexDatabasesStatement, connection); _ = command.ExecuteNonQuery(); connection.Close(); - connection = new(_filteredConnectionString); - connection.Open(); - - command = new(ReIndexDatabasesStatement, connection); - _ = command.ExecuteNonQuery(); - connection.Close(); - connection = new(_unfilteredConnectionString); connection.Open(); @@ -773,20 +812,13 @@ public class DbHandler Thread.Sleep(5000); // Just for safety. } - SqliteConnection connection = new(_discardedConnectionString); + SqliteConnection connection = new(_filteredConnectionString); connection.Open(); SqliteCommand command = new(VacuumDatabasesStatement, connection); _ = command.ExecuteNonQuery(); connection.Close(); - connection = new(_filteredConnectionString); - connection.Open(); - - command = new(VacuumDatabasesStatement, connection); - _ = command.ExecuteNonQuery(); - connection.Close(); - connection = new(_unfilteredConnectionString); connection.Open(); diff --git a/RSE.sln.DotSettings.user b/RSE.sln.DotSettings.user index 60eba06..a46370d 100644 --- a/RSE.sln.DotSettings.user +++ b/RSE.sln.DotSettings.user @@ -6,8 +6,10 @@ ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded ForceIncluded \ No newline at end of file -- 2.43.0