]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
Changed size of offset type
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
0919e3f9 3// $Id: acquire.cc,v 1.13 1998/11/11 23:45:08 jgg Exp $
0118833a
AL
4/* ######################################################################
5
   Acquire - File Acquisition
7
0a8a80e5
AL
   The core element for the scheduling system is the concept of a named
   queue. Each queue is unique and each queue has a name derived from the
   URI. The degree of parallelization can be controlled by how the queue
   name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
16#ifdef __GNUG__
17#pragma implementation "apt-pkg/acquire.h"
18#endif
#include <apt-pkg/acquire.h>
#include <apt-pkg/acquire-item.h>
#include <apt-pkg/acquire-worker.h>
#include <apt-pkg/configuration.h>
#include <apt-pkg/error.h>
#include <strutl.h>

#include <errno.h>
#include <sys/time.h>
0118833a
AL
27 /*}}}*/
28
29// Acquire::pkgAcquire - Constructor /*{{{*/
30// ---------------------------------------------------------------------
93bf083d 31/* We grab some runtime state from the configuration space */
8267fe24 32pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
33{
34 Queues = 0;
35 Configs = 0;
0a8a80e5
AL
36 Workers = 0;
37 ToFetch = 0;
8b89e57f 38 Running = false;
0a8a80e5
AL
39
40 string Mode = _config->Find("Acquire::Queue-Mode","host");
41 if (strcasecmp(Mode.c_str(),"host") == 0)
42 QueueMode = QueueHost;
43 if (strcasecmp(Mode.c_str(),"access") == 0)
44 QueueMode = QueueAccess;
45
46 Debug = _config->FindB("Debug::pkgAcquire",false);
0118833a
AL
47}
48 /*}}}*/
49// Acquire::~pkgAcquire - Destructor /*{{{*/
50// ---------------------------------------------------------------------
93bf083d 51/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
52pkgAcquire::~pkgAcquire()
53{
54 while (Items.size() != 0)
55 delete Items[0];
3b5421b4
AL
56
57 while (Configs != 0)
58 {
59 MethodConfig *Jnk = Configs;
60 Configs = Configs->Next;
61 delete Jnk;
62 }
0a8a80e5
AL
63
64 while (Queues != 0)
65 {
66 Queue *Jnk = Queues;
67 Queues = Queues->Next;
68 delete Jnk;
69 }
0118833a
AL
70}
71 /*}}}*/
72// Acquire::Add - Add a new item /*{{{*/
73// ---------------------------------------------------------------------
93bf083d
AL
74/* This puts an item on the acquire list. This list is mainly for tracking
75 item status */
0118833a
AL
76void pkgAcquire::Add(Item *Itm)
77{
78 Items.push_back(Itm);
79}
80 /*}}}*/
81// Acquire::Remove - Remove a item /*{{{*/
82// ---------------------------------------------------------------------
93bf083d 83/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
84void pkgAcquire::Remove(Item *Itm)
85{
86 for (vector<Item *>::iterator I = Items.begin(); I < Items.end(); I++)
87 {
88 if (*I == Itm)
89 Items.erase(I);
8267fe24 90 }
0118833a
AL
91}
92 /*}}}*/
0a8a80e5
AL
93// Acquire::Add - Add a worker /*{{{*/
94// ---------------------------------------------------------------------
93bf083d
AL
95/* A list of workers is kept so that the select loop can direct their FD
96 usage. */
0a8a80e5
AL
97void pkgAcquire::Add(Worker *Work)
98{
99 Work->NextAcquire = Workers;
100 Workers = Work;
101}
102 /*}}}*/
103// Acquire::Remove - Remove a worker /*{{{*/
104// ---------------------------------------------------------------------
93bf083d
AL
105/* A worker has died. This can not be done while the select loop is running
106 as it would require that RunFds could handling a changing list state and
107 it cant.. */
0a8a80e5
AL
108void pkgAcquire::Remove(Worker *Work)
109{
93bf083d
AL
110 if (Running == true)
111 abort();
112
0a8a80e5
AL
113 Worker **I = &Workers;
114 for (; *I != 0;)
115 {
116 if (*I == Work)
117 *I = (*I)->NextAcquire;
118 else
119 I = &(*I)->NextAcquire;
120 }
121}
122 /*}}}*/
0118833a
AL
123// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
124// ---------------------------------------------------------------------
93bf083d
AL
125/* This is the entry point for an item. An item calls this function when
126 it is construction which creates a queue (based on the current queue
127 mode) and puts the item in that queue. If the system is running then
128 the queue might be started. */
8267fe24 129void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 130{
0a8a80e5 131 // Determine which queue to put the item in
8267fe24 132 string Name = QueueName(Item.URI);
0a8a80e5
AL
133 if (Name.empty() == true)
134 return;
135
136 // Find the queue structure
137 Queue *I = Queues;
138 for (; I != 0 && I->Name != Name; I = I->Next);
139 if (I == 0)
140 {
141 I = new Queue(Name,this);
142 I->Next = Queues;
143 Queues = I;
93bf083d
AL
144
145 if (Running == true)
146 I->Startup();
0a8a80e5 147 }
bfd22fc0 148
8267fe24 149 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
150
151 // Queue it into the named queue
8267fe24 152 I->Enqueue(Item);
0a8a80e5 153 ToFetch++;
93bf083d 154
0a8a80e5
AL
155 // Some trace stuff
156 if (Debug == true)
157 {
8267fe24
AL
158 clog << "Fetching " << Item.URI << endl;
159 clog << " to " << Item.Owner->DestFile << endl;
160 clog << " Queue is: " << QueueName(Item.URI) << endl;
0a8a80e5 161 }
3b5421b4
AL
162}
163 /*}}}*/
0a8a80e5 164// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 165// ---------------------------------------------------------------------
93bf083d
AL
166/* This is called when an item is finished being fetched. It removes it
167 from all the queues */
0a8a80e5
AL
168void pkgAcquire::Dequeue(Item *Itm)
169{
170 Queue *I = Queues;
bfd22fc0 171 bool Res = false;
0a8a80e5 172 for (; I != 0; I = I->Next)
bfd22fc0 173 Res |= I->Dequeue(Itm);
93bf083d
AL
174
175 if (Debug == true)
176 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
177 if (Res == true)
178 ToFetch--;
0a8a80e5
AL
179}
180 /*}}}*/
181// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
182// ---------------------------------------------------------------------
183/* The string returned depends on the configuration settings and the
184 method parameters. Given something like http://foo.org/bar it can
185 return http://foo.org or http */
93bf083d 186string pkgAcquire::QueueName(string Uri)
3b5421b4 187{
93bf083d
AL
188 URI U(Uri);
189
190 const MethodConfig *Config = GetConfig(U.Access);
0a8a80e5
AL
191 if (Config == 0)
192 return string();
193
194 /* Single-Instance methods get exactly one queue per URI. This is
195 also used for the Access queue method */
196 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 197 return U.Access;
93bf083d
AL
198
199 return U.Access + ':' + U.Host;
0118833a
AL
200}
201 /*}}}*/
3b5421b4
AL
202// Acquire::GetConfig - Fetch the configuration information /*{{{*/
203// ---------------------------------------------------------------------
204/* This locates the configuration structure for an access method. If
205 a config structure cannot be found a Worker will be created to
206 retrieve it */
0a8a80e5 207pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
208{
209 // Search for an existing config
210 MethodConfig *Conf;
211 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
212 if (Conf->Access == Access)
213 return Conf;
214
215 // Create the new config class
216 Conf = new MethodConfig;
217 Conf->Access = Access;
218 Conf->Next = Configs;
219 Configs = Conf;
0118833a 220
3b5421b4
AL
221 // Create the worker to fetch the configuration
222 Worker Work(Conf);
223 if (Work.Start() == false)
224 return 0;
225
226 return Conf;
227}
228 /*}}}*/
0a8a80e5
AL
229// Acquire::SetFds - Deal with readable FDs /*{{{*/
230// ---------------------------------------------------------------------
231/* Collect FDs that have activity monitors into the fd sets */
232void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
233{
234 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
235 {
236 if (I->InReady == true && I->InFd >= 0)
237 {
238 if (Fd < I->InFd)
239 Fd = I->InFd;
240 FD_SET(I->InFd,RSet);
241 }
242 if (I->OutReady == true && I->OutFd >= 0)
243 {
244 if (Fd < I->OutFd)
245 Fd = I->OutFd;
246 FD_SET(I->OutFd,WSet);
247 }
248 }
249}
250 /*}}}*/
251// Acquire::RunFds - Deal with active FDs /*{{{*/
252// ---------------------------------------------------------------------
93bf083d
AL
253/* Dispatch active FDs over to the proper workers. It is very important
254 that a worker never be erased while this is running! The queue class
255 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
256void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
257{
258 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
259 {
260 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
261 I->InFdReady();
262 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
263 I->OutFdReady();
264 }
265}
266 /*}}}*/
267// Acquire::Run - Run the fetch sequence /*{{{*/
268// ---------------------------------------------------------------------
269/* This runs the queues. It manages a select loop for all of the
270 Worker tasks. The workers interact with the queues and items to
271 manage the actual fetch. */
272bool pkgAcquire::Run()
273{
8b89e57f
AL
274 Running = true;
275
0a8a80e5
AL
276 for (Queue *I = Queues; I != 0; I = I->Next)
277 I->Startup();
278
b98f2859
AL
279 if (Log != 0)
280 Log->Start();
281
0a8a80e5 282 // Run till all things have been acquired
8267fe24
AL
283 struct timeval tv;
284 tv.tv_sec = 0;
285 tv.tv_usec = 500000;
0a8a80e5
AL
286 while (ToFetch > 0)
287 {
288 fd_set RFds;
289 fd_set WFds;
290 int Highest = 0;
291 FD_ZERO(&RFds);
292 FD_ZERO(&WFds);
293 SetFds(Highest,&RFds,&WFds);
294
8267fe24
AL
295 int Res = select(Highest+1,&RFds,&WFds,0,&tv);
296 if (Res < 0)
8b89e57f 297 {
8267fe24
AL
298 _error->Errno("select","Select has failed");
299 break;
8b89e57f 300 }
93bf083d 301
0a8a80e5 302 RunFds(&RFds,&WFds);
93bf083d
AL
303 if (_error->PendingError() == true)
304 break;
8267fe24
AL
305
306 // Timeout, notify the log class
307 if (Res == 0 || (Log != 0 && Log->Update == true))
308 {
309 tv.tv_usec = 500000;
310 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
311 I->Pulse();
312 if (Log != 0)
313 Log->Pulse(this);
314 }
0a8a80e5 315 }
be4401bf 316
b98f2859
AL
317 if (Log != 0)
318 Log->Stop();
319
be4401bf
AL
320 // Shut down the acquire bits
321 Running = false;
0a8a80e5
AL
322 for (Queue *I = Queues; I != 0; I = I->Next)
323 I->Shutdown();
324
0919e3f9 325 return !_error->PendingError();
93bf083d
AL
326}
327 /*}}}*/
be4401bf 328// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
329// ---------------------------------------------------------------------
330/* This routine bumps idle queues in hopes that they will be able to fetch
331 the dequeued item */
332void pkgAcquire::Bump()
333{
be4401bf
AL
334 for (Queue *I = Queues; I != 0; I = I->Next)
335 I->Bump();
0a8a80e5
AL
336}
337 /*}}}*/
8267fe24
AL
338// Acquire::WorkerStep - Step to the next worker /*{{{*/
339// ---------------------------------------------------------------------
340/* Not inlined to advoid including acquire-worker.h */
341pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
342{
343 return I->NextAcquire;
344};
345 /*}}}*/
3b5421b4
AL
346
347// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
348// ---------------------------------------------------------------------
349/* */
350pkgAcquire::MethodConfig::MethodConfig()
351{
352 SingleInstance = false;
353 PreScan = false;
0a8a80e5
AL
354 Pipeline = false;
355 SendConfig = false;
356 Next = 0;
357}
358 /*}}}*/
359
360// Queue::Queue - Constructor /*{{{*/
361// ---------------------------------------------------------------------
362/* */
363pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
364 Owner(Owner)
365{
366 Items = 0;
367 Next = 0;
368 Workers = 0;
369}
370 /*}}}*/
371// Queue::~Queue - Destructor /*{{{*/
372// ---------------------------------------------------------------------
373/* */
374pkgAcquire::Queue::~Queue()
375{
376 Shutdown();
377
378 while (Items != 0)
379 {
380 QItem *Jnk = Items;
381 Items = Items->Next;
382 delete Jnk;
383 }
384}
385 /*}}}*/
386// Queue::Enqueue - Queue an item to the queue /*{{{*/
387// ---------------------------------------------------------------------
388/* */
8267fe24 389void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5
AL
390{
391 // Create a new item
8267fe24 392 QItem *I = new QItem;
0a8a80e5
AL
393 I->Next = Items;
394 Items = I;
8267fe24 395 *I = Item;
0a8a80e5 396
8267fe24 397 Item.Owner->QueueCounter++;
93bf083d
AL
398 if (Items->Next == 0)
399 Cycle();
0a8a80e5
AL
400}
401 /*}}}*/
c88edf1d 402// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 403// ---------------------------------------------------------------------
bfd22fc0
AL
404/* We return true if we hit something*/
405bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 406{
bfd22fc0
AL
407 bool Res = false;
408
0a8a80e5
AL
409 QItem **I = &Items;
410 for (; *I != 0;)
411 {
412 if ((*I)->Owner == Owner)
413 {
414 QItem *Jnk= *I;
415 *I = (*I)->Next;
416 Owner->QueueCounter--;
417 delete Jnk;
bfd22fc0 418 Res = true;
0a8a80e5
AL
419 }
420 else
421 I = &(*I)->Next;
422 }
bfd22fc0
AL
423
424 return Res;
0a8a80e5
AL
425}
426 /*}}}*/
427// Queue::Startup - Start the worker processes /*{{{*/
428// ---------------------------------------------------------------------
429/* */
430bool pkgAcquire::Queue::Startup()
431{
432 Shutdown();
433
93bf083d
AL
434 URI U(Name);
435 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
0a8a80e5
AL
436 if (Cnf == 0)
437 return false;
438
8267fe24 439 Workers = new Worker(this,Cnf,Owner->Log);
0a8a80e5
AL
440 Owner->Add(Workers);
441 if (Workers->Start() == false)
442 return false;
0a8a80e5 443
93bf083d 444 return Cycle();
0a8a80e5
AL
445}
446 /*}}}*/
447// Queue::Shutdown - Shutdown the worker processes /*{{{*/
448// ---------------------------------------------------------------------
449/* */
450bool pkgAcquire::Queue::Shutdown()
451{
452 // Delete all of the workers
453 while (Workers != 0)
454 {
455 pkgAcquire::Worker *Jnk = Workers;
456 Workers = Workers->NextQueue;
457 Owner->Remove(Jnk);
458 delete Jnk;
459 }
460
461 return true;
3b5421b4
AL
462}
463 /*}}}*/
c88edf1d
AL
464// Queue::Finditem - Find a URI in the item list /*{{{*/
465// ---------------------------------------------------------------------
466/* */
467pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
468{
469 for (QItem *I = Items; I != 0; I = I->Next)
470 if (I->URI == URI && I->Worker == Owner)
471 return I;
472 return 0;
473}
474 /*}}}*/
475// Queue::ItemDone - Item has been completed /*{{{*/
476// ---------------------------------------------------------------------
477/* The worker signals this which causes the item to be removed from the
93bf083d
AL
478 queue. If this is the last queue instance then it is removed from the
479 main queue too.*/
c88edf1d
AL
480bool pkgAcquire::Queue::ItemDone(QItem *Itm)
481{
93bf083d
AL
482 if (Itm->Owner->QueueCounter <= 1)
483 Owner->Dequeue(Itm->Owner);
484 else
485 {
486 Dequeue(Itm->Owner);
487 Owner->Bump();
488 }
c88edf1d 489
93bf083d
AL
490 return Cycle();
491}
492 /*}}}*/
493// Queue::Cycle - Queue new items into the method /*{{{*/
494// ---------------------------------------------------------------------
495/* This locates a new idle item and sends it to the worker */
496bool pkgAcquire::Queue::Cycle()
497{
498 if (Items == 0 || Workers == 0)
c88edf1d
AL
499 return true;
500
93bf083d
AL
501 // Look for a queable item
502 QItem *I = Items;
503 for (; I != 0; I = I->Next)
504 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
505 break;
506
507 // Nothing to do, queue is idle.
508 if (I == 0)
509 return true;
510
511 I->Worker = Workers;
512 I->Owner->Status = pkgAcquire::Item::StatFetching;
513 return Workers->QueueItem(I);
c88edf1d
AL
514}
515 /*}}}*/
be4401bf
AL
516// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
517// ---------------------------------------------------------------------
518/* */
519void pkgAcquire::Queue::Bump()
520{
521}
522 /*}}}*/
b98f2859
AL
523
524// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
525// ---------------------------------------------------------------------
526/* */
527pkgAcquireStatus::pkgAcquireStatus()
528{
529 Start();
530}
531 /*}}}*/
532// AcquireStatus::Pulse - Called periodically /*{{{*/
533// ---------------------------------------------------------------------
534/* This computes some internal state variables for the derived classes to
535 use. It generates the current downloaded bytes and total bytes to download
536 as well as the current CPS estimate. */
537void pkgAcquireStatus::Pulse(pkgAcquire *Owner)
538{
539 TotalBytes = 0;
540 CurrentBytes = 0;
541
542 // Compute the total number of bytes to fetch
543 unsigned int Unknown = 0;
544 unsigned int Count = 0;
545 for (pkgAcquire::Item **I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
546 I++, Count++)
547 {
548 TotalBytes += (*I)->FileSize;
549 if ((*I)->Complete == true)
550 CurrentBytes += (*I)->FileSize;
551 if ((*I)->FileSize == 0 && (*I)->Complete == false)
552 Unknown++;
553 }
554
555 // Compute the current completion
556 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
557 I = Owner->WorkerStep(I))
558 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
559 CurrentBytes += I->CurrentSize;
560
561 // Normalize the figures and account for unknown size downloads
562 if (TotalBytes <= 0)
563 TotalBytes = 1;
564 if (Unknown == Count)
565 TotalBytes = Unknown;
566 else
567 TotalBytes += TotalBytes/(Count - Unknown)*Unknown;
568
569 // Compute the CPS
570 struct timeval NewTime;
571 gettimeofday(&NewTime,0);
572 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
573 NewTime.tv_sec - Time.tv_sec > 6)
574 {
575 // Compute the delta time with full accuracy
576 long usdiff = NewTime.tv_usec - Time.tv_usec;
577 long sdiff = NewTime.tv_sec - Time.tv_sec;
578
579 // Borrow
580 if (usdiff < 0)
581 {
582 usdiff += 1000000;
583 sdiff--;
584 }
585
586 // Compute the CPS value
587 CurrentCPS = (CurrentBytes - LastBytes)/(sdiff + usdiff/1000000.0);
588 LastBytes = CurrentBytes;
589 ElapsedTime = NewTime.tv_sec - StartTime.tv_sec;
590 Time = NewTime;
591 }
592}
593 /*}}}*/
594// AcquireStatus::Start - Called when the download is started /*{{{*/
595// ---------------------------------------------------------------------
596/* We just reset the counters */
597void pkgAcquireStatus::Start()
598{
599 gettimeofday(&Time,0);
600 gettimeofday(&StartTime,0);
601 LastBytes = 0;
602 CurrentCPS = 0;
603 CurrentBytes = 0;
604 TotalBytes = 0;
605 FetchedBytes = 0;
606 ElapsedTime = 0;
607}
608 /*}}}*/
609// pkgAcquireStatus::Stop - Finished downloading /*{{{*/
610// ---------------------------------------------------------------------
611/* This accurately computes the elapsed time and the total overall CPS. */
612void pkgAcquireStatus::Stop()
613{
614 // Compute the CPS and elapsed time
615 struct timeval NewTime;
616 gettimeofday(&NewTime,0);
617
618 // Compute the delta time with full accuracy
619 long usdiff = NewTime.tv_usec - StartTime.tv_usec;
620 long sdiff = NewTime.tv_sec - StartTime.tv_sec;
621
622 // Borrow
623 if (usdiff < 0)
624 {
625 usdiff += 1000000;
626 sdiff--;
627 }
628
629 // Compute the CPS value
630 CurrentCPS = FetchedBytes/(sdiff + usdiff/1000000.0);
631 LastBytes = CurrentBytes;
632 ElapsedTime = sdiff;
633}
634 /*}}}*/
635// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
636// ---------------------------------------------------------------------
637/* This is used to get accurate final transfer rate reporting. */
638void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 639{
b98f2859
AL
640 FetchedBytes += Size - Resume;
641}
642 /*}}}*/