// -*- mode: cpp; mode: fold -*-
// Description /*{{{*/
-// $Id: acquire.cc,v 1.17 1998/11/22 23:37:03 jgg Exp $
+// $Id: acquire.cc,v 1.23 1998/12/11 07:20:32 jgg Exp $
/* ######################################################################
Acquire - File Acquiration
pkgAcquire::MethodConfig::MethodConfig()
{
SingleInstance = false;
- PreScan = false;
Pipeline = false;
SendConfig = false;
LocalOnly = false;
Items = 0;
Next = 0;
Workers = 0;
+ MaxPipeDepth = 1;
+ PipeDepth = 0;
}
/*}}}*/
// Queue::~Queue - Destructor /*{{{*/
/*}}}*/
// Queue::Dequeue - Remove an item from the queue /*{{{*/
// ---------------------------------------------------------------------
-/* We return true if we hit something*/
+/* We return true if we hit something */
bool pkgAcquire::Queue::Dequeue(Item *Owner)
{
+ if (Owner->Status == pkgAcquire::Item::StatFetching)
+ return _error->Error("Tried to dequeue a fetching object");
+
bool Res = false;
QItem **I = &Items;
if (Workers->Start() == false)
return false;
+ /* When pipelining we commit 10 items. This needs to change when we
+ added other source retry to have cycle maintain a pipeline depth
+ on its own. */
+ if (Cnf->Pipeline == true)
+ MaxPipeDepth = 10;
+ else
+ MaxPipeDepth = 1;
+
return Cycle();
}
/*}}}*/
main queue too.*/
bool pkgAcquire::Queue::ItemDone(QItem *Itm)
{
+ PipeDepth--;
+ if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
+ Itm->Owner->Status = pkgAcquire::Item::StatDone;
+
if (Itm->Owner->QueueCounter <= 1)
Owner->Dequeue(Itm->Owner);
else
/*}}}*/
// Queue::Cycle - Queue new items into the method /*{{{*/
// ---------------------------------------------------------------------
-/* This locates a new idle item and sends it to the worker */
+/* This locates a new idle item and sends it to the worker. If pipelining
+ is enabled then it keeps the pipe full. */
bool pkgAcquire::Queue::Cycle()
{
if (Items == 0 || Workers == 0)
return true;
+ if (PipeDepth < 0)
+ return _error->Error("Pipedepth failure");
+
// Look for a queable item
QItem *I = Items;
- for (; I != 0; I = I->Next)
- if (I->Owner->Status == pkgAcquire::Item::StatIdle)
- break;
-
- // Nothing to do, queue is idle.
- if (I == 0)
- return true;
+ while (PipeDepth < (signed)MaxPipeDepth)
+ {
+ for (; I != 0; I = I->Next)
+ if (I->Owner->Status == pkgAcquire::Item::StatIdle)
+ break;
+
+ // Nothing to do, queue is idle.
+ if (I == 0)
+ return true;
+
+ I->Worker = Workers;
+ I->Owner->Status = pkgAcquire::Item::StatFetching;
+ PipeDepth++;
+ if (Workers->QueueItem(I) == false)
+ return false;
+ }
- I->Worker = Workers;
- I->Owner->Status = pkgAcquire::Item::StatFetching;
- return Workers->QueueItem(I);
+ return true;
}
/*}}}*/
// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
// ---------------------------------------------------------------------
-/* */
+/* This is called when an item in multiple queues is dequeued */
void pkgAcquire::Queue::Bump()
{
+ Cycle();
}
/*}}}*/
{
TotalBytes = 0;
CurrentBytes = 0;
+ TotalItems = 0;
+ CurrentItems = 0;
// Compute the total number of bytes to fetch
unsigned int Unknown = 0;
for (pkgAcquire::Item **I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
I++, Count++)
{
+ TotalItems++;
+ if ((*I)->Status == pkgAcquire::Item::StatDone)
+ CurrentItems++;
+
// Totally ignore local items
if ((*I)->Local == true)
continue;
TotalBytes = 0;
FetchedBytes = 0;
ElapsedTime = 0;
+ TotalItems = 0;
+ CurrentItems = 0;
}
/*}}}*/
// AcquireStatus::Stop - Finished downloading /*{{{*/