Acquire - File Acquiration
- The core element for the schedual system is the concept of a named
+ The core element for the schedule system is the concept of a named
queue. Each queue is unique and each queue has a name derived from the
- URI. The degree of paralization can be controled by how the queue
+ URI. The degree of paralization can be controlled by how the queue
name is derived from the URI.
##################################################################### */
#include <apt-pkg/strutl.h>
#include <apt-pkg/fileutl.h>
+#include <string>
+#include <vector>
#include <iostream>
#include <sstream>
#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
#include <dirent.h>
#include <sys/time.h>
+#include <sys/select.h>
#include <errno.h>
#include <apti18n.h>
// ---------------------------------------------------------------------
/* A worker has died. This can not be done while the select loop is running
as it would require that RunFds could handling a changing list state and
- it cant.. */
+ it can't.. */
void pkgAcquire::Remove(Worker *Work)
{
if (Running == true)
{
Queue *I = Queues;
bool Res = false;
- for (; I != 0; I = I->Next)
- Res |= I->Dequeue(Itm);
-
if (Debug == true)
clog << "Dequeuing " << Itm->DestFile << endl;
+
+ for (; I != 0; I = I->Next)
+ {
+ if (I->Dequeue(Itm))
+ {
+ Res = true;
+ if (Debug == true)
+ clog << "Dequeued from " << I->Name << endl;
+ }
+ }
+
if (Res == true)
ToFetch--;
}
/* Single-Instance methods get exactly one queue per URI. This is
also used for the Access queue method */
if (Config->SingleInstance == true || QueueMode == QueueAccess)
- return U.Access;
+ return U.Access;
+
+ string AccessSchema = U.Access + ':',
+ FullQueueName = AccessSchema + U.Host;
+ unsigned int Instances = 0, SchemaLength = AccessSchema.length();
- return U.Access + ':' + U.Host;
+ Queue *I = Queues;
+ for (; I != 0; I = I->Next) {
+ // if the queue already exists, re-use it
+ if (I->Name == FullQueueName)
+ return FullQueueName;
+
+ if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
+ Instances++;
+ }
+
+ if (Debug) {
+ clog << "Found " << Instances << " instances of " << U.Access << endl;
+ }
+
+ if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
+ return U.Access;
+
+ return FullQueueName;
}
/*}}}*/
// Acquire::GetConfig - Fetch the configuration information /*{{{*/
pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
{
return I->NextAcquire;
-};
+}
/*}}}*/
// Acquire::Clean - Cleans a directory /*{{{*/
// ---------------------------------------------------------------------
// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
// ---------------------------------------------------------------------
/* This is the total number of bytes needed */
-unsigned long long pkgAcquire::TotalNeeded()
+APT_PURE unsigned long long pkgAcquire::TotalNeeded()
{
unsigned long long Total = 0;
for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
// ---------------------------------------------------------------------
/* This is the number of bytes that is not local */
-unsigned long long pkgAcquire::FetchNeeded()
+APT_PURE unsigned long long pkgAcquire::FetchNeeded()
{
unsigned long long Total = 0;
for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
// ---------------------------------------------------------------------
/* This is the number of bytes that is not local */
-unsigned long long pkgAcquire::PartialPresent()
+APT_PURE unsigned long long pkgAcquire::PartialPresent()
{
unsigned long long Total = 0;
for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
<< ":" << (CurrentBytes/float(TotalBytes)*100.0)
<< ":" << msg
<< endl;
- write(fd, status.str().c_str(), status.str().size());
+
+ std::string const dlstatus = status.str();
+ FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
}
return true;