RSE/Models/Handler/DbHandler.cs

854 lines
35 KiB
C#

using System.Collections.Concurrent;
using System.Diagnostics;
using Microsoft.Data.Sqlite;
using Models.Helper;
using Models.Model.Backend;
using Models.Model.External;
namespace Models.Handler;
public class DbHandler
{
private readonly ConcurrentQueue<Filtered> _filteredQueue;
private readonly ConcurrentQueue<UnfilteredQueueItem> _unfilteredQueue;
private readonly ConcurrentQueue<Discarded> _discardedQueue;
private readonly ConcurrentQueue<ScannerResumeObject> _resumeQueue;
private readonly string _unfilteredConnectionString;
private readonly string _filteredConnectionString;
private readonly string _resumeConnectionString;
private readonly string _compressedConnectionString;
private readonly List<string> _discardedConnectionStrings = [];
private const string InsertStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" +
" PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" +
" INSERT INTO Unfiltered (Ip1, Ip2, Ip3, Ip4, Port1, Port2, Filtered)" +
" VALUES (@ip1, @ip2, @ip3, @ip4, @port1, @port2, @filtered)";
private const string InsertIntoFiltered = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" +
" PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = on;" +
" INSERT INTO Filtered (Ip1, Ip2, Ip3, Ip4, Port1, Port2," +
" Url1, Url2, ServerType1, ServerType2," +
" RobotsTXT1, RobotsTXT2, HttpVersion1, HttpVersion2, CertificateIssuerCountry," +
" CertificateOrganizationName, IpV6, TlsVersion, CipherSuite, KeyExchangeAlgorithm," +
" PublicKeyType1, PublicKeyType2, PublicKeyType3, AcceptEncoding1, AcceptEncoding2," +
" ALPN, Connection1, Connection2) VALUES (@ip1, @ip2, @ip3, @ip4, @port1, @port2, " +
" @url1, @url2, " +
" (SELECT ServerId FROM ServerType WHERE Type = @serverType1), " +
" (SELECT ServerId FROM ServerType WHERE Type = @serverType2), " +
" @robotsTXT1, @robotsTXT2," +
" (SELECT HttpId FROM HttpVersion WHERE Version = @httpVersion1)," +
" (SELECT HttpId FROM HttpVersion WHERE Version = @httpVersion2)," +
" (SELECT CertificateIssuerId FROM CertificateIssuerCountry WHERE Country = @certificateIssuerCountry)," +
" (SELECT CertificateOrganizationId FROM CertificateOrganizationName WHERE Name = @certificateOrganizationName), " +
" @ipV6, " +
" (SELECT TlsId FROM TlsVersion WHERE Version = @tlsVersion)," +
" (SELECT CipherId FROM CipherSuite WHERE Suite = @cipherSuite)," +
" (SELECT KeyExchangeId FROM KeyExchangeAlgorithm WHERE Algorithm = @keyExchangeAlgorithm)," +
" (SELECT PublicKeyId FROM PublicKeyType WHERE Type = @publicKeyType1)," +
" (SELECT PublicKeyId FROM PublicKeyType WHERE Type = @publicKeyType2)," +
" (SELECT PublicKeyId FROM PublicKeyType WHERE Type = @publicKeyType3)," +
" (SELECT AcceptId FROM AcceptEncoding WHERE Encoding = @acceptEncoding1)," +
" (SELECT AcceptId FROM AcceptEncoding WHERE Encoding = @acceptEncoding2)," +
" (SELECT ALPNId FROM ALPN WHERE ALPNValue = @aLPN)," +
" (SELECT ConnectionId FROM Connection WHERE ConnectionValue = @connection1)," +
" (SELECT ConnectionId FROM Connection WHERE ConnectionValue = @connection2))";
private const string InsertIntoFilteredServerType = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; INSERT OR IGNORE INTO ServerType (Type) VALUES (@type)";
private const string InsertIntoFilteredHttpVersion = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; INSERT OR IGNORE INTO HttpVersion (Version) VALUES (@version)";
private const string InsertIntoFilteredCertificateIssuerCountry = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; INSERT OR IGNORE INTO CertificateIssuerCountry (Country) VALUES (@country)";
private const string InsertIntoFilteredCertificateOrganizationName = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; INSERT OR IGNORE INTO CertificateOrganizationName (Name) VALUES (@name)";
private const string InsertIntoFilteredTlsVersion = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; INSERT OR IGNORE INTO TlsVersion (Version) VALUES (@version)";
private const string InsertIntoFilteredCipherSuite = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; INSERT OR IGNORE INTO CipherSuite (Suite) VALUES (@suite)";
private const string InsertIntoFilteredKeyExchangeAlgorithm = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; INSERT OR IGNORE INTO KeyExchangeAlgorithm (Algorithm) VALUES (@algorithm)";
private const string InsertIntoFilteredPublicKeyType = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; INSERT OR IGNORE INTO PublicKeyType (Type) VALUES (@type)";
private const string InsertIntoFilteredAcceptEncoding = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; INSERT OR IGNORE INTO AcceptEncoding (Encoding) VALUES (@encoding)";
private const string InsertIntoFilteredALPN = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; INSERT OR IGNORE INTO ALPN (ALPNValue) VALUES (@alpnValue)";
private const string InsertIntoFilteredConnection = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; INSERT OR IGNORE INTO Connection (ConnectionValue) VALUES (@connectionValue)";
private const string InsertIntoDiscarded = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" +
" PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" +
" INSERT INTO Discarded (Ip1, Ip2, Ip3, Ip4, ResponseCode)" +
" VALUES (@ip1, @ip2, @ip3, @ip4, @responseCode)";
private const string InsertIntoResume = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" +
" PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" +
" INSERT INTO Resume (ThreadNumber, StartRange, EndRange, FirstByte, SecondByte, ThirdByte, FourthByte)" +
" VALUES (@threadNumber, @startRange, @endRange, @firstByte, @secondByte, @thirdByte, @fourthByte);";
private const string InsertIntoCompressedDbConnectionString = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY;" +
" PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off;" +
" INSERT INTO CompressedDatabases (DbNumber, Rows) VALUES (@dbNumber, @rows)";
private const string ReadUnfilteredStatement = "SELECT * FROM Unfiltered WHERE Id = @id;";
private const string ReadUnfilteredIdsStatement = "SELECT Id FROM Unfiltered WHERE Filtered == 0;";
private const string ReadFilteredStatement = "SELECT Title2, Url2 FROM Filtered WHERE (Url2 NOT NULL AND Url2 != '') AND (Title2 NOT NULL AND Title2 != '') ORDER BY Url2 DESC;";
private const string ReadFilteredIdsStatement = "SELECT Id FROM Filtered WHERE Id != 0 ORDER BY Id DESC LIMIT 1;";
private const string ReadFilteredIpStatement = "SELECT Ip1, Ip2, Ip3, Ip4 FROM Filtered WHERE Ip1 == @ip1 AND Ip2 == @ip1 AND Ip3 == @ip1 AND Ip4 == @ip1 ORDER BY Ip1 DESC LIMIT 1;";
private const string ReadDiscardedSeqIdsStatement = "SELECT seq FROM sqlite_sequence;";
private const string ReadAndDeleteResumeStatement = "SELECT * FROM Resume WHERE ThreadNumber == @threadNumber; DELETE FROM RESUME WHERE ThreadNumber == @threadNumber;";
private const string ReadCompressedDbRowsStatement = "SELECT Rows FROM CompressedDatabases;";
private const string UpdateUnfilteredStatement = "PRAGMA synchronous = OFF; PRAGMA temp_store = MEMORY; PRAGMA journal_mode = MEMORY; PRAGMA foreign_keys = off; UPDATE Unfiltered SET Filtered = 1 WHERE Id == @id;";
private const string ReIndexDatabasesStatement = "REINDEX;";
private const string VacuumDatabasesStatement = "VACUUM;";
private readonly object _readFilteredLock = new();
private readonly object _readAndDeleteResumeLock = new();
private bool _stop;
private bool _pause;
private bool _paused;
private bool _compressing;
private int _contentWaitTime;
private int _discardedWaitTime;
private readonly string _basePath;
public DbHandler(ConcurrentQueue<Filtered> filteredQueue,
ConcurrentQueue<Discarded> discardedQueue,
ConcurrentQueue<UnfilteredQueueItem> unfilteredQueue,
ConcurrentQueue<ScannerResumeObject> resumeQueue, string basePath)
{
_filteredQueue = filteredQueue;
_discardedQueue = discardedQueue;
_unfilteredQueue = unfilteredQueue;
_resumeQueue = resumeQueue;
SetContentWaitTime(100);
SetDiscardedWaitTime(10);
_basePath = basePath;
_unfilteredConnectionString = $"Data Source={basePath}/Models/mydb.db";
_filteredConnectionString = $"Data Source={basePath}/Models/Filtered.db";
_resumeConnectionString = $"Data Source={basePath}/Models/ScannerResume.db";
_compressedConnectionString = $"Data Source={basePath}/Models/CompressedDatabases.db";
}
public void SetContentWaitTime(int waitTime)
{
_contentWaitTime = waitTime;
}
public void SetDiscardedWaitTime(int waitTime)
{
_discardedWaitTime = waitTime;
}
public void UnfilteredDbHandler()
{
Console.WriteLine("Unfiltered DbHandler started");
while (!_stop)
{
if (_unfilteredQueue.IsEmpty || _pause)
{
Thread.Sleep(_contentWaitTime);
_paused = true;
continue;
}
_unfilteredQueue.TryDequeue(out UnfilteredQueueItem queueItem);
if (queueItem.Operations == Operations.Insert)
{
InsertUnfiltered(queueItem.Unfiltered);
}
else if (queueItem.Operations == Operations.Update)
{
UpdateUnfiltered(queueItem.Unfiltered);
}
}
Console.WriteLine("Unfiltered DbHandler stopped.");
}
public void FilteredDbHandler()
{
Console.WriteLine("Filtered DB handler started");
while (!_stop)
{
if (_filteredQueue.IsEmpty || _pause)
{
Thread.Sleep(_contentWaitTime);
_paused = true;
continue;
}
_filteredQueue.TryDequeue(out Filtered? queueItem);
InsertFiltered(queueItem!);
}
Console.WriteLine("Filtered DbHandler stopped.");
}
public void ResumeDbHandler()
{
Console.WriteLine("Resume DB handler started");
while (!_stop)
{
if (_resumeQueue.IsEmpty || _pause)
{
Thread.Sleep(_contentWaitTime);
_paused = true;
continue;
}
_resumeQueue.TryDequeue(out ScannerResumeObject? queueItem);
if (queueItem is not null)
{
InsertResumeObject(queueItem);
}
}
Console.WriteLine("Resume DbHandler stopped.");
}
public WaitHandle[] Start(int threads)
{
WaitHandle[] waitHandles = new WaitHandle[threads];
for (int i = 0; i < threads; i++)
{
EventWaitHandle handle = new(false, EventResetMode.ManualReset);
DiscardedDbHandlerSetting discardedDbHandlerSetting = new()
{
Handle = handle,
ThreadId = i
};
waitHandles[i] = handle;
Thread f = new (DiscardedDbHandler!);
f.Start(discardedDbHandlerSetting);
Thread.Sleep(5000);
}
return waitHandles;
}
private void DiscardedDbHandler(object obj)
{
DiscardedDbHandlerSetting discardedDbHandlerSetting = (DiscardedDbHandlerSetting)obj;
Console.WriteLine($"Discarded DbHandler started with thread: ({discardedDbHandlerSetting.ThreadId})");
(string absolutePath, string connectionString) = CreateDiscardedDb(discardedDbHandlerSetting.ThreadId);
int i = 0;
while (!_stop)
{
if (_discardedQueue.IsEmpty || _pause)
{
Thread.Sleep(_discardedWaitTime);
_paused = true;
continue;
}
if (i >= 500_000 && !_compressing)
{
_compressing = true;
i = 0;
InsertCompressedDatabase(discardedDbHandlerSetting.ThreadId, GetDiscardedIndexesForSpecificDb(connectionString));
int compressedDatabases = GetDatabasesHelper.GetTotalCompressedDatabases($"{_basePath}/Models");
CompressionHelper.CompressFile(absolutePath, $"{absolutePath}_{compressedDatabases}");
DropAndCreateDiscarded(discardedDbHandlerSetting.ThreadId);
_compressing = false;
}
_discardedQueue.TryDequeue(out Discarded queueItem);
InsertDiscarded(queueItem, connectionString);
i++;
}
discardedDbHandlerSetting.Handle!.Set();
Console.WriteLine("Discarded DbHandler stopped.");
}
private void DropAndCreateDiscarded(int threadNumber)
{
string databaseName = $"Data Source={_basePath}/Models/Discarded{threadNumber}.db";
const string createStatement = "CREATE TABLE IF NOT EXISTS Discarded (Id INTEGER NOT NULL, Ip1 INTEGER NOT NULL, Ip2 INTEGER NOT NULL, Ip3 INTEGER NOT NULL, Ip4 INTEGER NOT NULL, ResponseCode INTEGER NOT NULL, PRIMARY KEY(Id AUTOINCREMENT))";
const string dropStatement = "DROP TABLE Discarded;";
const string vacuum = "VACUUM;";
using SqliteConnection connection = new(databaseName);
connection.Open();
SqliteCommand command = new(dropStatement, connection);
command.ExecuteNonQuery();
command = new(vacuum, connection);
command.ExecuteNonQuery();
command = new(createStatement, connection);
command.ExecuteNonQuery();
command.Dispose();
connection.Close();
}
private void InsertUnfiltered(Unfiltered unfiltered)
{
using SqliteConnection connection = new(_unfilteredConnectionString);
connection.Open();
using SqliteCommand command = new(InsertStatement, connection);
command.Parameters.AddWithValue("@ip1", unfiltered.Ip.Ip1);
command.Parameters.AddWithValue("@ip2", unfiltered.Ip.Ip2);
command.Parameters.AddWithValue("@ip3", unfiltered.Ip.Ip3);
command.Parameters.AddWithValue("@ip4", unfiltered.Ip.Ip4);
command.Parameters.AddWithValue("@port1", unfiltered.Port1);
command.Parameters.AddWithValue("@port2", unfiltered.Port2);
command.Parameters.AddWithValue("@filtered", unfiltered.Filtered);
_ = command.ExecuteNonQuery();
connection.Close();
}
private static void InsertDiscarded(Discarded discarded, string dbConnectionString)
{
using SqliteConnection connection = new(dbConnectionString);
connection.Open();
using SqliteCommand command = new(InsertIntoDiscarded, connection);
command.Parameters.AddWithValue("@ip1", discarded.Ip.Ip1);
command.Parameters.AddWithValue("@ip2", discarded.Ip.Ip2);
command.Parameters.AddWithValue("@ip3", discarded.Ip.Ip3);
command.Parameters.AddWithValue("@ip4", discarded.Ip.Ip4);
command.Parameters.AddWithValue("@responseCode", discarded.ResponseCode);
_ = command.ExecuteNonQuery();
connection.Close();
}
private void InsertFiltered(Filtered filtered)
{
using SqliteConnection connection = new(_filteredConnectionString);
connection.Open();
SqliteCommand command = new(InsertIntoFilteredServerType, connection);
command.Parameters.AddWithValue("@type", filtered.ServerType1);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredServerType, connection);
command.Parameters.AddWithValue("@type", filtered.ServerType2);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredHttpVersion, connection);
command.Parameters.AddWithValue("@version", filtered.HttpVersion1);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredHttpVersion, connection);
command.Parameters.AddWithValue("@version", filtered.HttpVersion2);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredCertificateIssuerCountry, connection);
command.Parameters.AddWithValue("@country", filtered.CertificateIssuerCountry);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredCertificateOrganizationName, connection);
command.Parameters.AddWithValue("@name", filtered.CertificateOrganizationName);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredTlsVersion, connection);
command.Parameters.AddWithValue("@version", filtered.TlsVersion);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredCipherSuite, connection);
command.Parameters.AddWithValue("@suite", filtered.CipherSuite);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredKeyExchangeAlgorithm, connection);
command.Parameters.AddWithValue("@algorithm", filtered.KeyExchangeAlgorithm);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredPublicKeyType, connection);
command.Parameters.AddWithValue("@type", filtered.PublicKeyType1);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredPublicKeyType, connection);
command.Parameters.AddWithValue("@type", filtered.PublicKeyType2);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredPublicKeyType, connection);
command.Parameters.AddWithValue("@type", filtered.PublicKeyType3);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredAcceptEncoding, connection);
command.Parameters.AddWithValue("@encoding", filtered.AcceptEncoding1);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredAcceptEncoding, connection);
command.Parameters.AddWithValue("@encoding", filtered.AcceptEncoding2);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredALPN, connection);
command.Parameters.AddWithValue("@alpnValue", filtered.ALPN);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredConnection, connection);
command.Parameters.AddWithValue("@connectionValue", filtered.Connection1);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFilteredConnection, connection);
command.Parameters.AddWithValue("@connectionValue", filtered.Connection2);
_ = command.ExecuteNonQuery();
command = new(InsertIntoFiltered, connection);
command.Parameters.AddWithValue("@ip1", filtered.Ip.Ip1);
command.Parameters.AddWithValue("@ip2", filtered.Ip.Ip2);
command.Parameters.AddWithValue("@ip3", filtered.Ip.Ip3);
command.Parameters.AddWithValue("@ip4", filtered.Ip.Ip4);
command.Parameters.AddWithValue("@port1", filtered.Port1);
command.Parameters.AddWithValue("@port2", filtered.Port2);
command.Parameters.AddWithValue("@url1", filtered.Url1);
command.Parameters.AddWithValue("@url2", filtered.Url2);
command.Parameters.AddWithValue("@serverType1", filtered.ServerType1);
command.Parameters.AddWithValue("@serverType2", filtered.ServerType2);
command.Parameters.AddWithValue("@robotsTXT1", filtered.RobotsTXT1);
command.Parameters.AddWithValue("@robotsTXT2", filtered.RobotsTXT2);
command.Parameters.AddWithValue("@httpVersion1", filtered.HttpVersion1);
command.Parameters.AddWithValue("@httpVersion2", filtered.HttpVersion2);
command.Parameters.AddWithValue("@certificateIssuerCountry", filtered.CertificateIssuerCountry);
command.Parameters.AddWithValue("@certificateOrganizationName", filtered.CertificateOrganizationName);
command.Parameters.AddWithValue("@ipV6", filtered.IpV6);
command.Parameters.AddWithValue("@tlsVersion", filtered.TlsVersion);
command.Parameters.AddWithValue("@cipherSuite", filtered.CipherSuite);
command.Parameters.AddWithValue("@keyExchangeAlgorithm", filtered.KeyExchangeAlgorithm);
command.Parameters.AddWithValue("@publicKeyType1", filtered.PublicKeyType1);
command.Parameters.AddWithValue("@publicKeyType2", filtered.PublicKeyType2);
command.Parameters.AddWithValue("@publicKeyType3", filtered.PublicKeyType3);
command.Parameters.AddWithValue("@acceptEncoding1", filtered.AcceptEncoding1);
command.Parameters.AddWithValue("@acceptEncoding2", filtered.AcceptEncoding2);
command.Parameters.AddWithValue("@aLPN", filtered.ALPN);
command.Parameters.AddWithValue("@connection1", filtered.Connection1);
command.Parameters.AddWithValue("@connection2", filtered.Connection2);
_ = command.ExecuteNonQuery();
command.Dispose();
connection.Close();
}
private void InsertResumeObject(ScannerResumeObject resumeObject)
{
using SqliteConnection connection = new(_resumeConnectionString);
connection.Open();
using SqliteCommand command = new(InsertIntoResume, connection);
command.Parameters.AddWithValue("@threadNumber", resumeObject.ThreadNumber);
command.Parameters.AddWithValue("@startRange", resumeObject.StartRange);
command.Parameters.AddWithValue("@endRange", resumeObject.EndRange);
command.Parameters.AddWithValue("@firstByte", resumeObject.FirstByte);
command.Parameters.AddWithValue("@secondByte", resumeObject.SecondByte);
command.Parameters.AddWithValue("@thirdByte", resumeObject.ThirdByte);
command.Parameters.AddWithValue("@fourthByte", resumeObject.FourthByte);
_ = command.ExecuteNonQuery();
connection.Close();
}
private void InsertCompressedDatabase(int threadNumber, long rows)
{
using SqliteConnection connection = new(_compressedConnectionString);
connection.Open();
using SqliteCommand command = new(InsertIntoCompressedDbConnectionString, connection);
command.Parameters.AddWithValue("@dbNumber", threadNumber);
command.Parameters.AddWithValue("@rows", rows);
_ = command.ExecuteNonQuery();
connection.Close();
}
private void UpdateUnfiltered(Unfiltered unfiltered)
{
using SqliteConnection connection = new(_unfilteredConnectionString);
connection.Open();
using SqliteCommand command = new(UpdateUnfilteredStatement, connection);
command.Parameters.AddWithValue("@id", unfiltered.Id);
_ = command.ExecuteNonQuery();
connection.Close();
}
public Unfiltered ReadUnfilteredWithId(long id)
{
using SqliteConnection connection = new(_unfilteredConnectionString);
connection.Open();
using SqliteCommand command = new(ReadUnfilteredStatement, connection);
command.Parameters.AddWithValue("@id", id);
using SqliteDataReader reader = command.ExecuteReader();
if (!reader.HasRows) return new();
Unfiltered unfiltered = new();
Ip ip = new();
while (reader.Read())
{
unfiltered.Id = reader.GetInt32(0);
ip.Ip1 = reader.GetInt32(1);
ip.Ip2 = reader.GetInt32(2);
ip.Ip3 = reader.GetInt32(3);
ip.Ip4 = reader.GetInt32(4);
unfiltered.Port1 = reader.GetInt32(5);
unfiltered.Port2 = reader.GetInt32(6);
unfiltered.Filtered = reader.GetBoolean(7);
}
unfiltered.Ip = ip;
return unfiltered;
}
public List<long> GetUnfilteredIndexes()
{
using SqliteConnection connection = new(_unfilteredConnectionString);
connection.Open();
using SqliteCommand command = new(ReadUnfilteredIdsStatement, connection);
using SqliteDataReader reader = command.ExecuteReader();
if (!reader.HasRows)
{
return [];
}
List<long> ids = [];
while (reader.Read())
{
ids.Add(reader.GetInt64(0));
}
return ids;
}
public long GetFilteredIndexes()
{
long rowId = 0;
using SqliteConnection connection = new(_filteredConnectionString);
connection.Open();
using SqliteCommand command = new(ReadFilteredIdsStatement, connection);
using SqliteDataReader reader = command.ExecuteReader();
if (!reader.HasRows)
{
return 0;
}
while (reader.Read())
{
rowId = reader.GetInt64(0);
}
return rowId;
}
public long GetDiscardedIndexes()
{
long rowId = 0;
SqliteConnection connection;
SqliteCommand command;
SqliteDataReader reader;
for (int i = 0; i < _discardedConnectionStrings.Count; i++)
{
connection = new(_discardedConnectionStrings[i]);
connection.Open();
command = new(ReadDiscardedSeqIdsStatement, connection);
reader = command.ExecuteReader();
if (!reader.HasRows)
{
return rowId;
}
while (reader.Read())
{
rowId += reader.GetInt64(0);
}
connection.Close();
}
connection = new(_compressedConnectionString);
connection.Open();
command = new(ReadCompressedDbRowsStatement, connection);
reader = command.ExecuteReader();
if (!reader.HasRows)
{
return rowId;
}
while (reader.Read())
{
rowId += reader.GetInt64(0);
}
connection.Close();
connection.Dispose();
command.Dispose();
reader.Dispose();
return rowId;
}
private static long GetDiscardedIndexesForSpecificDb(string connectionString)
{
using SqliteConnection connection = new(connectionString);
connection.Open();
using SqliteCommand command = new(ReadDiscardedSeqIdsStatement, connection);
using SqliteDataReader reader = command.ExecuteReader();
long rowId = 0;
if (!reader.HasRows)
{
return rowId;
}
while (reader.Read())
{
rowId += reader.GetInt64(0);
}
return rowId;
}
public bool FilteredIpExists(Ip ip)
{
using SqliteConnection connection = new(_filteredConnectionString);
connection.Open();
using SqliteCommand command = new(ReadFilteredIpStatement, connection);
command.Parameters.AddWithValue("@ip1", ip.Ip1);
command.Parameters.AddWithValue("@ip2", ip.Ip2);
command.Parameters.AddWithValue("@ip3", ip.Ip3);
command.Parameters.AddWithValue("@ip4", ip.Ip4);
using SqliteDataReader reader = command.ExecuteReader();
if (!reader.HasRows)
{
return false;
}
Ip tempIp = new();
while (reader.Read())
{
tempIp.Ip1 = reader.GetInt32(0);
tempIp.Ip2 = reader.GetInt32(1);
tempIp.Ip3 = reader.GetInt32(2);
tempIp.Ip4 = reader.GetInt32(3);
}
if (tempIp.Ip1 != ip.Ip1)
{
return false;
}
if (tempIp.Ip2 != ip.Ip2)
{
return false;
}
if (tempIp.Ip3 != ip.Ip3)
{
return false;
}
if (tempIp.Ip4 != ip.Ip4)
{
return false;
}
return true;
}
public List<SearchResult?> GetSearchResults()
{
lock (_readFilteredLock)
{
using SqliteConnection connection = new(_filteredConnectionString);
connection.Open();
using SqliteCommand command = new(ReadFilteredStatement, connection);
using SqliteDataReader reader = command.ExecuteReader();
if (!reader.HasRows)
{
return [];
}
List<SearchResult?> results = [];
while (reader.Read())
{
SearchResult result = new();
result.Title = reader.GetString(0);
result.Url = reader.GetString(1);
results.Add(result);
}
return results;
}
}
public ScannerResumeObject? GetResumeObject(int threadNumber)
{
lock (_readAndDeleteResumeLock)
{
using SqliteConnection connection = new(_resumeConnectionString);
connection.Open();
using SqliteCommand command = new(ReadAndDeleteResumeStatement, connection);
command.Parameters.AddWithValue("@threadNumber", threadNumber);
using SqliteDataReader reader = command.ExecuteReader();
if (!reader.HasRows)
{
return null;
}
ScannerResumeObject resumeObject = new();
while (reader.Read())
{
resumeObject.ThreadNumber = reader.GetInt32(0);
resumeObject.StartRange = reader.GetInt32(1);
resumeObject.EndRange = reader.GetInt32(2);
resumeObject.FirstByte = reader.GetInt32(3);
resumeObject.SecondByte = reader.GetInt32(4);
resumeObject.ThirdByte = reader.GetInt32(5);
resumeObject.FourthByte = reader.GetInt32(6);
}
return resumeObject;
}
}
public void ReIndex()
{
_pause = true;
Thread.Sleep(5000); // Wait for 5 secs before doing anything with the db So we're sure that no db is open.
if (!_paused)
{
Thread.Sleep(5000); // Just for safety.
}
SqliteConnection connection = new(_filteredConnectionString);
connection.Open();
SqliteCommand command = new(ReIndexDatabasesStatement, connection);
_ = command.ExecuteNonQuery();
connection.Close();
connection = new(_unfilteredConnectionString);
connection.Open();
command = new(ReIndexDatabasesStatement, connection);
_ = command.ExecuteNonQuery();
connection.Close();
connection.Dispose();
command.Dispose();
_pause = false;
_paused = false;
}
public void Vacuum()
{
_pause = true;
Thread.Sleep(5000); // Wait for 5 secs before doing anything with the db So we're sure that no db is open.
if (!_paused)
{
Thread.Sleep(5000); // Just for safety.
}
SqliteConnection connection = new(_filteredConnectionString);
connection.Open();
SqliteCommand command = new(VacuumDatabasesStatement, connection);
_ = command.ExecuteNonQuery();
connection.Close();
connection = new(_unfilteredConnectionString);
connection.Open();
command = new(VacuumDatabasesStatement, connection);
_ = command.ExecuteNonQuery();
connection.Close();
connection.Dispose();
command.Dispose();
_pause = false;
_paused = false;
}
private (string, string) CreateDiscardedDb(int threadNumber)
{
string absolutePath = $"{_basePath}/Models/Discarded{threadNumber}.db";
string databaseName = $"Data Source={_basePath}/Models/Discarded{threadNumber}.db";
const string createStatement = "CREATE TABLE IF NOT EXISTS Discarded (Id INTEGER NOT NULL, Ip1 INTEGER NOT NULL, Ip2 INTEGER NOT NULL, Ip3 INTEGER NOT NULL, Ip4 INTEGER NOT NULL, ResponseCode INTEGER NOT NULL, PRIMARY KEY(Id AUTOINCREMENT))";
_discardedConnectionStrings.Add(databaseName);
using SqliteConnection connection = new(databaseName);
connection.Open();
using SqliteCommand command = new(createStatement, connection);
command.ExecuteNonQuery();
return (absolutePath, databaseName);
}
public void Stop()
{
_stop = true;
}
}