Reduced the memory usage, as well as hopefully fixing the memory leak. Also implemented database compression across rows, including Brotli compression and bitshifting.
This commit is contained in:
parent
b0318b7759
commit
c7fcc3297f
@ -145,12 +145,12 @@ public class IpScanner
|
||||
if (i == 192 && k == 2) continue;
|
||||
if (i == 192 && j == 88 && k == 99) continue;
|
||||
|
||||
if (_discardedQueue.Count >= 2000)
|
||||
if (_discardedQueue.Count >= 20_000)
|
||||
{
|
||||
Thread.Sleep(1000);
|
||||
}
|
||||
|
||||
if (_preFilteredQueue.Count >= 2000)
|
||||
if (_preFilteredQueue.Count >= 20_000)
|
||||
{
|
||||
Thread.Sleep(1000);
|
||||
}
|
||||
@ -170,7 +170,9 @@ public class IpScanner
|
||||
resumeObject.FourthByte = l;
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
Ip ip = new()
|
||||
{
|
||||
Ip1 = i,
|
||||
@ -186,19 +188,21 @@ public class IpScanner
|
||||
// Sometimes, if the pinger gets a Destination Unreachable Communication administratively prohibited response, the pinger will throw an exception.
|
||||
// https://en.wikipedia.org/wiki/Internet_Control_Message_Protocol?useskin=vector#Control_messages
|
||||
//_ = IPAddress.TryParse(ip.ToString(), out IPAddress? address);
|
||||
|
||||
if (i % 2 == 0)
|
||||
|
||||
responseCode = CustomPing.SendIcmpEchoRequestOverRawSocket(Parse(ip.ToString()), _timeout);
|
||||
|
||||
/*if (l % 2 == 0)
|
||||
{
|
||||
responseCode = IPStatus.Success;
|
||||
}
|
||||
else
|
||||
{
|
||||
responseCode = IPStatus.TimedOut;
|
||||
}
|
||||
}*/
|
||||
|
||||
//CustomPing.SendIcmpEchoRequestOverRawSocket(Parse(ip.ToString()), _timeout);
|
||||
Thread.Sleep(16);
|
||||
//responseCode = IPStatus.TimedOut;
|
||||
|
||||
//Thread.Sleep(0);
|
||||
}
|
||||
catch
|
||||
{
|
||||
@ -233,10 +237,10 @@ public class IpScanner
|
||||
resumeObject.FirstByte = i;
|
||||
break;
|
||||
}
|
||||
|
||||
Console.WriteLine($"Thread ({scanSettings.ThreadNumber}) is at index ({i}) out of ({scanSettings.End}). Remaining ({scanSettings.End - i})");
|
||||
}
|
||||
|
||||
Console.WriteLine($"Thread ({scanSettings.ThreadNumber}) stopped.");
|
||||
|
||||
if (_stop)
|
||||
{
|
||||
resumeObject.Paused = true;
|
||||
|
@ -15,8 +15,8 @@ public class ThreadHandler
|
||||
private bool _contentFilterStopped;
|
||||
private bool _ipFilterStopped;
|
||||
|
||||
private bool _stage1;
|
||||
private bool _stage2 = true;
|
||||
private bool _stage1 = true;
|
||||
private bool _stage2;
|
||||
private bool _stage3;
|
||||
|
||||
ConcurrentQueue<Filtered> filteredQueue = new();
|
||||
@ -54,10 +54,12 @@ public class ThreadHandler
|
||||
prefilterDb.Start(); // de-queues from preFilteredQueue
|
||||
scanner.Start(); // en-queues to discardedQueue and preFilteredQueue
|
||||
resume.Start(); // de-queues from resumeQueue
|
||||
|
||||
scanner.Join();
|
||||
Stop();
|
||||
|
||||
discarded.Join();
|
||||
prefilterDb.Join();
|
||||
scanner.Join();
|
||||
resume.Join();
|
||||
}
|
||||
|
||||
@ -205,6 +207,7 @@ public class ThreadHandler
|
||||
|
||||
Console.WriteLine("Stopping Super Extra...");
|
||||
_dbHandler.Stop();
|
||||
|
||||
Console.WriteLine("Stopped.");
|
||||
}
|
||||
}
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -1,5 +1,7 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Diagnostics;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Text;
|
||||
using Microsoft.Data.Sqlite;
|
||||
using Models.Helper;
|
||||
using Models.Model.Backend;
|
||||
@ -75,8 +77,8 @@ public class DbHandler
|
||||
|
||||
private const string InsertIntoDiscarded = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" +
|
||||
" PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" +
|
||||
" INSERT INTO Discarded (Ip1, Ip2, Ip3, Ip4, ResponseCode)" +
|
||||
" VALUES (@ip1, @ip2, @ip3, @ip4, @responseCode)";
|
||||
" INSERT INTO Discarded (PackedData)" +
|
||||
" VALUES (@packedData)";
|
||||
|
||||
private const string InsertIntoResume = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" +
|
||||
" PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" +
|
||||
@ -263,7 +265,7 @@ public class DbHandler
|
||||
Thread f = new (DiscardedDbHandler!);
|
||||
f.Start(discardedDbHandlerSetting);
|
||||
|
||||
Thread.Sleep(5000);
|
||||
Thread.Sleep(150);
|
||||
}
|
||||
|
||||
return waitHandles;
|
||||
@ -278,7 +280,16 @@ public class DbHandler
|
||||
|
||||
int i = 0;
|
||||
|
||||
while (!_stop)
|
||||
SqliteConnection connection = new(connectionString);
|
||||
connection.Open();
|
||||
|
||||
SqliteCommand command = new(InsertIntoDiscarded, connection);
|
||||
|
||||
StringBuilder stringBuilder = new();
|
||||
|
||||
bool running = true;
|
||||
|
||||
while (!_stop && running)
|
||||
{
|
||||
if (_discardedQueue.IsEmpty || _pause)
|
||||
{
|
||||
@ -286,31 +297,46 @@ public class DbHandler
|
||||
_paused = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (i >= 5_000_000 && !_compressing)
|
||||
|
||||
if (_stop && _discardedQueue.IsEmpty)
|
||||
{
|
||||
_compressing = true;
|
||||
|
||||
i = 0;
|
||||
|
||||
InsertCompressedDatabase(discardedDbHandlerSetting.ThreadId, GetDiscardedIndexesForSpecificDb(connectionString));
|
||||
|
||||
int compressedDatabases = GetDatabasesHelper.GetTotalCompressedDatabases($"{_basePath}/Models");
|
||||
|
||||
CompressionHelper.CompressFile(absolutePath, $"{absolutePath}_{compressedDatabases}");
|
||||
|
||||
DropAndCreateDiscarded(discardedDbHandlerSetting.ThreadId);
|
||||
|
||||
_compressing = false;
|
||||
running = false;
|
||||
}
|
||||
|
||||
_discardedQueue.TryDequeue(out Discarded queueItem);
|
||||
if (i == 2048)
|
||||
{
|
||||
command.Parameters.AddWithValue("@packedData", CompressionHelper.CompressString(stringBuilder.ToString()));
|
||||
_ = command.ExecuteNonQuery();
|
||||
|
||||
// Re-use the command object.
|
||||
command.Parameters.Clear();
|
||||
|
||||
stringBuilder.Clear();
|
||||
|
||||
i = 0;
|
||||
}
|
||||
|
||||
InsertDiscarded(queueItem, connectionString);
|
||||
if (_discardedQueue.TryDequeue(out Discarded queueItem))
|
||||
{
|
||||
stringBuilder.Append(queueItem.Ip.PackIp());
|
||||
stringBuilder.Append(':');
|
||||
stringBuilder.Append(queueItem.ResponseCode);
|
||||
stringBuilder.Append(',');
|
||||
}
|
||||
|
||||
i++;
|
||||
}
|
||||
|
||||
if (stringBuilder.Length != 0)
|
||||
{
|
||||
command.Parameters.AddWithValue("@packedData", CompressionHelper.CompressString(stringBuilder.ToString()));
|
||||
_ = command.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
connection.Close();
|
||||
connection.Dispose();
|
||||
command.Dispose();
|
||||
|
||||
discardedDbHandlerSetting.Handle!.Set();
|
||||
|
||||
Console.WriteLine("Discarded DbHandler stopped.");
|
||||
@ -359,23 +385,6 @@ public class DbHandler
|
||||
connection.Close();
|
||||
}
|
||||
|
||||
private static void InsertDiscarded(Discarded discarded, string dbConnectionString)
|
||||
{
|
||||
using SqliteConnection connection = new(dbConnectionString);
|
||||
connection.Open();
|
||||
|
||||
using SqliteCommand command = new(InsertIntoDiscarded, connection);
|
||||
|
||||
command.Parameters.AddWithValue("@ip1", discarded.Ip.Ip1);
|
||||
command.Parameters.AddWithValue("@ip2", discarded.Ip.Ip2);
|
||||
command.Parameters.AddWithValue("@ip3", discarded.Ip.Ip3);
|
||||
command.Parameters.AddWithValue("@ip4", discarded.Ip.Ip4);
|
||||
command.Parameters.AddWithValue("@responseCode", discarded.ResponseCode);
|
||||
_ = command.ExecuteNonQuery();
|
||||
|
||||
connection.Close();
|
||||
}
|
||||
|
||||
private void InsertFiltered(Filtered filtered)
|
||||
{
|
||||
using SqliteConnection connection = new(_filteredConnectionString);
|
||||
@ -544,6 +553,7 @@ public class DbHandler
|
||||
command.Parameters.AddWithValue("@filtered", 0);
|
||||
|
||||
_ = command.ExecuteNonQuery();
|
||||
|
||||
connection.Close();
|
||||
}
|
||||
|
||||
@ -726,36 +736,6 @@ public class DbHandler
|
||||
return true;
|
||||
}
|
||||
|
||||
public List<SearchResult?> GetSearchResults()
|
||||
{
|
||||
lock (_readFilteredLock)
|
||||
{
|
||||
using SqliteConnection connection = new(_filteredConnectionString);
|
||||
connection.Open();
|
||||
|
||||
using SqliteCommand command = new(ReadFilteredStatement, connection);
|
||||
using SqliteDataReader reader = command.ExecuteReader();
|
||||
|
||||
if (!reader.HasRows)
|
||||
{
|
||||
return [];
|
||||
}
|
||||
|
||||
List<SearchResult?> results = [];
|
||||
|
||||
while (reader.Read())
|
||||
{
|
||||
SearchResult result = new();
|
||||
result.Title = reader.GetString(0);
|
||||
result.Url = reader.GetString(1);
|
||||
|
||||
results.Add(result);
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
}
|
||||
|
||||
public ScannerResumeObject? GetResumeObject(int threadNumber)
|
||||
{
|
||||
lock (_readAndDeleteResumeLock)
|
||||
@ -791,76 +771,13 @@ public class DbHandler
|
||||
return resumeObject;
|
||||
}
|
||||
}
|
||||
|
||||
public void ReIndex()
|
||||
{
|
||||
_pause = true;
|
||||
Thread.Sleep(5000); // Wait for 5 secs before doing anything with the db So we're sure that no db is open.
|
||||
|
||||
if (!_paused)
|
||||
{
|
||||
Thread.Sleep(5000); // Just for safety.
|
||||
}
|
||||
|
||||
SqliteConnection connection = new(_filteredConnectionString);
|
||||
connection.Open();
|
||||
|
||||
SqliteCommand command = new(ReIndexDatabasesStatement, connection);
|
||||
_ = command.ExecuteNonQuery();
|
||||
connection.Close();
|
||||
|
||||
connection = new(_unfilteredConnectionString);
|
||||
connection.Open();
|
||||
|
||||
command = new(ReIndexDatabasesStatement, connection);
|
||||
_ = command.ExecuteNonQuery();
|
||||
connection.Close();
|
||||
|
||||
connection.Dispose();
|
||||
command.Dispose();
|
||||
|
||||
_pause = false;
|
||||
_paused = false;
|
||||
}
|
||||
|
||||
public void Vacuum()
|
||||
{
|
||||
_pause = true;
|
||||
|
||||
Thread.Sleep(5000); // Wait for 5 secs before doing anything with the db So we're sure that no db is open.
|
||||
|
||||
if (!_paused)
|
||||
{
|
||||
Thread.Sleep(5000); // Just for safety.
|
||||
}
|
||||
|
||||
SqliteConnection connection = new(_filteredConnectionString);
|
||||
connection.Open();
|
||||
|
||||
SqliteCommand command = new(VacuumDatabasesStatement, connection);
|
||||
_ = command.ExecuteNonQuery();
|
||||
connection.Close();
|
||||
|
||||
connection = new(_unfilteredConnectionString);
|
||||
connection.Open();
|
||||
|
||||
command = new(VacuumDatabasesStatement, connection);
|
||||
_ = command.ExecuteNonQuery();
|
||||
connection.Close();
|
||||
|
||||
connection.Dispose();
|
||||
command.Dispose();
|
||||
|
||||
_pause = false;
|
||||
_paused = false;
|
||||
}
|
||||
|
||||
private (string, string) CreateDiscardedDb(int threadNumber)
|
||||
{
|
||||
string absolutePath = $"{_basePath}/Models/Discarded{threadNumber}.db";
|
||||
string databaseName = $"Data Source={_basePath}/Models/Discarded{threadNumber}.db";
|
||||
|
||||
const string createStatement = "CREATE TABLE IF NOT EXISTS Discarded (Id INTEGER NOT NULL, Ip1 INTEGER NOT NULL, Ip2 INTEGER NOT NULL, Ip3 INTEGER NOT NULL, Ip4 INTEGER NOT NULL, ResponseCode INTEGER NOT NULL, PRIMARY KEY(Id AUTOINCREMENT))";
|
||||
const string createStatement = "CREATE TABLE IF NOT EXISTS Discarded (Id INTEGER NOT NULL, PackedData TEXT NOT NULL, PRIMARY KEY(Id AUTOINCREMENT))";
|
||||
|
||||
_discardedConnectionStrings.Add(databaseName);
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
using System.IO.Compression;
|
||||
using System.Text;
|
||||
|
||||
namespace Models.Helper;
|
||||
|
||||
@ -19,4 +20,31 @@ public static class CompressionHelper
|
||||
using GZipStream decompressor = new(compressedFileStream, CompressionMode.Decompress);
|
||||
decompressor.CopyTo(decompressedFileStream);
|
||||
}
|
||||
|
||||
public static string CompressString(string text)
|
||||
{
|
||||
byte[] byteArray = Encoding.UTF8.GetBytes(text);
|
||||
|
||||
using MemoryStream memoryStream = new();
|
||||
|
||||
using (BrotliStream compressionStream = new(memoryStream, CompressionLevel.SmallestSize))
|
||||
{
|
||||
compressionStream.Write(byteArray, 0, byteArray.Length);
|
||||
}
|
||||
|
||||
return Convert.ToBase64String(memoryStream.ToArray());
|
||||
}
|
||||
|
||||
public static string DecompressString(string compressedText)
|
||||
{
|
||||
byte[] byteArray = Convert.FromBase64String(compressedText);
|
||||
|
||||
using MemoryStream memoryStream = new(byteArray);
|
||||
|
||||
using BrotliStream decompressionStream = new(memoryStream, CompressionMode.Decompress);
|
||||
|
||||
using StreamReader reader = new(decompressionStream);
|
||||
|
||||
return reader.ReadToEnd();
|
||||
}
|
||||
}
|
@ -14,4 +14,9 @@ public struct Ip
|
||||
{
|
||||
return $"{Ip1}.{Ip2}.{Ip3}.{Ip4}";
|
||||
}
|
||||
|
||||
public uint PackIp()
|
||||
{
|
||||
return (uint)(Ip1 << 24) | (uint)(Ip2 << 16) | (uint)(Ip3 << 8) | (uint)Ip4;
|
||||
}
|
||||
}
|
@ -16,6 +16,7 @@
|
||||
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AIPAddressParser_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003F5a18f227dadd72bd8268cdb333cd70aa19d8663c3610d541cda4fd0199acbf4_003FIPAddressParser_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
|
||||
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AIPAddress_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003F7aa62fd0fbc144a6818ff7b2fd2626dc34800_003F48_003Fb056cfcf_003FIPAddress_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
|
||||
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AIPAddress_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003Fdcb058a821641cccb63e4a61914bd75ed5d336dda19353b41994ef1159c85bec_003FIPAddress_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
|
||||
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AIPStatus_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003F6079bfa8f3c0474bb21ade75ce5d010318000_003F15_003F57afed32_003FIPStatus_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
|
||||
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003APingReply_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003F76f05f4da452fadb1ab24cdb83dccb74b6e6484519e28acc6ce2c02c7aabac24_003FPingReply_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
|
||||
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003APing_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003F0a3c9a6c6f0343119978ec009640fbbb18000_003F44_003F939f4a86_003FPing_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
|
||||
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003APing_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003F_002E_002E_003F_002Econfig_003FJetBrains_003FRider2024_002E2_003Fresharper_002Dhost_003FSourcesCache_003F3080b18e3637ea741b5b65abd6aee06e41494a82a58b3e2ed87d4ddb5cc62_003FPing_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
|
||||
|
Loading…
Reference in New Issue
Block a user