]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
releasing package apt version 1.1~exp3
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquisition
7
1e3f4083 8 The core element for the schedule system is the concept of a named
0a8a80e5 9 queue. Each queue is unique and each queue has a name derived from the
1e3f4083 10 URI. The degree of parallelization can be controlled by how the queue
0a8a80e5
AL
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
ea542140
DK
16#include <config.h>
17
0118833a
AL
18#include <apt-pkg/acquire.h>
19#include <apt-pkg/acquire-item.h>
20#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
21#include <apt-pkg/configuration.h>
22#include <apt-pkg/error.h>
cdcc6d34 23#include <apt-pkg/strutl.h>
1cd1c398 24#include <apt-pkg/fileutl.h>
8267fe24 25
453b82a3
DK
26#include <string>
27#include <vector>
b4fc9b6f 28#include <iostream>
75ef8f14 29#include <sstream>
526334a0 30#include <stdio.h>
453b82a3
DK
31#include <stdlib.h>
32#include <string.h>
33#include <unistd.h>
96c6cab1 34#include <iomanip>
526334a0 35
7a7fa5f0 36#include <dirent.h>
8267fe24 37#include <sys/time.h>
453b82a3 38#include <sys/select.h>
524f8105 39#include <errno.h>
ea542140
DK
40
41#include <apti18n.h>
0118833a
AL
42 /*}}}*/
43
b4fc9b6f
AL
44using namespace std;
45
0118833a
AL
46// Acquire::pkgAcquire - Constructor /*{{{*/
47// ---------------------------------------------------------------------
93bf083d 48/* We grab some runtime state from the configuration space */
5efbd596 49pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
1cd1c398 50 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 51 Running(false)
0118833a 52{
1cd1c398 53 string const Mode = _config->Find("Acquire::Queue-Mode","host");
0a8a80e5
AL
54 if (strcasecmp(Mode.c_str(),"host") == 0)
55 QueueMode = QueueHost;
56 if (strcasecmp(Mode.c_str(),"access") == 0)
1cd1c398
DK
57 QueueMode = QueueAccess;
58}
5efbd596 59pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
1cd1c398
DK
60 Configs(0), Log(Progress), ToFetch(0),
61 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 62 Running(false)
1cd1c398
DK
63{
64 string const Mode = _config->Find("Acquire::Queue-Mode","host");
65 if (strcasecmp(Mode.c_str(),"host") == 0)
66 QueueMode = QueueHost;
67 if (strcasecmp(Mode.c_str(),"access") == 0)
68 QueueMode = QueueAccess;
69 Setup(Progress, "");
70}
71 /*}}}*/
72// Acquire::Setup - Delayed Constructor /*{{{*/
73// ---------------------------------------------------------------------
74/* Do everything needed to be a complete Acquire object and report the
75 success (or failure) back so the user knows that something is wrong… */
76bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
77{
78 Log = Progress;
0a8a80e5 79
1cd1c398 80 // check for existence and possibly create auxiliary directories
9c2c9c24
DK
81 string const listDir = _config->FindDir("Dir::State::lists");
82 string const partialListDir = listDir + "partial/";
83 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
84 string const partialArchivesDir = archivesDir + "partial/";
85
7753e468
DK
86 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
87 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
9c2c9c24
DK
88 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
89
7753e468
DK
90 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
91 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
9c2c9c24 92 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
1cd1c398
DK
93
94 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
95 return true;
96
97 // Lock the directory this acquire object will work in
98 LockFD = GetLock(flCombine(Lock, "lock"));
99 if (LockFD == -1)
100 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
101
102 return true;
103}
104 /*}}}*/
0118833a
AL
105// Acquire::~pkgAcquire - Destructor /*{{{*/
106// ---------------------------------------------------------------------
93bf083d 107/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
108pkgAcquire::~pkgAcquire()
109{
459681d3 110 Shutdown();
1cd1c398
DK
111
112 if (LockFD != -1)
113 close(LockFD);
114
3b5421b4
AL
115 while (Configs != 0)
116 {
117 MethodConfig *Jnk = Configs;
118 Configs = Configs->Next;
119 delete Jnk;
120 }
281daf46
AL
121}
122 /*}}}*/
8e5fc8f5 123// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
124// ---------------------------------------------------------------------
125/* */
126void pkgAcquire::Shutdown()
127{
f7f0d6c7 128 while (Items.empty() == false)
1b480911
AL
129 {
130 if (Items[0]->Status == Item::StatFetching)
131 Items[0]->Status = Item::StatError;
281daf46 132 delete Items[0];
1b480911 133 }
0a8a80e5
AL
134
135 while (Queues != 0)
136 {
137 Queue *Jnk = Queues;
138 Queues = Queues->Next;
139 delete Jnk;
140 }
0118833a
AL
141}
142 /*}}}*/
143// Acquire::Add - Add a new item /*{{{*/
144// ---------------------------------------------------------------------
93bf083d
AL
145/* This puts an item on the acquire list. This list is mainly for tracking
146 item status */
0118833a
AL
147void pkgAcquire::Add(Item *Itm)
148{
149 Items.push_back(Itm);
150}
151 /*}}}*/
152// Acquire::Remove - Remove a item /*{{{*/
153// ---------------------------------------------------------------------
93bf083d 154/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
155void pkgAcquire::Remove(Item *Itm)
156{
a3eaf954
AL
157 Dequeue(Itm);
158
753b3525 159 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
160 {
161 if (*I == Itm)
b4fc9b6f 162 {
0118833a 163 Items.erase(I);
b4fc9b6f
AL
164 I = Items.begin();
165 }
753b3525 166 else
f7f0d6c7 167 ++I;
8267fe24 168 }
0118833a
AL
169}
170 /*}}}*/
0a8a80e5
AL
171// Acquire::Add - Add a worker /*{{{*/
172// ---------------------------------------------------------------------
93bf083d
AL
173/* A list of workers is kept so that the select loop can direct their FD
174 usage. */
0a8a80e5
AL
175void pkgAcquire::Add(Worker *Work)
176{
177 Work->NextAcquire = Workers;
178 Workers = Work;
179}
180 /*}}}*/
181// Acquire::Remove - Remove a worker /*{{{*/
182// ---------------------------------------------------------------------
93bf083d
AL
183/* A worker has died. This can not be done while the select loop is running
184 as it would require that RunFds could handling a changing list state and
1e3f4083 185 it can't.. */
0a8a80e5
AL
186void pkgAcquire::Remove(Worker *Work)
187{
93bf083d
AL
188 if (Running == true)
189 abort();
190
0a8a80e5
AL
191 Worker **I = &Workers;
192 for (; *I != 0;)
193 {
194 if (*I == Work)
195 *I = (*I)->NextAcquire;
196 else
197 I = &(*I)->NextAcquire;
198 }
199}
200 /*}}}*/
0118833a
AL
201// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
202// ---------------------------------------------------------------------
93bf083d 203/* This is the entry point for an item. An item calls this function when
281daf46 204 it is constructed which creates a queue (based on the current queue
93bf083d
AL
205 mode) and puts the item in that queue. If the system is running then
206 the queue might be started. */
8267fe24 207void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 208{
0a8a80e5 209 // Determine which queue to put the item in
e331f6ed
AL
210 const MethodConfig *Config;
211 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
212 if (Name.empty() == true)
213 return;
214
215 // Find the queue structure
216 Queue *I = Queues;
217 for (; I != 0 && I->Name != Name; I = I->Next);
218 if (I == 0)
219 {
220 I = new Queue(Name,this);
221 I->Next = Queues;
222 Queues = I;
93bf083d
AL
223
224 if (Running == true)
225 I->Startup();
0a8a80e5 226 }
bfd22fc0 227
e331f6ed
AL
228 // See if this is a local only URI
229 if (Config->LocalOnly == true && Item.Owner->Complete == false)
230 Item.Owner->Local = true;
8267fe24 231 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
232
233 // Queue it into the named queue
c03462c6
MV
234 if(I->Enqueue(Item))
235 ToFetch++;
236
0a8a80e5
AL
237 // Some trace stuff
238 if (Debug == true)
239 {
8267fe24
AL
240 clog << "Fetching " << Item.URI << endl;
241 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 242 clog << " Queue is: " << Name << endl;
0a8a80e5 243 }
3b5421b4
AL
244}
245 /*}}}*/
0a8a80e5 246// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 247// ---------------------------------------------------------------------
93bf083d
AL
248/* This is called when an item is finished being fetched. It removes it
249 from all the queues */
0a8a80e5
AL
250void pkgAcquire::Dequeue(Item *Itm)
251{
252 Queue *I = Queues;
bfd22fc0 253 bool Res = false;
93bf083d
AL
254 if (Debug == true)
255 clog << "Dequeuing " << Itm->DestFile << endl;
5674f6b3
RG
256
257 for (; I != 0; I = I->Next)
258 {
259 if (I->Dequeue(Itm))
260 {
261 Res = true;
262 if (Debug == true)
263 clog << "Dequeued from " << I->Name << endl;
264 }
265 }
266
bfd22fc0
AL
267 if (Res == true)
268 ToFetch--;
0a8a80e5
AL
269}
270 /*}}}*/
271// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
272// ---------------------------------------------------------------------
273/* The string returned depends on the configuration settings and the
274 method parameters. Given something like http://foo.org/bar it can
275 return http://foo.org or http */
e331f6ed 276string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 277{
93bf083d
AL
278 URI U(Uri);
279
e331f6ed 280 Config = GetConfig(U.Access);
0a8a80e5
AL
281 if (Config == 0)
282 return string();
283
284 /* Single-Instance methods get exactly one queue per URI. This is
285 also used for the Access queue method */
286 if (Config->SingleInstance == true || QueueMode == QueueAccess)
5674f6b3
RG
287 return U.Access;
288
289 string AccessSchema = U.Access + ':',
290 FullQueueName = AccessSchema + U.Host;
291 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
292
293 Queue *I = Queues;
294 for (; I != 0; I = I->Next) {
295 // if the queue already exists, re-use it
296 if (I->Name == FullQueueName)
297 return FullQueueName;
298
299 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
300 Instances++;
301 }
302
303 if (Debug) {
304 clog << "Found " << Instances << " instances of " << U.Access << endl;
305 }
306
307 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
308 return U.Access;
93bf083d 309
5674f6b3 310 return FullQueueName;
0118833a
AL
311}
312 /*}}}*/
3b5421b4
AL
313// Acquire::GetConfig - Fetch the configuration information /*{{{*/
314// ---------------------------------------------------------------------
315/* This locates the configuration structure for an access method. If
316 a config structure cannot be found a Worker will be created to
317 retrieve it */
0a8a80e5 318pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
319{
320 // Search for an existing config
321 MethodConfig *Conf;
322 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
323 if (Conf->Access == Access)
324 return Conf;
325
326 // Create the new config class
327 Conf = new MethodConfig;
328 Conf->Access = Access;
329 Conf->Next = Configs;
330 Configs = Conf;
0118833a 331
3b5421b4
AL
332 // Create the worker to fetch the configuration
333 Worker Work(Conf);
334 if (Work.Start() == false)
335 return 0;
7c6e2dc7
MV
336
337 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 338 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
339 Conf->SingleInstance = true;
340
3b5421b4
AL
341 return Conf;
342}
343 /*}}}*/
0a8a80e5
AL
344// Acquire::SetFds - Deal with readable FDs /*{{{*/
345// ---------------------------------------------------------------------
346/* Collect FDs that have activity monitors into the fd sets */
347void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
348{
349 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
350 {
351 if (I->InReady == true && I->InFd >= 0)
352 {
353 if (Fd < I->InFd)
354 Fd = I->InFd;
355 FD_SET(I->InFd,RSet);
356 }
357 if (I->OutReady == true && I->OutFd >= 0)
358 {
359 if (Fd < I->OutFd)
360 Fd = I->OutFd;
361 FD_SET(I->OutFd,WSet);
362 }
363 }
364}
365 /*}}}*/
366// Acquire::RunFds - Deal with active FDs /*{{{*/
367// ---------------------------------------------------------------------
93bf083d
AL
368/* Dispatch active FDs over to the proper workers. It is very important
369 that a worker never be erased while this is running! The queue class
370 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
371void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
372{
373 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
374 {
375 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
376 I->InFdReady();
377 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
378 I->OutFdReady();
379 }
380}
381 /*}}}*/
382// Acquire::Run - Run the fetch sequence /*{{{*/
383// ---------------------------------------------------------------------
384/* This runs the queues. It manages a select loop for all of the
385 Worker tasks. The workers interact with the queues and items to
386 manage the actual fetch. */
1c5f7e5f 387pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 388{
8b89e57f
AL
389 Running = true;
390
0a8a80e5
AL
391 for (Queue *I = Queues; I != 0; I = I->Next)
392 I->Startup();
393
b98f2859
AL
394 if (Log != 0)
395 Log->Start();
396
024d1123
AL
397 bool WasCancelled = false;
398
0a8a80e5 399 // Run till all things have been acquired
8267fe24
AL
400 struct timeval tv;
401 tv.tv_sec = 0;
1c5f7e5f 402 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
403 while (ToFetch > 0)
404 {
405 fd_set RFds;
406 fd_set WFds;
407 int Highest = 0;
408 FD_ZERO(&RFds);
409 FD_ZERO(&WFds);
410 SetFds(Highest,&RFds,&WFds);
411
b0db36b1
AL
412 int Res;
413 do
414 {
415 Res = select(Highest+1,&RFds,&WFds,0,&tv);
416 }
417 while (Res < 0 && errno == EINTR);
418
8267fe24 419 if (Res < 0)
8b89e57f 420 {
8267fe24
AL
421 _error->Errno("select","Select has failed");
422 break;
8b89e57f 423 }
93bf083d 424
0a8a80e5 425 RunFds(&RFds,&WFds);
93bf083d
AL
426 if (_error->PendingError() == true)
427 break;
8267fe24
AL
428
429 // Timeout, notify the log class
430 if (Res == 0 || (Log != 0 && Log->Update == true))
431 {
1c5f7e5f 432 tv.tv_usec = PulseIntervall;
8267fe24
AL
433 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
434 I->Pulse();
024d1123
AL
435 if (Log != 0 && Log->Pulse(this) == false)
436 {
437 WasCancelled = true;
438 break;
439 }
8267fe24 440 }
0a8a80e5 441 }
be4401bf 442
b98f2859
AL
443 if (Log != 0)
444 Log->Stop();
445
be4401bf
AL
446 // Shut down the acquire bits
447 Running = false;
0a8a80e5 448 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 449 I->Shutdown(false);
0a8a80e5 450
ab559b35 451 // Shut down the items
f7f0d6c7 452 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
8e5fc8f5 453 (*I)->Finished();
ab559b35 454
024d1123
AL
455 if (_error->PendingError())
456 return Failed;
457 if (WasCancelled)
458 return Cancelled;
459 return Continue;
93bf083d
AL
460}
461 /*}}}*/
be4401bf 462// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
463// ---------------------------------------------------------------------
464/* This routine bumps idle queues in hopes that they will be able to fetch
465 the dequeued item */
466void pkgAcquire::Bump()
467{
be4401bf
AL
468 for (Queue *I = Queues; I != 0; I = I->Next)
469 I->Bump();
0a8a80e5
AL
470}
471 /*}}}*/
8267fe24
AL
472// Acquire::WorkerStep - Step to the next worker /*{{{*/
473// ---------------------------------------------------------------------
474/* Not inlined to advoid including acquire-worker.h */
475pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
476{
477 return I->NextAcquire;
d3e8fbb3 478}
8267fe24 479 /*}}}*/
a6568219 480// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
481// ---------------------------------------------------------------------
482/* This is a bit simplistic, it looks at every file in the dir and sees
483 if it is part of the download set. */
484bool pkgAcquire::Clean(string Dir)
485{
95b5f6c1
DK
486 // non-existing directories are by definition clean…
487 if (DirectoryExists(Dir) == false)
488 return true;
489
10ecfe4f
MV
490 if(Dir == "/")
491 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
492
7a7fa5f0
AL
493 DIR *D = opendir(Dir.c_str());
494 if (D == 0)
b2e465d6 495 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
496
497 string StartDir = SafeGetCWD();
498 if (chdir(Dir.c_str()) != 0)
499 {
500 closedir(D);
b2e465d6 501 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
502 }
503
504 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
505 {
506 // Skip some files..
507 if (strcmp(Dir->d_name,"lock") == 0 ||
508 strcmp(Dir->d_name,"partial") == 0 ||
509 strcmp(Dir->d_name,".") == 0 ||
510 strcmp(Dir->d_name,"..") == 0)
511 continue;
512
513 // Look in the get list
b4fc9b6f 514 ItemCIterator I = Items.begin();
f7f0d6c7 515 for (; I != Items.end(); ++I)
7a7fa5f0
AL
516 if (flNotDir((*I)->DestFile) == Dir->d_name)
517 break;
518
519 // Nothing found, nuke it
520 if (I == Items.end())
521 unlink(Dir->d_name);
522 };
523
7a7fa5f0 524 closedir(D);
3c8cda8b
MV
525 if (chdir(StartDir.c_str()) != 0)
526 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
527 return true;
528}
529 /*}}}*/
a6568219
AL
530// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
531// ---------------------------------------------------------------------
532/* This is the total number of bytes needed */
a02db58f 533APT_PURE unsigned long long pkgAcquire::TotalNeeded()
a6568219 534{
a3c4c81a 535 unsigned long long Total = 0;
f7f0d6c7 536 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
537 Total += (*I)->FileSize;
538 return Total;
539}
540 /*}}}*/
541// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
542// ---------------------------------------------------------------------
543/* This is the number of bytes that is not local */
a02db58f 544APT_PURE unsigned long long pkgAcquire::FetchNeeded()
a6568219 545{
a3c4c81a 546 unsigned long long Total = 0;
f7f0d6c7 547 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
548 if ((*I)->Local == false)
549 Total += (*I)->FileSize;
550 return Total;
551}
552 /*}}}*/
6b1ff003
AL
553// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
554// ---------------------------------------------------------------------
555/* This is the number of bytes that is not local */
a02db58f 556APT_PURE unsigned long long pkgAcquire::PartialPresent()
6b1ff003 557{
a3c4c81a 558 unsigned long long Total = 0;
f7f0d6c7 559 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
6b1ff003
AL
560 if ((*I)->Local == false)
561 Total += (*I)->PartialSize;
562 return Total;
563}
92fcbfc1 564 /*}}}*/
8e5fc8f5 565// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
566// ---------------------------------------------------------------------
567/* */
568pkgAcquire::UriIterator pkgAcquire::UriBegin()
569{
570 return UriIterator(Queues);
571}
572 /*}}}*/
8e5fc8f5 573// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
574// ---------------------------------------------------------------------
575/* */
576pkgAcquire::UriIterator pkgAcquire::UriEnd()
577{
578 return UriIterator(0);
579}
580 /*}}}*/
e331f6ed
AL
581// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
582// ---------------------------------------------------------------------
583/* */
584pkgAcquire::MethodConfig::MethodConfig()
585{
586 SingleInstance = false;
e331f6ed
AL
587 Pipeline = false;
588 SendConfig = false;
589 LocalOnly = false;
459681d3 590 Removable = false;
e331f6ed
AL
591 Next = 0;
592}
593 /*}}}*/
0a8a80e5
AL
594// Queue::Queue - Constructor /*{{{*/
595// ---------------------------------------------------------------------
596/* */
597pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
598 Owner(Owner)
599{
600 Items = 0;
601 Next = 0;
602 Workers = 0;
b185acc2
AL
603 MaxPipeDepth = 1;
604 PipeDepth = 0;
0a8a80e5
AL
605}
606 /*}}}*/
607// Queue::~Queue - Destructor /*{{{*/
608// ---------------------------------------------------------------------
609/* */
610pkgAcquire::Queue::~Queue()
611{
8e5fc8f5 612 Shutdown(true);
0a8a80e5
AL
613
614 while (Items != 0)
615 {
616 QItem *Jnk = Items;
617 Items = Items->Next;
618 delete Jnk;
619 }
620}
621 /*}}}*/
622// Queue::Enqueue - Queue an item to the queue /*{{{*/
623// ---------------------------------------------------------------------
624/* */
c03462c6 625bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 626{
7a1b1f8b 627 QItem **I = &Items;
c03462c6
MV
628 // move to the end of the queue and check for duplicates here
629 for (; *I != 0; I = &(*I)->Next)
630 if (Item.URI == (*I)->URI)
631 {
632 Item.Owner->Status = Item::StatDone;
633 return false;
634 }
635
0a8a80e5 636 // Create a new item
7a1b1f8b
AL
637 QItem *Itm = new QItem;
638 *Itm = Item;
639 Itm->Next = 0;
640 *I = Itm;
0a8a80e5 641
8267fe24 642 Item.Owner->QueueCounter++;
93bf083d
AL
643 if (Items->Next == 0)
644 Cycle();
c03462c6 645 return true;
0a8a80e5
AL
646}
647 /*}}}*/
c88edf1d 648// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 649// ---------------------------------------------------------------------
b185acc2 650/* We return true if we hit something */
bfd22fc0 651bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 652{
b185acc2
AL
653 if (Owner->Status == pkgAcquire::Item::StatFetching)
654 return _error->Error("Tried to dequeue a fetching object");
655
bfd22fc0
AL
656 bool Res = false;
657
0a8a80e5
AL
658 QItem **I = &Items;
659 for (; *I != 0;)
660 {
661 if ((*I)->Owner == Owner)
662 {
663 QItem *Jnk= *I;
664 *I = (*I)->Next;
665 Owner->QueueCounter--;
666 delete Jnk;
bfd22fc0 667 Res = true;
0a8a80e5
AL
668 }
669 else
670 I = &(*I)->Next;
671 }
bfd22fc0
AL
672
673 return Res;
0a8a80e5
AL
674}
675 /*}}}*/
676// Queue::Startup - Start the worker processes /*{{{*/
677// ---------------------------------------------------------------------
8e5fc8f5
AL
678/* It is possible for this to be called with a pre-existing set of
679 workers. */
0a8a80e5
AL
680bool pkgAcquire::Queue::Startup()
681{
8e5fc8f5
AL
682 if (Workers == 0)
683 {
684 URI U(Name);
685 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
686 if (Cnf == 0)
687 return false;
688
689 Workers = new Worker(this,Cnf,Owner->Log);
690 Owner->Add(Workers);
691 if (Workers->Start() == false)
692 return false;
693
694 /* When pipelining we commit 10 items. This needs to change when we
695 added other source retry to have cycle maintain a pipeline depth
696 on its own. */
697 if (Cnf->Pipeline == true)
6ce72612 698 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
699 else
700 MaxPipeDepth = 1;
701 }
5cb5d8dc 702
93bf083d 703 return Cycle();
0a8a80e5
AL
704}
705 /*}}}*/
706// Queue::Shutdown - Shutdown the worker processes /*{{{*/
707// ---------------------------------------------------------------------
8e5fc8f5
AL
708/* If final is true then all workers are eliminated, otherwise only workers
709 that do not need cleanup are removed */
710bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
711{
712 // Delete all of the workers
8e5fc8f5
AL
713 pkgAcquire::Worker **Cur = &Workers;
714 while (*Cur != 0)
0a8a80e5 715 {
8e5fc8f5
AL
716 pkgAcquire::Worker *Jnk = *Cur;
717 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
718 {
719 *Cur = Jnk->NextQueue;
720 Owner->Remove(Jnk);
721 delete Jnk;
722 }
723 else
724 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
725 }
726
727 return true;
3b5421b4
AL
728}
729 /*}}}*/
7d8afa39 730// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
731// ---------------------------------------------------------------------
732/* */
733pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
734{
735 for (QItem *I = Items; I != 0; I = I->Next)
736 if (I->URI == URI && I->Worker == Owner)
737 return I;
738 return 0;
739}
740 /*}}}*/
741// Queue::ItemDone - Item has been completed /*{{{*/
742// ---------------------------------------------------------------------
743/* The worker signals this which causes the item to be removed from the
93bf083d
AL
744 queue. If this is the last queue instance then it is removed from the
745 main queue too.*/
c88edf1d
AL
746bool pkgAcquire::Queue::ItemDone(QItem *Itm)
747{
b185acc2 748 PipeDepth--;
db890fdb
AL
749 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
750 Itm->Owner->Status = pkgAcquire::Item::StatDone;
751
93bf083d
AL
752 if (Itm->Owner->QueueCounter <= 1)
753 Owner->Dequeue(Itm->Owner);
754 else
755 {
756 Dequeue(Itm->Owner);
757 Owner->Bump();
758 }
c88edf1d 759
93bf083d
AL
760 return Cycle();
761}
762 /*}}}*/
763// Queue::Cycle - Queue new items into the method /*{{{*/
764// ---------------------------------------------------------------------
b185acc2
AL
765/* This locates a new idle item and sends it to the worker. If pipelining
766 is enabled then it keeps the pipe full. */
93bf083d
AL
767bool pkgAcquire::Queue::Cycle()
768{
769 if (Items == 0 || Workers == 0)
c88edf1d
AL
770 return true;
771
e7432370
AL
772 if (PipeDepth < 0)
773 return _error->Error("Pipedepth failure");
774
93bf083d
AL
775 // Look for a queable item
776 QItem *I = Items;
e7432370 777 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
778 {
779 for (; I != 0; I = I->Next)
780 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
781 break;
782
783 // Nothing to do, queue is idle.
784 if (I == 0)
785 return true;
786
787 I->Worker = Workers;
788 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 789 PipeDepth++;
b185acc2
AL
790 if (Workers->QueueItem(I) == false)
791 return false;
792 }
93bf083d 793
b185acc2 794 return true;
c88edf1d
AL
795}
796 /*}}}*/
be4401bf
AL
797// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
798// ---------------------------------------------------------------------
b185acc2 799/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
800void pkgAcquire::Queue::Bump()
801{
b185acc2 802 Cycle();
be4401bf
AL
803}
804 /*}}}*/
b98f2859
AL
805// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
806// ---------------------------------------------------------------------
807/* */
dcaa1185 808pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
b98f2859
AL
809{
810 Start();
811}
812 /*}}}*/
813// AcquireStatus::Pulse - Called periodically /*{{{*/
814// ---------------------------------------------------------------------
815/* This computes some internal state variables for the derived classes to
816 use. It generates the current downloaded bytes and total bytes to download
817 as well as the current CPS estimate. */
024d1123 818bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
819{
820 TotalBytes = 0;
821 CurrentBytes = 0;
d568ed2d
AL
822 TotalItems = 0;
823 CurrentItems = 0;
b98f2859
AL
824
825 // Compute the total number of bytes to fetch
826 unsigned int Unknown = 0;
827 unsigned int Count = 0;
c6e9cc58
MV
828 bool UnfetchedReleaseFiles = false;
829 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
830 I != Owner->ItemsEnd();
f7f0d6c7 831 ++I, ++Count)
b98f2859 832 {
d568ed2d
AL
833 TotalItems++;
834 if ((*I)->Status == pkgAcquire::Item::StatDone)
f7f0d6c7 835 ++CurrentItems;
d568ed2d 836
a6568219
AL
837 // Totally ignore local items
838 if ((*I)->Local == true)
839 continue;
b2e465d6 840
d0cfa8ad
MV
841 // see if the method tells us to expect more
842 TotalItems += (*I)->ExpectedAdditionalItems;
843
c6e9cc58
MV
844 // check if there are unfetched Release files
845 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
846 UnfetchedReleaseFiles = true;
847
b98f2859
AL
848 TotalBytes += (*I)->FileSize;
849 if ((*I)->Complete == true)
850 CurrentBytes += (*I)->FileSize;
851 if ((*I)->FileSize == 0 && (*I)->Complete == false)
f7f0d6c7 852 ++Unknown;
b98f2859
AL
853 }
854
855 // Compute the current completion
dbbc5494 856 unsigned long long ResumeSize = 0;
b98f2859
AL
857 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
858 I = Owner->WorkerStep(I))
c62f7898 859 {
b98f2859 860 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
861 {
862 CurrentBytes += I->CurrentSize;
863 ResumeSize += I->ResumePoint;
864
865 // Files with unknown size always have 100% completion
866 if (I->CurrentItem->Owner->FileSize == 0 &&
867 I->CurrentItem->Owner->Complete == false)
868 TotalBytes += I->CurrentSize;
869 }
c62f7898 870 }
aa0e1101 871
b98f2859
AL
872 // Normalize the figures and account for unknown size downloads
873 if (TotalBytes <= 0)
874 TotalBytes = 1;
875 if (Unknown == Count)
876 TotalBytes = Unknown;
18ef0a78
AL
877
878 // Wha?! Is not supposed to happen.
879 if (CurrentBytes > TotalBytes)
880 CurrentBytes = TotalBytes;
96c6cab1
MV
881
882 // debug
883 if (_config->FindB("Debug::acquire::progress", false) == true)
884 std::clog << " Bytes: "
885 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
886 << std::endl;
b98f2859
AL
887
888 // Compute the CPS
889 struct timeval NewTime;
890 gettimeofday(&NewTime,0);
2ec1674d 891 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
892 NewTime.tv_sec - Time.tv_sec > 6)
893 {
f17ac097
AL
894 double Delta = NewTime.tv_sec - Time.tv_sec +
895 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 896
b98f2859 897 // Compute the CPS value
f17ac097 898 if (Delta < 0.01)
e331f6ed
AL
899 CurrentCPS = 0;
900 else
aa0e1101
AL
901 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
902 LastBytes = CurrentBytes - ResumeSize;
dbbc5494 903 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
904 Time = NewTime;
905 }
024d1123 906
c6e9cc58
MV
907 // calculate the percentage, if we have too little data assume 1%
908 if (TotalBytes > 0 && UnfetchedReleaseFiles)
96c6cab1
MV
909 Percent = 0;
910 else
911 // use both files and bytes because bytes can be unreliable
912 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
913 0.2 * (CurrentItems/float(TotalItems)*100.0));
914
75ef8f14
MV
915 int fd = _config->FindI("APT::Status-Fd",-1);
916 if(fd > 0)
917 {
918 ostringstream status;
919
920 char msg[200];
921 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
c033d415
MV
922 unsigned long long ETA = 0;
923 if(CurrentCPS > 0)
924 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
75ef8f14 925
1e8b4c0f
MV
926 // only show the ETA if it makes sense
927 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 928 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 929 else
0c508b03 930 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f 931
75ef8f14
MV
932 // build the status str
933 status << "dlstatus:" << i
96c6cab1 934 << ":" << std::setprecision(3) << Percent
d0cfa8ad
MV
935 << ":" << msg
936 << endl;
31bda500
DK
937
938 std::string const dlstatus = status.str();
d68d65ad 939 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
75ef8f14
MV
940 }
941
024d1123 942 return true;
b98f2859
AL
943}
944 /*}}}*/
945// AcquireStatus::Start - Called when the download is started /*{{{*/
946// ---------------------------------------------------------------------
947/* We just reset the counters */
948void pkgAcquireStatus::Start()
949{
950 gettimeofday(&Time,0);
951 gettimeofday(&StartTime,0);
952 LastBytes = 0;
953 CurrentCPS = 0;
954 CurrentBytes = 0;
955 TotalBytes = 0;
956 FetchedBytes = 0;
957 ElapsedTime = 0;
d568ed2d
AL
958 TotalItems = 0;
959 CurrentItems = 0;
b98f2859
AL
960}
961 /*}}}*/
a6568219 962// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
963// ---------------------------------------------------------------------
964/* This accurately computes the elapsed time and the total overall CPS. */
965void pkgAcquireStatus::Stop()
966{
967 // Compute the CPS and elapsed time
968 struct timeval NewTime;
969 gettimeofday(&NewTime,0);
970
31a0531d
AL
971 double Delta = NewTime.tv_sec - StartTime.tv_sec +
972 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 973
b98f2859 974 // Compute the CPS value
31a0531d 975 if (Delta < 0.01)
e331f6ed
AL
976 CurrentCPS = 0;
977 else
31a0531d 978 CurrentCPS = FetchedBytes/Delta;
b98f2859 979 LastBytes = CurrentBytes;
dbbc5494 980 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
981}
982 /*}}}*/
983// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
984// ---------------------------------------------------------------------
985/* This is used to get accurate final transfer rate reporting. */
73da43e9 986void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
93274b8d 987{
b98f2859
AL
988 FetchedBytes += Size - Resume;
989}
990 /*}}}*/