]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
Merge branch 'debian/sid' into debian/experimental
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquisition
7
1e3f4083 8 The core element for the schedule system is the concept of a named
0a8a80e5 9 queue. Each queue is unique and each queue has a name derived from the
1e3f4083 10 URI. The degree of parallelization can be controlled by how the queue
0a8a80e5
AL
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
ea542140
DK
16#include <config.h>
17
0118833a
AL
18#include <apt-pkg/acquire.h>
19#include <apt-pkg/acquire-item.h>
20#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
21#include <apt-pkg/configuration.h>
22#include <apt-pkg/error.h>
cdcc6d34 23#include <apt-pkg/strutl.h>
1cd1c398 24#include <apt-pkg/fileutl.h>
8267fe24 25
453b82a3
DK
26#include <string>
27#include <vector>
b4fc9b6f 28#include <iostream>
75ef8f14 29#include <sstream>
526334a0 30#include <stdio.h>
453b82a3
DK
31#include <stdlib.h>
32#include <string.h>
33#include <unistd.h>
96c6cab1 34#include <iomanip>
526334a0 35
7a7fa5f0 36#include <dirent.h>
8267fe24 37#include <sys/time.h>
453b82a3 38#include <sys/select.h>
524f8105 39#include <errno.h>
ea542140
DK
40
41#include <apti18n.h>
0118833a
AL
42 /*}}}*/
43
b4fc9b6f
AL
44using namespace std;
45
0118833a
AL
46// Acquire::pkgAcquire - Constructor /*{{{*/
47// ---------------------------------------------------------------------
93bf083d 48/* We grab some runtime state from the configuration space */
5efbd596 49pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
1cd1c398 50 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 51 Running(false)
0118833a 52{
1cd1c398 53 string const Mode = _config->Find("Acquire::Queue-Mode","host");
0a8a80e5
AL
54 if (strcasecmp(Mode.c_str(),"host") == 0)
55 QueueMode = QueueHost;
56 if (strcasecmp(Mode.c_str(),"access") == 0)
1cd1c398
DK
57 QueueMode = QueueAccess;
58}
5efbd596 59pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
1cd1c398
DK
60 Configs(0), Log(Progress), ToFetch(0),
61 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 62 Running(false)
1cd1c398
DK
63{
64 string const Mode = _config->Find("Acquire::Queue-Mode","host");
65 if (strcasecmp(Mode.c_str(),"host") == 0)
66 QueueMode = QueueHost;
67 if (strcasecmp(Mode.c_str(),"access") == 0)
68 QueueMode = QueueAccess;
69 Setup(Progress, "");
70}
71 /*}}}*/
72// Acquire::Setup - Delayed Constructor /*{{{*/
73// ---------------------------------------------------------------------
74/* Do everything needed to be a complete Acquire object and report the
75 success (or failure) back so the user knows that something is wrong… */
76bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
77{
78 Log = Progress;
0a8a80e5 79
1cd1c398 80 // check for existence and possibly create auxiliary directories
9c2c9c24
DK
81 string const listDir = _config->FindDir("Dir::State::lists");
82 string const partialListDir = listDir + "partial/";
83 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
84 string const partialArchivesDir = archivesDir + "partial/";
85
7753e468
DK
86 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
87 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
9c2c9c24
DK
88 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
89
7753e468
DK
90 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
91 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
9c2c9c24 92 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
1cd1c398
DK
93
94 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
95 return true;
96
97 // Lock the directory this acquire object will work in
98 LockFD = GetLock(flCombine(Lock, "lock"));
99 if (LockFD == -1)
100 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
101
102 return true;
103}
104 /*}}}*/
0118833a
AL
105// Acquire::~pkgAcquire - Destructor /*{{{*/
106// ---------------------------------------------------------------------
93bf083d 107/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
108pkgAcquire::~pkgAcquire()
109{
459681d3 110 Shutdown();
1cd1c398
DK
111
112 if (LockFD != -1)
113 close(LockFD);
114
3b5421b4
AL
115 while (Configs != 0)
116 {
117 MethodConfig *Jnk = Configs;
118 Configs = Configs->Next;
119 delete Jnk;
120 }
281daf46
AL
121}
122 /*}}}*/
8e5fc8f5 123// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
124// ---------------------------------------------------------------------
125/* */
126void pkgAcquire::Shutdown()
127{
f7f0d6c7 128 while (Items.empty() == false)
1b480911
AL
129 {
130 if (Items[0]->Status == Item::StatFetching)
131 Items[0]->Status = Item::StatError;
281daf46 132 delete Items[0];
1b480911 133 }
0a8a80e5
AL
134
135 while (Queues != 0)
136 {
137 Queue *Jnk = Queues;
138 Queues = Queues->Next;
139 delete Jnk;
140 }
0118833a
AL
141}
142 /*}}}*/
143// Acquire::Add - Add a new item /*{{{*/
144// ---------------------------------------------------------------------
93bf083d
AL
145/* This puts an item on the acquire list. This list is mainly for tracking
146 item status */
0118833a
AL
147void pkgAcquire::Add(Item *Itm)
148{
149 Items.push_back(Itm);
150}
151 /*}}}*/
152// Acquire::Remove - Remove a item /*{{{*/
153// ---------------------------------------------------------------------
93bf083d 154/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
155void pkgAcquire::Remove(Item *Itm)
156{
a3eaf954
AL
157 Dequeue(Itm);
158
753b3525 159 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
160 {
161 if (*I == Itm)
b4fc9b6f 162 {
0118833a 163 Items.erase(I);
b4fc9b6f
AL
164 I = Items.begin();
165 }
753b3525 166 else
f7f0d6c7 167 ++I;
8267fe24 168 }
0118833a
AL
169}
170 /*}}}*/
0a8a80e5
AL
171// Acquire::Add - Add a worker /*{{{*/
172// ---------------------------------------------------------------------
93bf083d
AL
173/* A list of workers is kept so that the select loop can direct their FD
174 usage. */
0a8a80e5
AL
175void pkgAcquire::Add(Worker *Work)
176{
177 Work->NextAcquire = Workers;
178 Workers = Work;
179}
180 /*}}}*/
181// Acquire::Remove - Remove a worker /*{{{*/
182// ---------------------------------------------------------------------
93bf083d
AL
183/* A worker has died. This can not be done while the select loop is running
184 as it would require that RunFds could handle a changing list state and
1e3f4083 185 it can't.. */
0a8a80e5
AL
186void pkgAcquire::Remove(Worker *Work)
187{
93bf083d
AL
188 if (Running == true)
189 abort();
190
0a8a80e5
AL
191 Worker **I = &Workers;
192 for (; *I != 0;)
193 {
194 if (*I == Work)
195 *I = (*I)->NextAcquire;
196 else
197 I = &(*I)->NextAcquire;
198 }
199}
200 /*}}}*/
0118833a
AL
201// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
202// ---------------------------------------------------------------------
93bf083d 203/* This is the entry point for an item. An item calls this function when
281daf46 204 it is constructed which creates a queue (based on the current queue
93bf083d
AL
205 mode) and puts the item in that queue. If the system is running then
206 the queue might be started. */
8267fe24 207void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 208{
0a8a80e5 209 // Determine which queue to put the item in
e331f6ed
AL
210 const MethodConfig *Config;
211 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
212 if (Name.empty() == true)
213 return;
214
215 // Find the queue structure
216 Queue *I = Queues;
217 for (; I != 0 && I->Name != Name; I = I->Next);
218 if (I == 0)
219 {
220 I = new Queue(Name,this);
221 I->Next = Queues;
222 Queues = I;
93bf083d
AL
223
224 if (Running == true)
225 I->Startup();
0a8a80e5 226 }
bfd22fc0 227
e331f6ed
AL
228 // See if this is a local only URI
229 if (Config->LocalOnly == true && Item.Owner->Complete == false)
230 Item.Owner->Local = true;
8267fe24 231 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
232
233 // Queue it into the named queue
c03462c6
MV
234 if(I->Enqueue(Item))
235 ToFetch++;
236
0a8a80e5
AL
237 // Some trace stuff
238 if (Debug == true)
239 {
8267fe24
AL
240 clog << "Fetching " << Item.URI << endl;
241 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 242 clog << " Queue is: " << Name << endl;
0a8a80e5 243 }
3b5421b4
AL
244}
245 /*}}}*/
0a8a80e5 246// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 247// ---------------------------------------------------------------------
93bf083d
AL
248/* This is called when an item is finished being fetched. It removes it
249 from all the queues */
0a8a80e5
AL
250void pkgAcquire::Dequeue(Item *Itm)
251{
252 Queue *I = Queues;
bfd22fc0 253 bool Res = false;
93bf083d
AL
254 if (Debug == true)
255 clog << "Dequeuing " << Itm->DestFile << endl;
5674f6b3
RG
256
257 for (; I != 0; I = I->Next)
258 {
259 if (I->Dequeue(Itm))
260 {
261 Res = true;
262 if (Debug == true)
263 clog << "Dequeued from " << I->Name << endl;
264 }
265 }
266
bfd22fc0
AL
267 if (Res == true)
268 ToFetch--;
0a8a80e5
AL
269}
270 /*}}}*/
271// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
272// ---------------------------------------------------------------------
273/* The string returned depends on the configuration settings and the
274 method parameters. Given something like http://foo.org/bar it can
275 return http://foo.org or http */
e331f6ed 276string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 277{
93bf083d
AL
278 URI U(Uri);
279
e331f6ed 280 Config = GetConfig(U.Access);
0a8a80e5
AL
281 if (Config == 0)
282 return string();
283
284 /* Single-Instance methods get exactly one queue per URI. This is
285 also used for the Access queue method */
286 if (Config->SingleInstance == true || QueueMode == QueueAccess)
5674f6b3
RG
287 return U.Access;
288
289 string AccessSchema = U.Access + ':',
290 FullQueueName = AccessSchema + U.Host;
291 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
292
293 Queue *I = Queues;
294 for (; I != 0; I = I->Next) {
295 // if the queue already exists, re-use it
296 if (I->Name == FullQueueName)
297 return FullQueueName;
298
299 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
300 Instances++;
301 }
302
303 if (Debug) {
304 clog << "Found " << Instances << " instances of " << U.Access << endl;
305 }
306
307 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
308 return U.Access;
93bf083d 309
5674f6b3 310 return FullQueueName;
0118833a
AL
311}
312 /*}}}*/
3b5421b4
AL
313// Acquire::GetConfig - Fetch the configuration information /*{{{*/
314// ---------------------------------------------------------------------
315/* This locates the configuration structure for an access method. If
316 a config structure cannot be found a Worker will be created to
317 retrieve it */
0a8a80e5 318pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
319{
320 // Search for an existing config
321 MethodConfig *Conf;
322 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
323 if (Conf->Access == Access)
324 return Conf;
325
326 // Create the new config class
327 Conf = new MethodConfig;
328 Conf->Access = Access;
329 Conf->Next = Configs;
330 Configs = Conf;
0118833a 331
3b5421b4
AL
332 // Create the worker to fetch the configuration
333 Worker Work(Conf);
334 if (Work.Start() == false)
335 return 0;
7c6e2dc7
MV
336
337 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 338 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
339 Conf->SingleInstance = true;
340
3b5421b4
AL
341 return Conf;
342}
343 /*}}}*/
0a8a80e5
AL
344// Acquire::SetFds - Deal with readable FDs /*{{{*/
345// ---------------------------------------------------------------------
346/* Collect FDs that have activity monitors into the fd sets */
347void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
348{
349 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
350 {
351 if (I->InReady == true && I->InFd >= 0)
352 {
353 if (Fd < I->InFd)
354 Fd = I->InFd;
355 FD_SET(I->InFd,RSet);
356 }
357 if (I->OutReady == true && I->OutFd >= 0)
358 {
359 if (Fd < I->OutFd)
360 Fd = I->OutFd;
361 FD_SET(I->OutFd,WSet);
362 }
363 }
364}
365 /*}}}*/
366// Acquire::RunFds - Deal with active FDs /*{{{*/
367// ---------------------------------------------------------------------
93bf083d
AL
368/* Dispatch active FDs over to the proper workers. It is very important
369 that a worker never be erased while this is running! The queue class
370 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
371void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
372{
373 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
374 {
375 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
376 I->InFdReady();
377 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
378 I->OutFdReady();
379 }
380}
381 /*}}}*/
382// Acquire::Run - Run the fetch sequence /*{{{*/
383// ---------------------------------------------------------------------
384/* This runs the queues. It manages a select loop for all of the
385 Worker tasks. The workers interact with the queues and items to
386 manage the actual fetch. */
1c5f7e5f 387pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 388{
8b89e57f
AL
389 Running = true;
390
0a8a80e5
AL
391 for (Queue *I = Queues; I != 0; I = I->Next)
392 I->Startup();
393
b98f2859
AL
394 if (Log != 0)
395 Log->Start();
396
024d1123
AL
397 bool WasCancelled = false;
398
0a8a80e5 399 // Run till all things have been acquired
8267fe24
AL
400 struct timeval tv;
401 tv.tv_sec = 0;
1c5f7e5f 402 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
403 while (ToFetch > 0)
404 {
405 fd_set RFds;
406 fd_set WFds;
407 int Highest = 0;
408 FD_ZERO(&RFds);
409 FD_ZERO(&WFds);
410 SetFds(Highest,&RFds,&WFds);
411
b0db36b1
AL
412 int Res;
413 do
414 {
415 Res = select(Highest+1,&RFds,&WFds,0,&tv);
416 }
417 while (Res < 0 && errno == EINTR);
418
8267fe24 419 if (Res < 0)
8b89e57f 420 {
8267fe24
AL
421 _error->Errno("select","Select has failed");
422 break;
8b89e57f 423 }
93bf083d 424
0a8a80e5 425 RunFds(&RFds,&WFds);
93bf083d
AL
426 if (_error->PendingError() == true)
427 break;
8267fe24
AL
428
429 // Timeout, notify the log class
430 if (Res == 0 || (Log != 0 && Log->Update == true))
431 {
1c5f7e5f 432 tv.tv_usec = PulseIntervall;
8267fe24
AL
433 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
434 I->Pulse();
024d1123
AL
435 if (Log != 0 && Log->Pulse(this) == false)
436 {
437 WasCancelled = true;
438 break;
439 }
8267fe24 440 }
0a8a80e5 441 }
be4401bf 442
b98f2859
AL
443 if (Log != 0)
444 Log->Stop();
445
be4401bf
AL
446 // Shut down the acquire bits
447 Running = false;
0a8a80e5 448 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 449 I->Shutdown(false);
0a8a80e5 450
ab559b35 451 // Shut down the items
f7f0d6c7 452 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
8e5fc8f5 453 (*I)->Finished();
ab559b35 454
024d1123
AL
455 if (_error->PendingError())
456 return Failed;
457 if (WasCancelled)
458 return Cancelled;
459 return Continue;
93bf083d
AL
460}
461 /*}}}*/
be4401bf 462// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
463// ---------------------------------------------------------------------
464/* This routine bumps idle queues in hopes that they will be able to fetch
465 the dequeued item */
466void pkgAcquire::Bump()
467{
be4401bf
AL
468 for (Queue *I = Queues; I != 0; I = I->Next)
469 I->Bump();
0a8a80e5
AL
470}
471 /*}}}*/
8267fe24
AL
472// Acquire::WorkerStep - Step to the next worker /*{{{*/
473// ---------------------------------------------------------------------
474/* Not inlined to avoid including acquire-worker.h */
475pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
476{
477 return I->NextAcquire;
d3e8fbb3 478}
8267fe24 479 /*}}}*/
a6568219 480// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
481// ---------------------------------------------------------------------
482/* This is a bit simplistic, it looks at every file in the dir and sees
483 if it is part of the download set. */
484bool pkgAcquire::Clean(string Dir)
485{
95b5f6c1
DK
486 // non-existing directories are by definition clean…
487 if (DirectoryExists(Dir) == false)
488 return true;
489
7a7fa5f0
AL
490 DIR *D = opendir(Dir.c_str());
491 if (D == 0)
b2e465d6 492 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
493
494 string StartDir = SafeGetCWD();
495 if (chdir(Dir.c_str()) != 0)
496 {
497 closedir(D);
b2e465d6 498 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
499 }
500
501 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
502 {
503 // Skip some files..
504 if (strcmp(Dir->d_name,"lock") == 0 ||
505 strcmp(Dir->d_name,"partial") == 0 ||
506 strcmp(Dir->d_name,".") == 0 ||
507 strcmp(Dir->d_name,"..") == 0)
508 continue;
509
510 // Look in the get list
b4fc9b6f 511 ItemCIterator I = Items.begin();
f7f0d6c7 512 for (; I != Items.end(); ++I)
7a7fa5f0
AL
513 if (flNotDir((*I)->DestFile) == Dir->d_name)
514 break;
515
516 // Nothing found, nuke it
517 if (I == Items.end())
518 unlink(Dir->d_name);
519 };
520
7a7fa5f0 521 closedir(D);
3c8cda8b
MV
522 if (chdir(StartDir.c_str()) != 0)
523 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
524 return true;
525}
526 /*}}}*/
a6568219
AL
527// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
528// ---------------------------------------------------------------------
529/* This is the total number of bytes needed */
a02db58f 530APT_PURE unsigned long long pkgAcquire::TotalNeeded()
a6568219 531{
a3c4c81a 532 unsigned long long Total = 0;
f7f0d6c7 533 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
534 Total += (*I)->FileSize;
535 return Total;
536}
537 /*}}}*/
538// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
539// ---------------------------------------------------------------------
540/* This is the number of bytes that is not local */
a02db58f 541APT_PURE unsigned long long pkgAcquire::FetchNeeded()
a6568219 542{
a3c4c81a 543 unsigned long long Total = 0;
f7f0d6c7 544 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
545 if ((*I)->Local == false)
546 Total += (*I)->FileSize;
547 return Total;
548}
549 /*}}}*/
6b1ff003
AL
550// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
551// ---------------------------------------------------------------------
552/* This is the number of partially downloaded bytes of the non-local items */
a02db58f 553APT_PURE unsigned long long pkgAcquire::PartialPresent()
6b1ff003 554{
a3c4c81a 555 unsigned long long Total = 0;
f7f0d6c7 556 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
6b1ff003
AL
557 if ((*I)->Local == false)
558 Total += (*I)->PartialSize;
559 return Total;
560}
92fcbfc1 561 /*}}}*/
8e5fc8f5 562// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
563// ---------------------------------------------------------------------
564/* */
565pkgAcquire::UriIterator pkgAcquire::UriBegin()
566{
567 return UriIterator(Queues);
568}
569 /*}}}*/
8e5fc8f5 570// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
571// ---------------------------------------------------------------------
572/* */
573pkgAcquire::UriIterator pkgAcquire::UriEnd()
574{
575 return UriIterator(0);
576}
577 /*}}}*/
e331f6ed
AL
578// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
579// ---------------------------------------------------------------------
580/* */
581pkgAcquire::MethodConfig::MethodConfig()
582{
583 SingleInstance = false;
e331f6ed
AL
584 Pipeline = false;
585 SendConfig = false;
586 LocalOnly = false;
459681d3 587 Removable = false;
e331f6ed
AL
588 Next = 0;
589}
590 /*}}}*/
0a8a80e5
AL
591// Queue::Queue - Constructor /*{{{*/
592// ---------------------------------------------------------------------
593/* */
594pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
595 Owner(Owner)
596{
597 Items = 0;
598 Next = 0;
599 Workers = 0;
b185acc2
AL
600 MaxPipeDepth = 1;
601 PipeDepth = 0;
0a8a80e5
AL
602}
603 /*}}}*/
604// Queue::~Queue - Destructor /*{{{*/
605// ---------------------------------------------------------------------
606/* */
607pkgAcquire::Queue::~Queue()
608{
8e5fc8f5 609 Shutdown(true);
0a8a80e5
AL
610
611 while (Items != 0)
612 {
613 QItem *Jnk = Items;
614 Items = Items->Next;
615 delete Jnk;
616 }
617}
618 /*}}}*/
619// Queue::Enqueue - Queue an item to the queue /*{{{*/
620// ---------------------------------------------------------------------
621/* */
c03462c6 622bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 623{
7a1b1f8b 624 QItem **I = &Items;
c03462c6
MV
625 // move to the end of the queue and check for duplicates here
626 for (; *I != 0; I = &(*I)->Next)
627 if (Item.URI == (*I)->URI)
628 {
629 Item.Owner->Status = Item::StatDone;
630 return false;
631 }
632
0a8a80e5 633 // Create a new item
7a1b1f8b
AL
634 QItem *Itm = new QItem;
635 *Itm = Item;
636 Itm->Next = 0;
637 *I = Itm;
0a8a80e5 638
8267fe24 639 Item.Owner->QueueCounter++;
93bf083d
AL
640 if (Items->Next == 0)
641 Cycle();
c03462c6 642 return true;
0a8a80e5
AL
643}
644 /*}}}*/
c88edf1d 645// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 646// ---------------------------------------------------------------------
b185acc2 647/* We return true if we hit something */
bfd22fc0 648bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 649{
b185acc2
AL
650 if (Owner->Status == pkgAcquire::Item::StatFetching)
651 return _error->Error("Tried to dequeue a fetching object");
652
bfd22fc0
AL
653 bool Res = false;
654
0a8a80e5
AL
655 QItem **I = &Items;
656 for (; *I != 0;)
657 {
658 if ((*I)->Owner == Owner)
659 {
660 QItem *Jnk= *I;
661 *I = (*I)->Next;
662 Owner->QueueCounter--;
663 delete Jnk;
bfd22fc0 664 Res = true;
0a8a80e5
AL
665 }
666 else
667 I = &(*I)->Next;
668 }
bfd22fc0
AL
669
670 return Res;
0a8a80e5
AL
671}
672 /*}}}*/
673// Queue::Startup - Start the worker processes /*{{{*/
674// ---------------------------------------------------------------------
8e5fc8f5
AL
675/* It is possible for this to be called with a pre-existing set of
676 workers. */
0a8a80e5
AL
677bool pkgAcquire::Queue::Startup()
678{
8e5fc8f5
AL
679 if (Workers == 0)
680 {
681 URI U(Name);
682 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
683 if (Cnf == 0)
684 return false;
685
686 Workers = new Worker(this,Cnf,Owner->Log);
687 Owner->Add(Workers);
688 if (Workers->Start() == false)
689 return false;
690
691 /* When pipelining we commit 10 items. This needs to change when we
692 added other source retry to have cycle maintain a pipeline depth
693 on its own. */
694 if (Cnf->Pipeline == true)
6ce72612 695 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
696 else
697 MaxPipeDepth = 1;
698 }
5cb5d8dc 699
93bf083d 700 return Cycle();
0a8a80e5
AL
701}
702 /*}}}*/
703// Queue::Shutdown - Shutdown the worker processes /*{{{*/
704// ---------------------------------------------------------------------
8e5fc8f5
AL
705/* If final is true then all workers are eliminated, otherwise only workers
706 that do not need cleanup are removed */
707bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
708{
709 // Delete all of the workers
8e5fc8f5
AL
710 pkgAcquire::Worker **Cur = &Workers;
711 while (*Cur != 0)
0a8a80e5 712 {
8e5fc8f5
AL
713 pkgAcquire::Worker *Jnk = *Cur;
714 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
715 {
716 *Cur = Jnk->NextQueue;
717 Owner->Remove(Jnk);
718 delete Jnk;
719 }
720 else
721 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
722 }
723
724 return true;
3b5421b4
AL
725}
726 /*}}}*/
7d8afa39 727// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
728// ---------------------------------------------------------------------
729/* */
730pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
731{
732 for (QItem *I = Items; I != 0; I = I->Next)
733 if (I->URI == URI && I->Worker == Owner)
734 return I;
735 return 0;
736}
737 /*}}}*/
738// Queue::ItemDone - Item has been completed /*{{{*/
739// ---------------------------------------------------------------------
740/* The worker signals this which causes the item to be removed from the
93bf083d
AL
741 queue. If this is the last queue instance then it is removed from the
742 main queue too.*/
c88edf1d
AL
743bool pkgAcquire::Queue::ItemDone(QItem *Itm)
744{
b185acc2 745 PipeDepth--;
db890fdb
AL
746 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
747 Itm->Owner->Status = pkgAcquire::Item::StatDone;
748
93bf083d
AL
749 if (Itm->Owner->QueueCounter <= 1)
750 Owner->Dequeue(Itm->Owner);
751 else
752 {
753 Dequeue(Itm->Owner);
754 Owner->Bump();
755 }
c88edf1d 756
93bf083d
AL
757 return Cycle();
758}
759 /*}}}*/
760// Queue::Cycle - Queue new items into the method /*{{{*/
761// ---------------------------------------------------------------------
b185acc2
AL
762/* This locates a new idle item and sends it to the worker. If pipelining
763 is enabled then it keeps the pipe full. */
93bf083d
AL
764bool pkgAcquire::Queue::Cycle()
765{
766 if (Items == 0 || Workers == 0)
c88edf1d
AL
767 return true;
768
e7432370
AL
769 if (PipeDepth < 0)
770 return _error->Error("Pipedepth failure");
771
93bf083d
AL
772 // Look for a queable item
773 QItem *I = Items;
e7432370 774 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
775 {
776 for (; I != 0; I = I->Next)
777 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
778 break;
779
780 // Nothing to do, queue is idle.
781 if (I == 0)
782 return true;
783
784 I->Worker = Workers;
785 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 786 PipeDepth++;
b185acc2
AL
787 if (Workers->QueueItem(I) == false)
788 return false;
789 }
93bf083d 790
b185acc2 791 return true;
c88edf1d
AL
792}
793 /*}}}*/
be4401bf
AL
794// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
795// ---------------------------------------------------------------------
b185acc2 796/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
797void pkgAcquire::Queue::Bump()
798{
b185acc2 799 Cycle();
be4401bf
AL
800}
801 /*}}}*/
b98f2859
AL
802// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
803// ---------------------------------------------------------------------
804/* */
dcaa1185 805pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
b98f2859
AL
806{
807 Start();
808}
809 /*}}}*/
810// AcquireStatus::Pulse - Called periodically /*{{{*/
811// ---------------------------------------------------------------------
812/* This computes some internal state variables for the derived classes to
813 use. It generates the current downloaded bytes and total bytes to download
814 as well as the current CPS estimate. */
024d1123 815bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
816{
817 TotalBytes = 0;
818 CurrentBytes = 0;
d568ed2d
AL
819 TotalItems = 0;
820 CurrentItems = 0;
b98f2859
AL
821
822 // Compute the total number of bytes to fetch
823 unsigned int Unknown = 0;
824 unsigned int Count = 0;
c6e9cc58
MV
825 bool UnfetchedReleaseFiles = false;
826 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
827 I != Owner->ItemsEnd();
f7f0d6c7 828 ++I, ++Count)
b98f2859 829 {
d568ed2d
AL
830 TotalItems++;
831 if ((*I)->Status == pkgAcquire::Item::StatDone)
f7f0d6c7 832 ++CurrentItems;
d568ed2d 833
a6568219
AL
834 // Totally ignore local items
835 if ((*I)->Local == true)
836 continue;
b2e465d6 837
d0cfa8ad
MV
838 // see if the method tells us to expect more
839 TotalItems += (*I)->ExpectedAdditionalItems;
840
c6e9cc58
MV
841 // check if there are unfetched Release files
842 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
843 UnfetchedReleaseFiles = true;
844
b98f2859
AL
845 TotalBytes += (*I)->FileSize;
846 if ((*I)->Complete == true)
847 CurrentBytes += (*I)->FileSize;
848 if ((*I)->FileSize == 0 && (*I)->Complete == false)
f7f0d6c7 849 ++Unknown;
b98f2859
AL
850 }
851
852 // Compute the current completion
dbbc5494 853 unsigned long long ResumeSize = 0;
b98f2859
AL
854 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
855 I = Owner->WorkerStep(I))
c62f7898 856 {
b98f2859 857 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
858 {
859 CurrentBytes += I->CurrentSize;
860 ResumeSize += I->ResumePoint;
861
862 // Files with unknown size always have 100% completion
863 if (I->CurrentItem->Owner->FileSize == 0 &&
864 I->CurrentItem->Owner->Complete == false)
865 TotalBytes += I->CurrentSize;
866 }
c62f7898 867 }
aa0e1101 868
b98f2859
AL
869 // Normalize the figures and account for unknown size downloads
870 if (TotalBytes <= 0)
871 TotalBytes = 1;
872 if (Unknown == Count)
873 TotalBytes = Unknown;
18ef0a78
AL
874
875 // Wha?! Is not supposed to happen.
876 if (CurrentBytes > TotalBytes)
877 CurrentBytes = TotalBytes;
96c6cab1
MV
878
879 // debug
880 if (_config->FindB("Debug::acquire::progress", false) == true)
881 std::clog << " Bytes: "
882 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
883 << std::endl;
b98f2859
AL
884
885 // Compute the CPS
886 struct timeval NewTime;
887 gettimeofday(&NewTime,0);
2ec1674d 888 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
889 NewTime.tv_sec - Time.tv_sec > 6)
890 {
f17ac097
AL
891 double Delta = NewTime.tv_sec - Time.tv_sec +
892 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 893
b98f2859 894 // Compute the CPS value
f17ac097 895 if (Delta < 0.01)
e331f6ed
AL
896 CurrentCPS = 0;
897 else
aa0e1101
AL
898 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
899 LastBytes = CurrentBytes - ResumeSize;
dbbc5494 900 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
901 Time = NewTime;
902 }
024d1123 903
c6e9cc58
MV
904 // calculate the percentage, if we have too little data assume 1%
905 if (TotalBytes > 0 && UnfetchedReleaseFiles)
96c6cab1
MV
906 Percent = 0;
907 else
908 // use both files and bytes because bytes can be unreliable
909 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
910 0.2 * (CurrentItems/float(TotalItems)*100.0));
911
75ef8f14
MV
912 int fd = _config->FindI("APT::Status-Fd",-1);
913 if(fd > 0)
914 {
915 ostringstream status;
916
917 char msg[200];
918 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
c033d415
MV
919 unsigned long long ETA = 0;
920 if(CurrentCPS > 0)
921 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
75ef8f14 922
1e8b4c0f
MV
923 // only show the ETA if it makes sense
924 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 925 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 926 else
0c508b03 927 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f 928
75ef8f14
MV
929 // build the status str
930 status << "dlstatus:" << i
96c6cab1 931 << ":" << std::setprecision(3) << Percent
d0cfa8ad
MV
932 << ":" << msg
933 << endl;
31bda500
DK
934
935 std::string const dlstatus = status.str();
d68d65ad 936 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
75ef8f14
MV
937 }
938
024d1123 939 return true;
b98f2859
AL
940}
941 /*}}}*/
942// AcquireStatus::Start - Called when the download is started /*{{{*/
943// ---------------------------------------------------------------------
944/* We just reset the counters */
945void pkgAcquireStatus::Start()
946{
947 gettimeofday(&Time,0);
948 gettimeofday(&StartTime,0);
949 LastBytes = 0;
950 CurrentCPS = 0;
951 CurrentBytes = 0;
952 TotalBytes = 0;
953 FetchedBytes = 0;
954 ElapsedTime = 0;
d568ed2d
AL
955 TotalItems = 0;
956 CurrentItems = 0;
b98f2859
AL
957}
958 /*}}}*/
a6568219 959// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
960// ---------------------------------------------------------------------
961/* This accurately computes the elapsed time and the total overall CPS. */
962void pkgAcquireStatus::Stop()
963{
964 // Compute the CPS and elapsed time
965 struct timeval NewTime;
966 gettimeofday(&NewTime,0);
967
31a0531d
AL
968 double Delta = NewTime.tv_sec - StartTime.tv_sec +
969 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 970
b98f2859 971 // Compute the CPS value
31a0531d 972 if (Delta < 0.01)
e331f6ed
AL
973 CurrentCPS = 0;
974 else
31a0531d 975 CurrentCPS = FetchedBytes/Delta;
b98f2859 976 LastBytes = CurrentBytes;
dbbc5494 977 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
978}
979 /*}}}*/
980// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
981// ---------------------------------------------------------------------
982/* This is used to get accurate final transfer rate reporting. */
73da43e9 983void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
93274b8d 984{
b98f2859
AL
985 FetchedBytes += Size - Resume;
986}
987 /*}}}*/