]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
* apt-pkg/contrib/configuration.cc:
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
/* ######################################################################

   Acquire - File Acquisition

   The core element for the scheduling system is the concept of a named
   queue. Each queue is unique and each queue has a name derived from the
   URI. The degree of parallelization can be controlled by how the queue
   name is derived from the URI.

   ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
0118833a
AL
16#include <apt-pkg/acquire.h>
17#include <apt-pkg/acquire-item.h>
18#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
19#include <apt-pkg/configuration.h>
20#include <apt-pkg/error.h>
cdcc6d34 21#include <apt-pkg/strutl.h>
1cd1c398 22#include <apt-pkg/fileutl.h>
8267fe24 23
b2e465d6 24#include <apti18n.h>
b4fc9b6f
AL
25
26#include <iostream>
75ef8f14 27#include <sstream>
526334a0
MV
28#include <stdio.h>
29
7a7fa5f0 30#include <dirent.h>
8267fe24 31#include <sys/time.h>
524f8105 32#include <errno.h>
0118833a
AL
33 /*}}}*/
34
b4fc9b6f
AL
35using namespace std;
36
0118833a
AL
37// Acquire::pkgAcquire - Constructor /*{{{*/
38// ---------------------------------------------------------------------
93bf083d 39/* We grab some runtime state from the configuration space */
5efbd596 40pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
1cd1c398 41 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 42 Running(false)
0118833a 43{
1cd1c398 44 string const Mode = _config->Find("Acquire::Queue-Mode","host");
0a8a80e5
AL
45 if (strcasecmp(Mode.c_str(),"host") == 0)
46 QueueMode = QueueHost;
47 if (strcasecmp(Mode.c_str(),"access") == 0)
1cd1c398
DK
48 QueueMode = QueueAccess;
49}
5efbd596 50pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
1cd1c398
DK
51 Configs(0), Log(Progress), ToFetch(0),
52 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 53 Running(false)
1cd1c398
DK
54{
55 string const Mode = _config->Find("Acquire::Queue-Mode","host");
56 if (strcasecmp(Mode.c_str(),"host") == 0)
57 QueueMode = QueueHost;
58 if (strcasecmp(Mode.c_str(),"access") == 0)
59 QueueMode = QueueAccess;
60 Setup(Progress, "");
61}
62 /*}}}*/
63// Acquire::Setup - Delayed Constructor /*{{{*/
64// ---------------------------------------------------------------------
65/* Do everything needed to be a complete Acquire object and report the
66 success (or failure) back so the user knows that something is wrong… */
67bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
68{
69 Log = Progress;
0a8a80e5 70
1cd1c398 71 // check for existence and possibly create auxiliary directories
9c2c9c24
DK
72 string const listDir = _config->FindDir("Dir::State::lists");
73 string const partialListDir = listDir + "partial/";
74 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
75 string const partialArchivesDir = archivesDir + "partial/";
76
7753e468
DK
77 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
78 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
9c2c9c24
DK
79 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
80
7753e468
DK
81 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
82 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
9c2c9c24 83 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
1cd1c398
DK
84
85 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
86 return true;
87
88 // Lock the directory this acquire object will work in
89 LockFD = GetLock(flCombine(Lock, "lock"));
90 if (LockFD == -1)
91 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
92
93 return true;
94}
95 /*}}}*/
0118833a
AL
96// Acquire::~pkgAcquire - Destructor /*{{{*/
97// ---------------------------------------------------------------------
93bf083d 98/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
99pkgAcquire::~pkgAcquire()
100{
459681d3 101 Shutdown();
1cd1c398
DK
102
103 if (LockFD != -1)
104 close(LockFD);
105
3b5421b4
AL
106 while (Configs != 0)
107 {
108 MethodConfig *Jnk = Configs;
109 Configs = Configs->Next;
110 delete Jnk;
111 }
281daf46
AL
112}
113 /*}}}*/
8e5fc8f5 114// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
115// ---------------------------------------------------------------------
116/* */
117void pkgAcquire::Shutdown()
118{
119 while (Items.size() != 0)
1b480911
AL
120 {
121 if (Items[0]->Status == Item::StatFetching)
122 Items[0]->Status = Item::StatError;
281daf46 123 delete Items[0];
1b480911 124 }
0a8a80e5
AL
125
126 while (Queues != 0)
127 {
128 Queue *Jnk = Queues;
129 Queues = Queues->Next;
130 delete Jnk;
131 }
0118833a
AL
132}
133 /*}}}*/
134// Acquire::Add - Add a new item /*{{{*/
135// ---------------------------------------------------------------------
93bf083d
AL
136/* This puts an item on the acquire list. This list is mainly for tracking
137 item status */
0118833a
AL
138void pkgAcquire::Add(Item *Itm)
139{
140 Items.push_back(Itm);
141}
142 /*}}}*/
143// Acquire::Remove - Remove a item /*{{{*/
144// ---------------------------------------------------------------------
93bf083d 145/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
146void pkgAcquire::Remove(Item *Itm)
147{
a3eaf954
AL
148 Dequeue(Itm);
149
753b3525 150 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
151 {
152 if (*I == Itm)
b4fc9b6f 153 {
0118833a 154 Items.erase(I);
b4fc9b6f
AL
155 I = Items.begin();
156 }
753b3525
AL
157 else
158 I++;
8267fe24 159 }
0118833a
AL
160}
161 /*}}}*/
0a8a80e5
AL
162// Acquire::Add - Add a worker /*{{{*/
163// ---------------------------------------------------------------------
93bf083d
AL
164/* A list of workers is kept so that the select loop can direct their FD
165 usage. */
0a8a80e5
AL
166void pkgAcquire::Add(Worker *Work)
167{
168 Work->NextAcquire = Workers;
169 Workers = Work;
170}
171 /*}}}*/
172// Acquire::Remove - Remove a worker /*{{{*/
173// ---------------------------------------------------------------------
93bf083d
AL
174/* A worker has died. This can not be done while the select loop is running
175 as it would require that RunFds could handling a changing list state and
176 it cant.. */
0a8a80e5
AL
177void pkgAcquire::Remove(Worker *Work)
178{
93bf083d
AL
179 if (Running == true)
180 abort();
181
0a8a80e5
AL
182 Worker **I = &Workers;
183 for (; *I != 0;)
184 {
185 if (*I == Work)
186 *I = (*I)->NextAcquire;
187 else
188 I = &(*I)->NextAcquire;
189 }
190}
191 /*}}}*/
0118833a
AL
192// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
193// ---------------------------------------------------------------------
93bf083d 194/* This is the entry point for an item. An item calls this function when
281daf46 195 it is constructed which creates a queue (based on the current queue
93bf083d
AL
196 mode) and puts the item in that queue. If the system is running then
197 the queue might be started. */
8267fe24 198void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 199{
0a8a80e5 200 // Determine which queue to put the item in
e331f6ed
AL
201 const MethodConfig *Config;
202 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
203 if (Name.empty() == true)
204 return;
205
206 // Find the queue structure
207 Queue *I = Queues;
208 for (; I != 0 && I->Name != Name; I = I->Next);
209 if (I == 0)
210 {
211 I = new Queue(Name,this);
212 I->Next = Queues;
213 Queues = I;
93bf083d
AL
214
215 if (Running == true)
216 I->Startup();
0a8a80e5 217 }
bfd22fc0 218
e331f6ed
AL
219 // See if this is a local only URI
220 if (Config->LocalOnly == true && Item.Owner->Complete == false)
221 Item.Owner->Local = true;
8267fe24 222 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
223
224 // Queue it into the named queue
c03462c6
MV
225 if(I->Enqueue(Item))
226 ToFetch++;
227
0a8a80e5
AL
228 // Some trace stuff
229 if (Debug == true)
230 {
8267fe24
AL
231 clog << "Fetching " << Item.URI << endl;
232 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 233 clog << " Queue is: " << Name << endl;
0a8a80e5 234 }
3b5421b4
AL
235}
236 /*}}}*/
0a8a80e5 237// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 238// ---------------------------------------------------------------------
93bf083d
AL
239/* This is called when an item is finished being fetched. It removes it
240 from all the queues */
0a8a80e5
AL
241void pkgAcquire::Dequeue(Item *Itm)
242{
243 Queue *I = Queues;
bfd22fc0 244 bool Res = false;
0a8a80e5 245 for (; I != 0; I = I->Next)
bfd22fc0 246 Res |= I->Dequeue(Itm);
93bf083d
AL
247
248 if (Debug == true)
249 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
250 if (Res == true)
251 ToFetch--;
0a8a80e5
AL
252}
253 /*}}}*/
254// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
255// ---------------------------------------------------------------------
256/* The string returned depends on the configuration settings and the
257 method parameters. Given something like http://foo.org/bar it can
258 return http://foo.org or http */
e331f6ed 259string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 260{
93bf083d
AL
261 URI U(Uri);
262
e331f6ed 263 Config = GetConfig(U.Access);
0a8a80e5
AL
264 if (Config == 0)
265 return string();
266
267 /* Single-Instance methods get exactly one queue per URI. This is
268 also used for the Access queue method */
269 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 270 return U.Access;
93bf083d
AL
271
272 return U.Access + ':' + U.Host;
0118833a
AL
273}
274 /*}}}*/
3b5421b4
AL
275// Acquire::GetConfig - Fetch the configuration information /*{{{*/
276// ---------------------------------------------------------------------
277/* This locates the configuration structure for an access method. If
278 a config structure cannot be found a Worker will be created to
279 retrieve it */
0a8a80e5 280pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
281{
282 // Search for an existing config
283 MethodConfig *Conf;
284 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
285 if (Conf->Access == Access)
286 return Conf;
287
288 // Create the new config class
289 Conf = new MethodConfig;
290 Conf->Access = Access;
291 Conf->Next = Configs;
292 Configs = Conf;
0118833a 293
3b5421b4
AL
294 // Create the worker to fetch the configuration
295 Worker Work(Conf);
296 if (Work.Start() == false)
297 return 0;
7c6e2dc7
MV
298
299 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 300 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
301 Conf->SingleInstance = true;
302
3b5421b4
AL
303 return Conf;
304}
305 /*}}}*/
0a8a80e5
AL
306// Acquire::SetFds - Deal with readable FDs /*{{{*/
307// ---------------------------------------------------------------------
308/* Collect FDs that have activity monitors into the fd sets */
309void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
310{
311 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
312 {
313 if (I->InReady == true && I->InFd >= 0)
314 {
315 if (Fd < I->InFd)
316 Fd = I->InFd;
317 FD_SET(I->InFd,RSet);
318 }
319 if (I->OutReady == true && I->OutFd >= 0)
320 {
321 if (Fd < I->OutFd)
322 Fd = I->OutFd;
323 FD_SET(I->OutFd,WSet);
324 }
325 }
326}
327 /*}}}*/
328// Acquire::RunFds - Deal with active FDs /*{{{*/
329// ---------------------------------------------------------------------
93bf083d
AL
330/* Dispatch active FDs over to the proper workers. It is very important
331 that a worker never be erased while this is running! The queue class
332 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
333void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
334{
335 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
336 {
337 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
338 I->InFdReady();
339 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
340 I->OutFdReady();
341 }
342}
343 /*}}}*/
344// Acquire::Run - Run the fetch sequence /*{{{*/
345// ---------------------------------------------------------------------
346/* This runs the queues. It manages a select loop for all of the
347 Worker tasks. The workers interact with the queues and items to
348 manage the actual fetch. */
1c5f7e5f 349pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 350{
8b89e57f
AL
351 Running = true;
352
0a8a80e5
AL
353 for (Queue *I = Queues; I != 0; I = I->Next)
354 I->Startup();
355
b98f2859
AL
356 if (Log != 0)
357 Log->Start();
358
024d1123
AL
359 bool WasCancelled = false;
360
0a8a80e5 361 // Run till all things have been acquired
8267fe24
AL
362 struct timeval tv;
363 tv.tv_sec = 0;
1c5f7e5f 364 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
365 while (ToFetch > 0)
366 {
367 fd_set RFds;
368 fd_set WFds;
369 int Highest = 0;
370 FD_ZERO(&RFds);
371 FD_ZERO(&WFds);
372 SetFds(Highest,&RFds,&WFds);
373
b0db36b1
AL
374 int Res;
375 do
376 {
377 Res = select(Highest+1,&RFds,&WFds,0,&tv);
378 }
379 while (Res < 0 && errno == EINTR);
380
8267fe24 381 if (Res < 0)
8b89e57f 382 {
8267fe24
AL
383 _error->Errno("select","Select has failed");
384 break;
8b89e57f 385 }
93bf083d 386
0a8a80e5 387 RunFds(&RFds,&WFds);
93bf083d
AL
388 if (_error->PendingError() == true)
389 break;
8267fe24
AL
390
391 // Timeout, notify the log class
392 if (Res == 0 || (Log != 0 && Log->Update == true))
393 {
1c5f7e5f 394 tv.tv_usec = PulseIntervall;
8267fe24
AL
395 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
396 I->Pulse();
024d1123
AL
397 if (Log != 0 && Log->Pulse(this) == false)
398 {
399 WasCancelled = true;
400 break;
401 }
8267fe24 402 }
0a8a80e5 403 }
be4401bf 404
b98f2859
AL
405 if (Log != 0)
406 Log->Stop();
407
be4401bf
AL
408 // Shut down the acquire bits
409 Running = false;
0a8a80e5 410 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 411 I->Shutdown(false);
0a8a80e5 412
ab559b35 413 // Shut down the items
b4fc9b6f 414 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 415 (*I)->Finished();
ab559b35 416
024d1123
AL
417 if (_error->PendingError())
418 return Failed;
419 if (WasCancelled)
420 return Cancelled;
421 return Continue;
93bf083d
AL
422}
423 /*}}}*/
be4401bf 424// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
425// ---------------------------------------------------------------------
426/* This routine bumps idle queues in hopes that they will be able to fetch
427 the dequeued item */
428void pkgAcquire::Bump()
429{
be4401bf
AL
430 for (Queue *I = Queues; I != 0; I = I->Next)
431 I->Bump();
0a8a80e5
AL
432}
433 /*}}}*/
8267fe24
AL
434// Acquire::WorkerStep - Step to the next worker /*{{{*/
435// ---------------------------------------------------------------------
436/* Not inlined to advoid including acquire-worker.h */
437pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
438{
439 return I->NextAcquire;
440};
441 /*}}}*/
a6568219 442// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
443// ---------------------------------------------------------------------
444/* This is a bit simplistic, it looks at every file in the dir and sees
445 if it is part of the download set. */
446bool pkgAcquire::Clean(string Dir)
447{
448 DIR *D = opendir(Dir.c_str());
449 if (D == 0)
b2e465d6 450 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
451
452 string StartDir = SafeGetCWD();
453 if (chdir(Dir.c_str()) != 0)
454 {
455 closedir(D);
b2e465d6 456 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
457 }
458
459 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
460 {
461 // Skip some files..
462 if (strcmp(Dir->d_name,"lock") == 0 ||
463 strcmp(Dir->d_name,"partial") == 0 ||
464 strcmp(Dir->d_name,".") == 0 ||
465 strcmp(Dir->d_name,"..") == 0)
466 continue;
467
468 // Look in the get list
b4fc9b6f 469 ItemCIterator I = Items.begin();
7a7fa5f0
AL
470 for (; I != Items.end(); I++)
471 if (flNotDir((*I)->DestFile) == Dir->d_name)
472 break;
473
474 // Nothing found, nuke it
475 if (I == Items.end())
476 unlink(Dir->d_name);
477 };
478
7a7fa5f0 479 closedir(D);
3c8cda8b
MV
480 if (chdir(StartDir.c_str()) != 0)
481 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
482 return true;
483}
484 /*}}}*/
a6568219
AL
485// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
486// ---------------------------------------------------------------------
487/* This is the total number of bytes needed */
a3c4c81a 488unsigned long long pkgAcquire::TotalNeeded()
a6568219 489{
a3c4c81a 490 unsigned long long Total = 0;
b4fc9b6f 491 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
492 Total += (*I)->FileSize;
493 return Total;
494}
495 /*}}}*/
496// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
497// ---------------------------------------------------------------------
498/* This is the number of bytes that is not local */
a3c4c81a 499unsigned long long pkgAcquire::FetchNeeded()
a6568219 500{
a3c4c81a 501 unsigned long long Total = 0;
b4fc9b6f 502 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
503 if ((*I)->Local == false)
504 Total += (*I)->FileSize;
505 return Total;
506}
507 /*}}}*/
6b1ff003
AL
508// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
509// ---------------------------------------------------------------------
510/* This is the number of bytes that is not local */
a3c4c81a 511unsigned long long pkgAcquire::PartialPresent()
6b1ff003 512{
a3c4c81a 513 unsigned long long Total = 0;
b4fc9b6f 514 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
515 if ((*I)->Local == false)
516 Total += (*I)->PartialSize;
517 return Total;
518}
92fcbfc1 519 /*}}}*/
8e5fc8f5 520// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
521// ---------------------------------------------------------------------
522/* */
523pkgAcquire::UriIterator pkgAcquire::UriBegin()
524{
525 return UriIterator(Queues);
526}
527 /*}}}*/
8e5fc8f5 528// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
529// ---------------------------------------------------------------------
530/* */
531pkgAcquire::UriIterator pkgAcquire::UriEnd()
532{
533 return UriIterator(0);
534}
535 /*}}}*/
e331f6ed
AL
536// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
537// ---------------------------------------------------------------------
538/* */
539pkgAcquire::MethodConfig::MethodConfig()
540{
541 SingleInstance = false;
e331f6ed
AL
542 Pipeline = false;
543 SendConfig = false;
544 LocalOnly = false;
459681d3 545 Removable = false;
e331f6ed
AL
546 Next = 0;
547}
548 /*}}}*/
0a8a80e5
AL
549// Queue::Queue - Constructor /*{{{*/
550// ---------------------------------------------------------------------
551/* */
552pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
553 Owner(Owner)
554{
555 Items = 0;
556 Next = 0;
557 Workers = 0;
b185acc2
AL
558 MaxPipeDepth = 1;
559 PipeDepth = 0;
0a8a80e5
AL
560}
561 /*}}}*/
562// Queue::~Queue - Destructor /*{{{*/
563// ---------------------------------------------------------------------
564/* */
565pkgAcquire::Queue::~Queue()
566{
8e5fc8f5 567 Shutdown(true);
0a8a80e5
AL
568
569 while (Items != 0)
570 {
571 QItem *Jnk = Items;
572 Items = Items->Next;
573 delete Jnk;
574 }
575}
576 /*}}}*/
577// Queue::Enqueue - Queue an item to the queue /*{{{*/
578// ---------------------------------------------------------------------
579/* */
c03462c6 580bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 581{
7a1b1f8b 582 QItem **I = &Items;
c03462c6
MV
583 // move to the end of the queue and check for duplicates here
584 for (; *I != 0; I = &(*I)->Next)
585 if (Item.URI == (*I)->URI)
586 {
587 Item.Owner->Status = Item::StatDone;
588 return false;
589 }
590
0a8a80e5 591 // Create a new item
7a1b1f8b
AL
592 QItem *Itm = new QItem;
593 *Itm = Item;
594 Itm->Next = 0;
595 *I = Itm;
0a8a80e5 596
8267fe24 597 Item.Owner->QueueCounter++;
93bf083d
AL
598 if (Items->Next == 0)
599 Cycle();
c03462c6 600 return true;
0a8a80e5
AL
601}
602 /*}}}*/
c88edf1d 603// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 604// ---------------------------------------------------------------------
b185acc2 605/* We return true if we hit something */
bfd22fc0 606bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 607{
b185acc2
AL
608 if (Owner->Status == pkgAcquire::Item::StatFetching)
609 return _error->Error("Tried to dequeue a fetching object");
610
bfd22fc0
AL
611 bool Res = false;
612
0a8a80e5
AL
613 QItem **I = &Items;
614 for (; *I != 0;)
615 {
616 if ((*I)->Owner == Owner)
617 {
618 QItem *Jnk= *I;
619 *I = (*I)->Next;
620 Owner->QueueCounter--;
621 delete Jnk;
bfd22fc0 622 Res = true;
0a8a80e5
AL
623 }
624 else
625 I = &(*I)->Next;
626 }
bfd22fc0
AL
627
628 return Res;
0a8a80e5
AL
629}
630 /*}}}*/
631// Queue::Startup - Start the worker processes /*{{{*/
632// ---------------------------------------------------------------------
8e5fc8f5
AL
633/* It is possible for this to be called with a pre-existing set of
634 workers. */
0a8a80e5
AL
635bool pkgAcquire::Queue::Startup()
636{
8e5fc8f5
AL
637 if (Workers == 0)
638 {
639 URI U(Name);
640 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
641 if (Cnf == 0)
642 return false;
643
644 Workers = new Worker(this,Cnf,Owner->Log);
645 Owner->Add(Workers);
646 if (Workers->Start() == false)
647 return false;
648
649 /* When pipelining we commit 10 items. This needs to change when we
650 added other source retry to have cycle maintain a pipeline depth
651 on its own. */
652 if (Cnf->Pipeline == true)
6ce72612 653 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
654 else
655 MaxPipeDepth = 1;
656 }
5cb5d8dc 657
93bf083d 658 return Cycle();
0a8a80e5
AL
659}
660 /*}}}*/
661// Queue::Shutdown - Shutdown the worker processes /*{{{*/
662// ---------------------------------------------------------------------
8e5fc8f5
AL
663/* If final is true then all workers are eliminated, otherwise only workers
664 that do not need cleanup are removed */
665bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
666{
667 // Delete all of the workers
8e5fc8f5
AL
668 pkgAcquire::Worker **Cur = &Workers;
669 while (*Cur != 0)
0a8a80e5 670 {
8e5fc8f5
AL
671 pkgAcquire::Worker *Jnk = *Cur;
672 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
673 {
674 *Cur = Jnk->NextQueue;
675 Owner->Remove(Jnk);
676 delete Jnk;
677 }
678 else
679 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
680 }
681
682 return true;
3b5421b4
AL
683}
684 /*}}}*/
7d8afa39 685// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
686// ---------------------------------------------------------------------
687/* */
688pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
689{
690 for (QItem *I = Items; I != 0; I = I->Next)
691 if (I->URI == URI && I->Worker == Owner)
692 return I;
693 return 0;
694}
695 /*}}}*/
696// Queue::ItemDone - Item has been completed /*{{{*/
697// ---------------------------------------------------------------------
698/* The worker signals this which causes the item to be removed from the
93bf083d
AL
699 queue. If this is the last queue instance then it is removed from the
700 main queue too.*/
c88edf1d
AL
701bool pkgAcquire::Queue::ItemDone(QItem *Itm)
702{
b185acc2 703 PipeDepth--;
db890fdb
AL
704 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
705 Itm->Owner->Status = pkgAcquire::Item::StatDone;
706
93bf083d
AL
707 if (Itm->Owner->QueueCounter <= 1)
708 Owner->Dequeue(Itm->Owner);
709 else
710 {
711 Dequeue(Itm->Owner);
712 Owner->Bump();
713 }
c88edf1d 714
93bf083d
AL
715 return Cycle();
716}
717 /*}}}*/
718// Queue::Cycle - Queue new items into the method /*{{{*/
719// ---------------------------------------------------------------------
b185acc2
AL
720/* This locates a new idle item and sends it to the worker. If pipelining
721 is enabled then it keeps the pipe full. */
93bf083d
AL
722bool pkgAcquire::Queue::Cycle()
723{
724 if (Items == 0 || Workers == 0)
c88edf1d
AL
725 return true;
726
e7432370
AL
727 if (PipeDepth < 0)
728 return _error->Error("Pipedepth failure");
729
93bf083d
AL
730 // Look for a queable item
731 QItem *I = Items;
e7432370 732 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
733 {
734 for (; I != 0; I = I->Next)
735 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
736 break;
737
738 // Nothing to do, queue is idle.
739 if (I == 0)
740 return true;
741
742 I->Worker = Workers;
743 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 744 PipeDepth++;
b185acc2
AL
745 if (Workers->QueueItem(I) == false)
746 return false;
747 }
93bf083d 748
b185acc2 749 return true;
c88edf1d
AL
750}
751 /*}}}*/
be4401bf
AL
752// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
753// ---------------------------------------------------------------------
b185acc2 754/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
755void pkgAcquire::Queue::Bump()
756{
b185acc2 757 Cycle();
be4401bf
AL
758}
759 /*}}}*/
b98f2859
AL
760// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
761// ---------------------------------------------------------------------
762/* */
c5ccf175 763pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
764{
765 Start();
766}
767 /*}}}*/
768// AcquireStatus::Pulse - Called periodically /*{{{*/
769// ---------------------------------------------------------------------
770/* This computes some internal state variables for the derived classes to
771 use. It generates the current downloaded bytes and total bytes to download
772 as well as the current CPS estimate. */
024d1123 773bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
774{
775 TotalBytes = 0;
776 CurrentBytes = 0;
d568ed2d
AL
777 TotalItems = 0;
778 CurrentItems = 0;
b98f2859
AL
779
780 // Compute the total number of bytes to fetch
781 unsigned int Unknown = 0;
782 unsigned int Count = 0;
b4fc9b6f 783 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
784 I++, Count++)
785 {
d568ed2d
AL
786 TotalItems++;
787 if ((*I)->Status == pkgAcquire::Item::StatDone)
788 CurrentItems++;
789
a6568219
AL
790 // Totally ignore local items
791 if ((*I)->Local == true)
792 continue;
b2e465d6 793
b98f2859
AL
794 TotalBytes += (*I)->FileSize;
795 if ((*I)->Complete == true)
796 CurrentBytes += (*I)->FileSize;
797 if ((*I)->FileSize == 0 && (*I)->Complete == false)
798 Unknown++;
799 }
800
801 // Compute the current completion
dbbc5494 802 unsigned long long ResumeSize = 0;
b98f2859
AL
803 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
804 I = Owner->WorkerStep(I))
805 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
806 {
807 CurrentBytes += I->CurrentSize;
808 ResumeSize += I->ResumePoint;
809
810 // Files with unknown size always have 100% completion
811 if (I->CurrentItem->Owner->FileSize == 0 &&
812 I->CurrentItem->Owner->Complete == false)
813 TotalBytes += I->CurrentSize;
814 }
815
b98f2859
AL
816 // Normalize the figures and account for unknown size downloads
817 if (TotalBytes <= 0)
818 TotalBytes = 1;
819 if (Unknown == Count)
820 TotalBytes = Unknown;
18ef0a78
AL
821
822 // Wha?! Is not supposed to happen.
823 if (CurrentBytes > TotalBytes)
824 CurrentBytes = TotalBytes;
b98f2859
AL
825
826 // Compute the CPS
827 struct timeval NewTime;
828 gettimeofday(&NewTime,0);
2ec1674d 829 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
830 NewTime.tv_sec - Time.tv_sec > 6)
831 {
f17ac097
AL
832 double Delta = NewTime.tv_sec - Time.tv_sec +
833 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 834
b98f2859 835 // Compute the CPS value
f17ac097 836 if (Delta < 0.01)
e331f6ed
AL
837 CurrentCPS = 0;
838 else
aa0e1101
AL
839 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
840 LastBytes = CurrentBytes - ResumeSize;
dbbc5494 841 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
842 Time = NewTime;
843 }
024d1123 844
75ef8f14
MV
845 int fd = _config->FindI("APT::Status-Fd",-1);
846 if(fd > 0)
847 {
848 ostringstream status;
849
850 char msg[200];
851 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
c033d415
MV
852 unsigned long long ETA = 0;
853 if(CurrentCPS > 0)
854 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
75ef8f14 855
1e8b4c0f
MV
856 // only show the ETA if it makes sense
857 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 858 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 859 else
0c508b03 860 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
861
862
75ef8f14
MV
863
864 // build the status str
865 status << "dlstatus:" << i
866 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
867 << ":" << msg
868 << endl;
869 write(fd, status.str().c_str(), status.str().size());
870 }
871
024d1123 872 return true;
b98f2859
AL
873}
874 /*}}}*/
875// AcquireStatus::Start - Called when the download is started /*{{{*/
876// ---------------------------------------------------------------------
877/* We just reset the counters */
878void pkgAcquireStatus::Start()
879{
880 gettimeofday(&Time,0);
881 gettimeofday(&StartTime,0);
882 LastBytes = 0;
883 CurrentCPS = 0;
884 CurrentBytes = 0;
885 TotalBytes = 0;
886 FetchedBytes = 0;
887 ElapsedTime = 0;
d568ed2d
AL
888 TotalItems = 0;
889 CurrentItems = 0;
b98f2859
AL
890}
891 /*}}}*/
a6568219 892// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
893// ---------------------------------------------------------------------
894/* This accurately computes the elapsed time and the total overall CPS. */
895void pkgAcquireStatus::Stop()
896{
897 // Compute the CPS and elapsed time
898 struct timeval NewTime;
899 gettimeofday(&NewTime,0);
900
31a0531d
AL
901 double Delta = NewTime.tv_sec - StartTime.tv_sec +
902 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 903
b98f2859 904 // Compute the CPS value
31a0531d 905 if (Delta < 0.01)
e331f6ed
AL
906 CurrentCPS = 0;
907 else
31a0531d 908 CurrentCPS = FetchedBytes/Delta;
b98f2859 909 LastBytes = CurrentBytes;
dbbc5494 910 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
911}
912 /*}}}*/
913// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
914// ---------------------------------------------------------------------
915/* This is used to get accurate final transfer rate reporting. */
73da43e9 916void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
93274b8d 917{
b98f2859
AL
918 FetchedBytes += Size - Resume;
919}
920 /*}}}*/