]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
Support large files in the complete toolset. Indexes of this
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
/* ######################################################################

   Acquire - File Acquisition

   The core element for the scheduling system is the concept of a named
   queue. Each queue is unique and each queue has a name derived from the
   URI. The degree of parallelization can be controlled by how the queue
   name is derived from the URI.

   ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
ea542140
DK
16#include <config.h>
17
0118833a
AL
18#include <apt-pkg/acquire.h>
19#include <apt-pkg/acquire-item.h>
20#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
21#include <apt-pkg/configuration.h>
22#include <apt-pkg/error.h>
cdcc6d34 23#include <apt-pkg/strutl.h>
1cd1c398 24#include <apt-pkg/fileutl.h>
8267fe24 25
b4fc9b6f 26#include <iostream>
75ef8f14 27#include <sstream>
526334a0
MV
28#include <stdio.h>
29
7a7fa5f0 30#include <dirent.h>
8267fe24 31#include <sys/time.h>
524f8105 32#include <errno.h>
ea542140
DK
33
34#include <apti18n.h>
0118833a
AL
35 /*}}}*/
36
b4fc9b6f
AL
37using namespace std;
38
0118833a
AL
39// Acquire::pkgAcquire - Constructor /*{{{*/
40// ---------------------------------------------------------------------
93bf083d 41/* We grab some runtime state from the configuration space */
5efbd596 42pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
1cd1c398 43 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 44 Running(false)
0118833a 45{
1cd1c398 46 string const Mode = _config->Find("Acquire::Queue-Mode","host");
0a8a80e5
AL
47 if (strcasecmp(Mode.c_str(),"host") == 0)
48 QueueMode = QueueHost;
49 if (strcasecmp(Mode.c_str(),"access") == 0)
1cd1c398
DK
50 QueueMode = QueueAccess;
51}
5efbd596 52pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
1cd1c398
DK
53 Configs(0), Log(Progress), ToFetch(0),
54 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 55 Running(false)
1cd1c398
DK
56{
57 string const Mode = _config->Find("Acquire::Queue-Mode","host");
58 if (strcasecmp(Mode.c_str(),"host") == 0)
59 QueueMode = QueueHost;
60 if (strcasecmp(Mode.c_str(),"access") == 0)
61 QueueMode = QueueAccess;
62 Setup(Progress, "");
63}
64 /*}}}*/
65// Acquire::Setup - Delayed Constructor /*{{{*/
66// ---------------------------------------------------------------------
67/* Do everything needed to be a complete Acquire object and report the
68 success (or failure) back so the user knows that something is wrong… */
69bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
70{
71 Log = Progress;
0a8a80e5 72
1cd1c398 73 // check for existence and possibly create auxiliary directories
9c2c9c24
DK
74 string const listDir = _config->FindDir("Dir::State::lists");
75 string const partialListDir = listDir + "partial/";
76 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
77 string const partialArchivesDir = archivesDir + "partial/";
78
7753e468
DK
79 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
80 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
9c2c9c24
DK
81 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
82
7753e468
DK
83 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
84 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
9c2c9c24 85 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
1cd1c398
DK
86
87 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
88 return true;
89
90 // Lock the directory this acquire object will work in
91 LockFD = GetLock(flCombine(Lock, "lock"));
92 if (LockFD == -1)
93 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
94
95 return true;
96}
97 /*}}}*/
0118833a
AL
98// Acquire::~pkgAcquire - Destructor /*{{{*/
99// ---------------------------------------------------------------------
93bf083d 100/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
101pkgAcquire::~pkgAcquire()
102{
459681d3 103 Shutdown();
1cd1c398
DK
104
105 if (LockFD != -1)
106 close(LockFD);
107
3b5421b4
AL
108 while (Configs != 0)
109 {
110 MethodConfig *Jnk = Configs;
111 Configs = Configs->Next;
112 delete Jnk;
113 }
281daf46
AL
114}
115 /*}}}*/
8e5fc8f5 116// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
117// ---------------------------------------------------------------------
118/* */
119void pkgAcquire::Shutdown()
120{
121 while (Items.size() != 0)
1b480911
AL
122 {
123 if (Items[0]->Status == Item::StatFetching)
124 Items[0]->Status = Item::StatError;
281daf46 125 delete Items[0];
1b480911 126 }
0a8a80e5
AL
127
128 while (Queues != 0)
129 {
130 Queue *Jnk = Queues;
131 Queues = Queues->Next;
132 delete Jnk;
133 }
0118833a
AL
134}
135 /*}}}*/
136// Acquire::Add - Add a new item /*{{{*/
137// ---------------------------------------------------------------------
93bf083d
AL
138/* This puts an item on the acquire list. This list is mainly for tracking
139 item status */
0118833a
AL
140void pkgAcquire::Add(Item *Itm)
141{
142 Items.push_back(Itm);
143}
144 /*}}}*/
145// Acquire::Remove - Remove a item /*{{{*/
146// ---------------------------------------------------------------------
93bf083d 147/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
148void pkgAcquire::Remove(Item *Itm)
149{
a3eaf954
AL
150 Dequeue(Itm);
151
753b3525 152 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
153 {
154 if (*I == Itm)
b4fc9b6f 155 {
0118833a 156 Items.erase(I);
b4fc9b6f
AL
157 I = Items.begin();
158 }
753b3525
AL
159 else
160 I++;
8267fe24 161 }
0118833a
AL
162}
163 /*}}}*/
0a8a80e5
AL
164// Acquire::Add - Add a worker /*{{{*/
165// ---------------------------------------------------------------------
93bf083d
AL
166/* A list of workers is kept so that the select loop can direct their FD
167 usage. */
0a8a80e5
AL
168void pkgAcquire::Add(Worker *Work)
169{
170 Work->NextAcquire = Workers;
171 Workers = Work;
172}
173 /*}}}*/
174// Acquire::Remove - Remove a worker /*{{{*/
175// ---------------------------------------------------------------------
93bf083d
AL
176/* A worker has died. This can not be done while the select loop is running
177 as it would require that RunFds could handling a changing list state and
178 it cant.. */
0a8a80e5
AL
179void pkgAcquire::Remove(Worker *Work)
180{
93bf083d
AL
181 if (Running == true)
182 abort();
183
0a8a80e5
AL
184 Worker **I = &Workers;
185 for (; *I != 0;)
186 {
187 if (*I == Work)
188 *I = (*I)->NextAcquire;
189 else
190 I = &(*I)->NextAcquire;
191 }
192}
193 /*}}}*/
0118833a
AL
194// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
195// ---------------------------------------------------------------------
93bf083d 196/* This is the entry point for an item. An item calls this function when
281daf46 197 it is constructed which creates a queue (based on the current queue
93bf083d
AL
198 mode) and puts the item in that queue. If the system is running then
199 the queue might be started. */
8267fe24 200void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 201{
0a8a80e5 202 // Determine which queue to put the item in
e331f6ed
AL
203 const MethodConfig *Config;
204 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
205 if (Name.empty() == true)
206 return;
207
208 // Find the queue structure
209 Queue *I = Queues;
210 for (; I != 0 && I->Name != Name; I = I->Next);
211 if (I == 0)
212 {
213 I = new Queue(Name,this);
214 I->Next = Queues;
215 Queues = I;
93bf083d
AL
216
217 if (Running == true)
218 I->Startup();
0a8a80e5 219 }
bfd22fc0 220
e331f6ed
AL
221 // See if this is a local only URI
222 if (Config->LocalOnly == true && Item.Owner->Complete == false)
223 Item.Owner->Local = true;
8267fe24 224 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
225
226 // Queue it into the named queue
c03462c6
MV
227 if(I->Enqueue(Item))
228 ToFetch++;
229
0a8a80e5
AL
230 // Some trace stuff
231 if (Debug == true)
232 {
8267fe24
AL
233 clog << "Fetching " << Item.URI << endl;
234 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 235 clog << " Queue is: " << Name << endl;
0a8a80e5 236 }
3b5421b4
AL
237}
238 /*}}}*/
0a8a80e5 239// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 240// ---------------------------------------------------------------------
93bf083d
AL
241/* This is called when an item is finished being fetched. It removes it
242 from all the queues */
0a8a80e5
AL
243void pkgAcquire::Dequeue(Item *Itm)
244{
245 Queue *I = Queues;
bfd22fc0 246 bool Res = false;
0a8a80e5 247 for (; I != 0; I = I->Next)
bfd22fc0 248 Res |= I->Dequeue(Itm);
93bf083d
AL
249
250 if (Debug == true)
251 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
252 if (Res == true)
253 ToFetch--;
0a8a80e5
AL
254}
255 /*}}}*/
256// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
257// ---------------------------------------------------------------------
258/* The string returned depends on the configuration settings and the
259 method parameters. Given something like http://foo.org/bar it can
260 return http://foo.org or http */
e331f6ed 261string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 262{
93bf083d
AL
263 URI U(Uri);
264
e331f6ed 265 Config = GetConfig(U.Access);
0a8a80e5
AL
266 if (Config == 0)
267 return string();
268
269 /* Single-Instance methods get exactly one queue per URI. This is
270 also used for the Access queue method */
271 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 272 return U.Access;
93bf083d
AL
273
274 return U.Access + ':' + U.Host;
0118833a
AL
275}
276 /*}}}*/
3b5421b4
AL
277// Acquire::GetConfig - Fetch the configuration information /*{{{*/
278// ---------------------------------------------------------------------
279/* This locates the configuration structure for an access method. If
280 a config structure cannot be found a Worker will be created to
281 retrieve it */
0a8a80e5 282pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
283{
284 // Search for an existing config
285 MethodConfig *Conf;
286 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
287 if (Conf->Access == Access)
288 return Conf;
289
290 // Create the new config class
291 Conf = new MethodConfig;
292 Conf->Access = Access;
293 Conf->Next = Configs;
294 Configs = Conf;
0118833a 295
3b5421b4
AL
296 // Create the worker to fetch the configuration
297 Worker Work(Conf);
298 if (Work.Start() == false)
299 return 0;
7c6e2dc7
MV
300
301 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 302 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
303 Conf->SingleInstance = true;
304
3b5421b4
AL
305 return Conf;
306}
307 /*}}}*/
0a8a80e5
AL
308// Acquire::SetFds - Deal with readable FDs /*{{{*/
309// ---------------------------------------------------------------------
310/* Collect FDs that have activity monitors into the fd sets */
311void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
312{
313 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
314 {
315 if (I->InReady == true && I->InFd >= 0)
316 {
317 if (Fd < I->InFd)
318 Fd = I->InFd;
319 FD_SET(I->InFd,RSet);
320 }
321 if (I->OutReady == true && I->OutFd >= 0)
322 {
323 if (Fd < I->OutFd)
324 Fd = I->OutFd;
325 FD_SET(I->OutFd,WSet);
326 }
327 }
328}
329 /*}}}*/
330// Acquire::RunFds - Deal with active FDs /*{{{*/
331// ---------------------------------------------------------------------
93bf083d
AL
332/* Dispatch active FDs over to the proper workers. It is very important
333 that a worker never be erased while this is running! The queue class
334 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
335void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
336{
337 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
338 {
339 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
340 I->InFdReady();
341 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
342 I->OutFdReady();
343 }
344}
345 /*}}}*/
346// Acquire::Run - Run the fetch sequence /*{{{*/
347// ---------------------------------------------------------------------
348/* This runs the queues. It manages a select loop for all of the
349 Worker tasks. The workers interact with the queues and items to
350 manage the actual fetch. */
1c5f7e5f 351pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 352{
8b89e57f
AL
353 Running = true;
354
0a8a80e5
AL
355 for (Queue *I = Queues; I != 0; I = I->Next)
356 I->Startup();
357
b98f2859
AL
358 if (Log != 0)
359 Log->Start();
360
024d1123
AL
361 bool WasCancelled = false;
362
0a8a80e5 363 // Run till all things have been acquired
8267fe24
AL
364 struct timeval tv;
365 tv.tv_sec = 0;
1c5f7e5f 366 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
367 while (ToFetch > 0)
368 {
369 fd_set RFds;
370 fd_set WFds;
371 int Highest = 0;
372 FD_ZERO(&RFds);
373 FD_ZERO(&WFds);
374 SetFds(Highest,&RFds,&WFds);
375
b0db36b1
AL
376 int Res;
377 do
378 {
379 Res = select(Highest+1,&RFds,&WFds,0,&tv);
380 }
381 while (Res < 0 && errno == EINTR);
382
8267fe24 383 if (Res < 0)
8b89e57f 384 {
8267fe24
AL
385 _error->Errno("select","Select has failed");
386 break;
8b89e57f 387 }
93bf083d 388
0a8a80e5 389 RunFds(&RFds,&WFds);
93bf083d
AL
390 if (_error->PendingError() == true)
391 break;
8267fe24
AL
392
393 // Timeout, notify the log class
394 if (Res == 0 || (Log != 0 && Log->Update == true))
395 {
1c5f7e5f 396 tv.tv_usec = PulseIntervall;
8267fe24
AL
397 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
398 I->Pulse();
024d1123
AL
399 if (Log != 0 && Log->Pulse(this) == false)
400 {
401 WasCancelled = true;
402 break;
403 }
8267fe24 404 }
0a8a80e5 405 }
be4401bf 406
b98f2859
AL
407 if (Log != 0)
408 Log->Stop();
409
be4401bf
AL
410 // Shut down the acquire bits
411 Running = false;
0a8a80e5 412 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 413 I->Shutdown(false);
0a8a80e5 414
ab559b35 415 // Shut down the items
b4fc9b6f 416 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 417 (*I)->Finished();
ab559b35 418
024d1123
AL
419 if (_error->PendingError())
420 return Failed;
421 if (WasCancelled)
422 return Cancelled;
423 return Continue;
93bf083d
AL
424}
425 /*}}}*/
be4401bf 426// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
427// ---------------------------------------------------------------------
428/* This routine bumps idle queues in hopes that they will be able to fetch
429 the dequeued item */
430void pkgAcquire::Bump()
431{
be4401bf
AL
432 for (Queue *I = Queues; I != 0; I = I->Next)
433 I->Bump();
0a8a80e5
AL
434}
435 /*}}}*/
8267fe24
AL
436// Acquire::WorkerStep - Step to the next worker /*{{{*/
437// ---------------------------------------------------------------------
438/* Not inlined to advoid including acquire-worker.h */
439pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
440{
441 return I->NextAcquire;
442};
443 /*}}}*/
a6568219 444// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
445// ---------------------------------------------------------------------
446/* This is a bit simplistic, it looks at every file in the dir and sees
447 if it is part of the download set. */
448bool pkgAcquire::Clean(string Dir)
449{
450 DIR *D = opendir(Dir.c_str());
451 if (D == 0)
b2e465d6 452 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
453
454 string StartDir = SafeGetCWD();
455 if (chdir(Dir.c_str()) != 0)
456 {
457 closedir(D);
b2e465d6 458 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
459 }
460
461 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
462 {
463 // Skip some files..
464 if (strcmp(Dir->d_name,"lock") == 0 ||
465 strcmp(Dir->d_name,"partial") == 0 ||
466 strcmp(Dir->d_name,".") == 0 ||
467 strcmp(Dir->d_name,"..") == 0)
468 continue;
469
470 // Look in the get list
b4fc9b6f 471 ItemCIterator I = Items.begin();
7a7fa5f0
AL
472 for (; I != Items.end(); I++)
473 if (flNotDir((*I)->DestFile) == Dir->d_name)
474 break;
475
476 // Nothing found, nuke it
477 if (I == Items.end())
478 unlink(Dir->d_name);
479 };
480
7a7fa5f0 481 closedir(D);
3c8cda8b
MV
482 if (chdir(StartDir.c_str()) != 0)
483 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
484 return true;
485}
486 /*}}}*/
a6568219
AL
487// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
488// ---------------------------------------------------------------------
489/* This is the total number of bytes needed */
a3c4c81a 490unsigned long long pkgAcquire::TotalNeeded()
a6568219 491{
a3c4c81a 492 unsigned long long Total = 0;
b4fc9b6f 493 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
494 Total += (*I)->FileSize;
495 return Total;
496}
497 /*}}}*/
498// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
499// ---------------------------------------------------------------------
500/* This is the number of bytes that is not local */
a3c4c81a 501unsigned long long pkgAcquire::FetchNeeded()
a6568219 502{
a3c4c81a 503 unsigned long long Total = 0;
b4fc9b6f 504 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
505 if ((*I)->Local == false)
506 Total += (*I)->FileSize;
507 return Total;
508}
509 /*}}}*/
6b1ff003
AL
510// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
511// ---------------------------------------------------------------------
512/* This is the number of bytes that is not local */
a3c4c81a 513unsigned long long pkgAcquire::PartialPresent()
6b1ff003 514{
a3c4c81a 515 unsigned long long Total = 0;
b4fc9b6f 516 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
517 if ((*I)->Local == false)
518 Total += (*I)->PartialSize;
519 return Total;
520}
92fcbfc1 521 /*}}}*/
8e5fc8f5 522// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
523// ---------------------------------------------------------------------
524/* */
525pkgAcquire::UriIterator pkgAcquire::UriBegin()
526{
527 return UriIterator(Queues);
528}
529 /*}}}*/
8e5fc8f5 530// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
531// ---------------------------------------------------------------------
532/* */
533pkgAcquire::UriIterator pkgAcquire::UriEnd()
534{
535 return UriIterator(0);
536}
537 /*}}}*/
e331f6ed
AL
538// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
539// ---------------------------------------------------------------------
540/* */
541pkgAcquire::MethodConfig::MethodConfig()
542{
543 SingleInstance = false;
e331f6ed
AL
544 Pipeline = false;
545 SendConfig = false;
546 LocalOnly = false;
459681d3 547 Removable = false;
e331f6ed
AL
548 Next = 0;
549}
550 /*}}}*/
0a8a80e5
AL
551// Queue::Queue - Constructor /*{{{*/
552// ---------------------------------------------------------------------
553/* */
554pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
555 Owner(Owner)
556{
557 Items = 0;
558 Next = 0;
559 Workers = 0;
b185acc2
AL
560 MaxPipeDepth = 1;
561 PipeDepth = 0;
0a8a80e5
AL
562}
563 /*}}}*/
564// Queue::~Queue - Destructor /*{{{*/
565// ---------------------------------------------------------------------
566/* */
567pkgAcquire::Queue::~Queue()
568{
8e5fc8f5 569 Shutdown(true);
0a8a80e5
AL
570
571 while (Items != 0)
572 {
573 QItem *Jnk = Items;
574 Items = Items->Next;
575 delete Jnk;
576 }
577}
578 /*}}}*/
579// Queue::Enqueue - Queue an item to the queue /*{{{*/
580// ---------------------------------------------------------------------
581/* */
c03462c6 582bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 583{
7a1b1f8b 584 QItem **I = &Items;
c03462c6
MV
585 // move to the end of the queue and check for duplicates here
586 for (; *I != 0; I = &(*I)->Next)
587 if (Item.URI == (*I)->URI)
588 {
589 Item.Owner->Status = Item::StatDone;
590 return false;
591 }
592
0a8a80e5 593 // Create a new item
7a1b1f8b
AL
594 QItem *Itm = new QItem;
595 *Itm = Item;
596 Itm->Next = 0;
597 *I = Itm;
0a8a80e5 598
8267fe24 599 Item.Owner->QueueCounter++;
93bf083d
AL
600 if (Items->Next == 0)
601 Cycle();
c03462c6 602 return true;
0a8a80e5
AL
603}
604 /*}}}*/
c88edf1d 605// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 606// ---------------------------------------------------------------------
b185acc2 607/* We return true if we hit something */
bfd22fc0 608bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 609{
b185acc2
AL
610 if (Owner->Status == pkgAcquire::Item::StatFetching)
611 return _error->Error("Tried to dequeue a fetching object");
612
bfd22fc0
AL
613 bool Res = false;
614
0a8a80e5
AL
615 QItem **I = &Items;
616 for (; *I != 0;)
617 {
618 if ((*I)->Owner == Owner)
619 {
620 QItem *Jnk= *I;
621 *I = (*I)->Next;
622 Owner->QueueCounter--;
623 delete Jnk;
bfd22fc0 624 Res = true;
0a8a80e5
AL
625 }
626 else
627 I = &(*I)->Next;
628 }
bfd22fc0
AL
629
630 return Res;
0a8a80e5
AL
631}
632 /*}}}*/
633// Queue::Startup - Start the worker processes /*{{{*/
634// ---------------------------------------------------------------------
8e5fc8f5
AL
635/* It is possible for this to be called with a pre-existing set of
636 workers. */
0a8a80e5
AL
637bool pkgAcquire::Queue::Startup()
638{
8e5fc8f5
AL
639 if (Workers == 0)
640 {
641 URI U(Name);
642 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
643 if (Cnf == 0)
644 return false;
645
646 Workers = new Worker(this,Cnf,Owner->Log);
647 Owner->Add(Workers);
648 if (Workers->Start() == false)
649 return false;
650
651 /* When pipelining we commit 10 items. This needs to change when we
652 added other source retry to have cycle maintain a pipeline depth
653 on its own. */
654 if (Cnf->Pipeline == true)
6ce72612 655 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
656 else
657 MaxPipeDepth = 1;
658 }
5cb5d8dc 659
93bf083d 660 return Cycle();
0a8a80e5
AL
661}
662 /*}}}*/
663// Queue::Shutdown - Shutdown the worker processes /*{{{*/
664// ---------------------------------------------------------------------
8e5fc8f5
AL
665/* If final is true then all workers are eliminated, otherwise only workers
666 that do not need cleanup are removed */
667bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
668{
669 // Delete all of the workers
8e5fc8f5
AL
670 pkgAcquire::Worker **Cur = &Workers;
671 while (*Cur != 0)
0a8a80e5 672 {
8e5fc8f5
AL
673 pkgAcquire::Worker *Jnk = *Cur;
674 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
675 {
676 *Cur = Jnk->NextQueue;
677 Owner->Remove(Jnk);
678 delete Jnk;
679 }
680 else
681 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
682 }
683
684 return true;
3b5421b4
AL
685}
686 /*}}}*/
7d8afa39 687// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
688// ---------------------------------------------------------------------
689/* */
690pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
691{
692 for (QItem *I = Items; I != 0; I = I->Next)
693 if (I->URI == URI && I->Worker == Owner)
694 return I;
695 return 0;
696}
697 /*}}}*/
698// Queue::ItemDone - Item has been completed /*{{{*/
699// ---------------------------------------------------------------------
700/* The worker signals this which causes the item to be removed from the
93bf083d
AL
701 queue. If this is the last queue instance then it is removed from the
702 main queue too.*/
c88edf1d
AL
703bool pkgAcquire::Queue::ItemDone(QItem *Itm)
704{
b185acc2 705 PipeDepth--;
db890fdb
AL
706 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
707 Itm->Owner->Status = pkgAcquire::Item::StatDone;
708
93bf083d
AL
709 if (Itm->Owner->QueueCounter <= 1)
710 Owner->Dequeue(Itm->Owner);
711 else
712 {
713 Dequeue(Itm->Owner);
714 Owner->Bump();
715 }
c88edf1d 716
93bf083d
AL
717 return Cycle();
718}
719 /*}}}*/
720// Queue::Cycle - Queue new items into the method /*{{{*/
721// ---------------------------------------------------------------------
b185acc2
AL
722/* This locates a new idle item and sends it to the worker. If pipelining
723 is enabled then it keeps the pipe full. */
93bf083d
AL
724bool pkgAcquire::Queue::Cycle()
725{
726 if (Items == 0 || Workers == 0)
c88edf1d
AL
727 return true;
728
e7432370
AL
729 if (PipeDepth < 0)
730 return _error->Error("Pipedepth failure");
731
93bf083d
AL
732 // Look for a queable item
733 QItem *I = Items;
e7432370 734 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
735 {
736 for (; I != 0; I = I->Next)
737 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
738 break;
739
740 // Nothing to do, queue is idle.
741 if (I == 0)
742 return true;
743
744 I->Worker = Workers;
745 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 746 PipeDepth++;
b185acc2
AL
747 if (Workers->QueueItem(I) == false)
748 return false;
749 }
93bf083d 750
b185acc2 751 return true;
c88edf1d
AL
752}
753 /*}}}*/
be4401bf
AL
754// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
755// ---------------------------------------------------------------------
b185acc2 756/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
757void pkgAcquire::Queue::Bump()
758{
b185acc2 759 Cycle();
be4401bf
AL
760}
761 /*}}}*/
b98f2859
AL
762// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
763// ---------------------------------------------------------------------
764/* */
c5ccf175 765pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
766{
767 Start();
768}
769 /*}}}*/
770// AcquireStatus::Pulse - Called periodically /*{{{*/
771// ---------------------------------------------------------------------
772/* This computes some internal state variables for the derived classes to
773 use. It generates the current downloaded bytes and total bytes to download
774 as well as the current CPS estimate. */
024d1123 775bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
776{
777 TotalBytes = 0;
778 CurrentBytes = 0;
d568ed2d
AL
779 TotalItems = 0;
780 CurrentItems = 0;
b98f2859
AL
781
782 // Compute the total number of bytes to fetch
783 unsigned int Unknown = 0;
784 unsigned int Count = 0;
b4fc9b6f 785 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
786 I++, Count++)
787 {
d568ed2d
AL
788 TotalItems++;
789 if ((*I)->Status == pkgAcquire::Item::StatDone)
790 CurrentItems++;
791
a6568219
AL
792 // Totally ignore local items
793 if ((*I)->Local == true)
794 continue;
b2e465d6 795
b98f2859
AL
796 TotalBytes += (*I)->FileSize;
797 if ((*I)->Complete == true)
798 CurrentBytes += (*I)->FileSize;
799 if ((*I)->FileSize == 0 && (*I)->Complete == false)
800 Unknown++;
801 }
802
803 // Compute the current completion
dbbc5494 804 unsigned long long ResumeSize = 0;
b98f2859
AL
805 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
806 I = Owner->WorkerStep(I))
807 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
808 {
809 CurrentBytes += I->CurrentSize;
810 ResumeSize += I->ResumePoint;
811
812 // Files with unknown size always have 100% completion
813 if (I->CurrentItem->Owner->FileSize == 0 &&
814 I->CurrentItem->Owner->Complete == false)
815 TotalBytes += I->CurrentSize;
816 }
817
b98f2859
AL
818 // Normalize the figures and account for unknown size downloads
819 if (TotalBytes <= 0)
820 TotalBytes = 1;
821 if (Unknown == Count)
822 TotalBytes = Unknown;
18ef0a78
AL
823
824 // Wha?! Is not supposed to happen.
825 if (CurrentBytes > TotalBytes)
826 CurrentBytes = TotalBytes;
b98f2859
AL
827
828 // Compute the CPS
829 struct timeval NewTime;
830 gettimeofday(&NewTime,0);
2ec1674d 831 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
832 NewTime.tv_sec - Time.tv_sec > 6)
833 {
f17ac097
AL
834 double Delta = NewTime.tv_sec - Time.tv_sec +
835 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 836
b98f2859 837 // Compute the CPS value
f17ac097 838 if (Delta < 0.01)
e331f6ed
AL
839 CurrentCPS = 0;
840 else
aa0e1101
AL
841 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
842 LastBytes = CurrentBytes - ResumeSize;
dbbc5494 843 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
844 Time = NewTime;
845 }
024d1123 846
75ef8f14
MV
847 int fd = _config->FindI("APT::Status-Fd",-1);
848 if(fd > 0)
849 {
850 ostringstream status;
851
852 char msg[200];
853 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
dbbc5494 854 unsigned long long const ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
75ef8f14 855
1e8b4c0f
MV
856 // only show the ETA if it makes sense
857 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 858 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 859 else
0c508b03 860 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
861
862
75ef8f14
MV
863
864 // build the status str
865 status << "dlstatus:" << i
866 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
867 << ":" << msg
868 << endl;
869 write(fd, status.str().c_str(), status.str().size());
870 }
871
024d1123 872 return true;
b98f2859
AL
873}
874 /*}}}*/
875// AcquireStatus::Start - Called when the download is started /*{{{*/
876// ---------------------------------------------------------------------
877/* We just reset the counters */
878void pkgAcquireStatus::Start()
879{
880 gettimeofday(&Time,0);
881 gettimeofday(&StartTime,0);
882 LastBytes = 0;
883 CurrentCPS = 0;
884 CurrentBytes = 0;
885 TotalBytes = 0;
886 FetchedBytes = 0;
887 ElapsedTime = 0;
d568ed2d
AL
888 TotalItems = 0;
889 CurrentItems = 0;
b98f2859
AL
890}
891 /*}}}*/
a6568219 892// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
893// ---------------------------------------------------------------------
894/* This accurately computes the elapsed time and the total overall CPS. */
895void pkgAcquireStatus::Stop()
896{
897 // Compute the CPS and elapsed time
898 struct timeval NewTime;
899 gettimeofday(&NewTime,0);
900
31a0531d
AL
901 double Delta = NewTime.tv_sec - StartTime.tv_sec +
902 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 903
b98f2859 904 // Compute the CPS value
31a0531d 905 if (Delta < 0.01)
e331f6ed
AL
906 CurrentCPS = 0;
907 else
31a0531d 908 CurrentCPS = FetchedBytes/Delta;
b98f2859 909 LastBytes = CurrentBytes;
dbbc5494 910 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
911}
912 /*}}}*/
913// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
914// ---------------------------------------------------------------------
915/* This is used to get accurate final transfer rate reporting. */
73da43e9 916void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
93274b8d 917{
b98f2859
AL
918 FetchedBytes += Size - Resume;
919}
920 /*}}}*/