Implement database compression

This commit is contained in:
Rasmus Rasmussen 2024-12-30 18:36:04 +01:00
parent 0cfbcea252
commit a6804be9f1
5 changed files with 79 additions and 43 deletions

View File

@ -87,6 +87,7 @@ public class Communication
}
else
{
// This is a workaround for the frontend not understanding a 0f as an actual float, so we use a very small float.
status.PercentageOfIpv4Scanned = 0.0000000001f;
}
@ -96,7 +97,7 @@ public class Communication
status.MyDbSize = databaseSizes.MyDbSize;
status.FilteredDbSize = databaseSizes.FilteredDbSize;
status.DiscardedDbSize = databaseSizes.DiscardedDbSize;
byte[] serializedResult = JsonSerializer.SerializeToUtf8Bytes(status);
rep.SendFrame(serializedResult);

View File

@ -48,10 +48,11 @@ public class IpScanner
{
threadsAmount = 256 / threads;
}
WaitHandle[] waitHandle1;
WaitHandle[] waitHandle2;
// This is jank, but it was necessary to get this production ready.
if (threads <= 64)
{
waitHandle1 = new WaitHandle[threads];
@ -77,7 +78,7 @@ public class IpScanner
ThreadNumber = i,
Handle = handle
};
if (i < 64)
{
waitHandle1[counter] = handle;
@ -131,6 +132,7 @@ public class IpScanner
fourthByte = resumeNow.FourthByte;
}
// Empty buffer so we use the lowest abstracted ping.Send() method.
byte[] buf = [];
using Ping ping = new();
@ -181,8 +183,7 @@ public class IpScanner
_ = IPAddress.TryParse(ip.ToString(), out IPAddress? address);
if (address is not null)
{
responseCode = IPStatus.TimedOut; //ping.Send(address, _timeout, buf, null).Status;
Thread.Sleep(1);
responseCode = ping.Send(address, _timeout, buf, null).Status;
}
}
catch

View File

@ -31,7 +31,7 @@ public class ThreadHandler
public void Start()
{
Thread scanner = new(StartScanner);
//Thread indexer = new(StartContentFilter);
Thread indexer = new(StartContentFilter);
Thread database = new(StartDbHandler);
Thread discarded = new(StartDiscardedDbHandler);
Thread filtered = new(StartFilteredDbHandler);
@ -39,7 +39,7 @@ public class ThreadHandler
Thread communication = new(StartCommunicationHandler);
scanner.Start();
//indexer.Start();
indexer.Start();
database.Start();
discarded.Start();
filtered.Start();
@ -47,7 +47,7 @@ public class ThreadHandler
communication.Start();
scanner.Join();
//indexer.Join();
indexer.Join();
database.Join();
discarded.Join();
filtered.Join();
@ -101,7 +101,7 @@ public class ThreadHandler
private void StartDiscardedDbHandler()
{
WaitHandle[] wait = _dbHandler.Start(3);
WaitHandle[] wait = _dbHandler.Start(4);
WaitHandle.WaitAll(wait);

View File

@ -15,10 +15,9 @@ public class DbHandler
private readonly ConcurrentQueue<ScannerResumeObject> _resumeQueue;
private readonly string _unfilteredConnectionString;
private readonly string _discardedConnectionString;
private readonly string _filteredConnectionString;
private readonly string _resumeConnectionString;
private readonly string _CompressedConnectionString;
private readonly string _compressedConnectionString;
private readonly List<string> _discardedConnectionStrings = [];
private const string InsertStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" +
@ -88,6 +87,7 @@ public class DbHandler
private const string ReadFilteredIpStatement = "SELECT Ip1, Ip2, Ip3, Ip4 FROM Filtered WHERE Ip1 == @ip1 AND Ip2 == @ip1 AND Ip3 == @ip1 AND Ip4 == @ip1 ORDER BY Ip1 DESC LIMIT 1;";
private const string ReadDiscardedSeqIdsStatement = "SELECT seq FROM sqlite_sequence;";
private const string ReadAndDeleteResumeStatement = "SELECT * FROM Resume WHERE ThreadNumber == @threadNumber; DELETE FROM RESUME WHERE ThreadNumber == @threadNumber;";
private const string ReadCompressedDbRowsStatement = "SELECT Rows FROM CompressedDatabases;";
private const string UpdateUnfilteredStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off; UPDATE Unfiltered SET Filtered = 1 WHERE Id == @id;";
@ -124,10 +124,9 @@ public class DbHandler
_basePath = basePath;
_unfilteredConnectionString = $"Data Source={basePath}/Models/mydb.db";
_discardedConnectionString = $"Data Source={basePath}/Models/Discarded.db";
_filteredConnectionString = $"Data Source={basePath}/Models/Filtered.db";
_resumeConnectionString = $"Data Source={basePath}/Models/ScannerResume.db";
_CompressedConnectionString = $"Data Source={basePath}/Models/CompressedDatabases.db";
_compressedConnectionString = $"Data Source={basePath}/Models/CompressedDatabases.db";
}
public void SetContentWaitTime(int waitTime)
@ -221,7 +220,7 @@ public class DbHandler
for (int i = 0; i < threads; i++)
{
EventWaitHandle handle = new(false, EventResetMode.ManualReset);
DiscardedDbHandlerSetting discardedDbHandlerSetting = new()
{
Handle = handle,
@ -233,7 +232,7 @@ public class DbHandler
Thread f = new (DiscardedDbHandler!);
f.Start(discardedDbHandlerSetting);
Thread.Sleep(1000);
Thread.Sleep(5000);
}
return waitHandles;
@ -257,19 +256,17 @@ public class DbHandler
continue;
}
if (i == Random.Shared.Next(1_000_000, 10_000_000) || i >= 10_000_000 && !_compressing)
if (i >= 50_000_000 && !_compressing)
{
_compressing = true;
i = 0;
Console.WriteLine("Compressing");
InsertCompressedDatabase(discardedDbHandlerSetting.ThreadId, GetDiscardedIndexes());
InsertCompressedDatabase(discardedDbHandlerSetting.ThreadId, GetDiscardedIndexesForSpecificDb(connectionString));
int compressedDatabases = GetDatabasesHelper.GetTotalCompressedDatabases($"{_basePath}/Models");
CompressionHelper.CompressFile(absolutePath, $"{absolutePath}_{compressedDatabases}.gz");
CompressionHelper.CompressFile(absolutePath, $"{absolutePath}_{compressedDatabases}");
DropAndCreateDiscarded(discardedDbHandlerSetting.ThreadId);
@ -287,7 +284,7 @@ public class DbHandler
Console.WriteLine("Discarded DbHandler stopped.");
}
private void DropAndCreateDiscarded(int threadNumber)
{
string databaseName = $"Data Source={_basePath}/Models/Discarded{threadNumber}.db";
@ -481,7 +478,7 @@ public class DbHandler
private void InsertCompressedDatabase(int threadNumber, long rows)
{
using SqliteConnection connection = new(_CompressedConnectionString);
using SqliteConnection connection = new(_compressedConnectionString);
connection.Open();
using SqliteCommand command = new(InsertIntoCompressedDbConnectionString, connection);
@ -586,26 +583,75 @@ public class DbHandler
/// <summary>
/// Totals the row counters of every database listed in <c>_discardedConnectionStrings</c>
/// and adds the row counts recorded in the compressed-databases store.
/// </summary>
/// <returns>
/// The sum of all <c>seq</c> values read with <c>ReadDiscardedSeqIdsStatement</c> from each
/// discarded database, plus all <c>Rows</c> values read with <c>ReadCompressedDbRowsStatement</c>
/// from the compressed-databases store. If one of the discarded databases yields no rows,
/// the total accumulated up to that point is returned immediately.
/// </returns>
public long GetDiscardedIndexes()
{
    long rowId = 0;

    for (int i = 0; i < _discardedConnectionStrings.Count; i++)
    {
        // Scoped using declarations: each connection, command and reader is disposed at the
        // end of its own iteration (and on the early return) instead of being overwritten
        // by the next iteration and leaked.
        using SqliteConnection connection = new(_discardedConnectionStrings[i]);
        connection.Open();

        using SqliteCommand command = new(ReadDiscardedSeqIdsStatement, connection);
        using SqliteDataReader reader = command.ExecuteReader();

        if (!reader.HasRows)
        {
            // NOTE(review): this stops at the first database without sequence rows, so the
            // remaining databases and the compressed counts are not included. Kept as-is to
            // preserve the existing result; confirm whether skipping the empty database
            // (continue) is what is actually wanted.
            return rowId;
        }

        while (reader.Read())
        {
            rowId += reader.GetInt64(0);
        }
    }

    using SqliteConnection compressedConnection = new(_compressedConnectionString);
    compressedConnection.Open();

    using SqliteCommand compressedCommand = new(ReadCompressedDbRowsStatement, compressedConnection);
    using SqliteDataReader compressedReader = compressedCommand.ExecuteReader();

    // An empty result set simply skips the loop, which leaves the total unchanged.
    while (compressedReader.Read())
    {
        rowId += compressedReader.GetInt64(0);
    }

    return rowId;
}
/// <summary>
/// Sums the values returned by <c>ReadDiscardedSeqIdsStatement</c> for a single database.
/// </summary>
/// <param name="connectionString">Connection string of the database to query.</param>
/// <returns>The total of the first column over all returned rows, or 0 when there are none.</returns>
private static long GetDiscardedIndexesForSpecificDb(string connectionString)
{
    long total = 0;

    using SqliteConnection db = new(connectionString);
    db.Open();

    using SqliteCommand query = new(ReadDiscardedSeqIdsStatement, db);
    using SqliteDataReader rows = query.ExecuteReader();

    // With no rows the loop body never runs and the total stays at 0.
    while (rows.Read())
    {
        total += rows.GetInt64(0);
    }

    return total;
}
@ -734,20 +780,13 @@ public class DbHandler
Thread.Sleep(5000); // Just for safety.
}
SqliteConnection connection = new(_discardedConnectionString);
SqliteConnection connection = new(_filteredConnectionString);
connection.Open();
SqliteCommand command = new(ReIndexDatabasesStatement, connection);
_ = command.ExecuteNonQuery();
connection.Close();
connection = new(_filteredConnectionString);
connection.Open();
command = new(ReIndexDatabasesStatement, connection);
_ = command.ExecuteNonQuery();
connection.Close();
connection = new(_unfilteredConnectionString);
connection.Open();
@ -773,20 +812,13 @@ public class DbHandler
Thread.Sleep(5000); // Just for safety.
}
SqliteConnection connection = new(_discardedConnectionString);
SqliteConnection connection = new(_filteredConnectionString);
connection.Open();
SqliteCommand command = new(VacuumDatabasesStatement, connection);
_ = command.ExecuteNonQuery();
connection.Close();
connection = new(_filteredConnectionString);
connection.Open();
command = new(VacuumDatabasesStatement, connection);
_ = command.ExecuteNonQuery();
connection.Close();
connection = new(_unfilteredConnectionString);
connection.Open();

View File

@ -6,8 +6,10 @@
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003ADirectoryInfo_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003Fe4ec446cfe0489bc3ef68a45c6766d183e999ebdc657e94fb1ad059de2bb9_003FDirectoryInfo_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AFileSystemEnumerator_002EUnix_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003F233863917bb42f133182fb4926e94ef8139c6f704da0c4574a8de3209f4761_003FFileSystemEnumerator_002EUnix_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AHttpResponseMessage_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003F85e97f467d698c9e98eae9e3a1b39d58541173e57992d8f7111eabdd3db3526_003FHttpResponseMessage_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003APing_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003F3080b18e3637ea741b5b65abd6aee06e41494a82a58b3e2ed87d4ddb5cc62_003FPing_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003ARateLimitRule_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003F8fbca8b1bca27d45830c443b2c773d979015ea216430366f285514a39fc0b9_003FRateLimitRule_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003ASqliteException_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003F154220569126135ad5d7314bf2bc694d3cf7c95840d481d44f0336f4f1f8e9c_003FSqliteException_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003ASqliteParameterCollection_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003F56cf675b4777645c714ae85e12bde2163da8ec62d2a23f8b35ef357547a9_003FSqliteParameterCollection_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AStartupExtensions_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003F3ce5d581dd9cc0e4cdfd914e797ba2da05e894767d76b86f0515ef5226bac_003FStartupExtensions_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AString_002ESearching_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003F49ee52518952e16b89adee3d6c9346ae6c74be268730f0497eb14b34b49d56c_003FString_002ESearching_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AThread_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003F693e634d7742afaf486acd69d84fe2a9e1ee1b11ba84f29cd1d67668d20dd59_003FThread_002Ecs/@EntryIndexedValue">ForceIncluded</s:String></wpf:ResourceDictionary>