source: CIVL/examples/omp/nas-dc/DC/extbuild.c@ 1aaefd4

main test-branch
Last change on this file since 1aaefd4 was ea777aa, checked in by Alex Wilton <awilton@…>, 3 years ago

Moved examples, include, build_default.properties, common.xml, and README out from dev.civl.com into the root of the repo.

git-svn-id: svn://vsl.cis.udel.edu/civl/trunk@5704 fb995dde-84ed-4084-dfe6-e5aef3e2452c

  • Property mode set to 100644
File size: 32.9 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <string.h>
4
5#include "adc.h"
6#include "macrodef.h"
7#include "protots.h"
8
9#ifdef UNIX
10#include <errno.h>
11#endif
12
13extern int32 computeChecksum(ADC_VIEW_CNTL *avp,treeNode *t,uint64 *ordern);
14extern int32 WriteViewToDiskCS(ADC_VIEW_CNTL *avp,treeNode *t,uint64 *ordern);
15
16int32 ReadWholeInputData(ADC_VIEW_CNTL *avp, FILE *inpf){
17 uint32 iRec = 0;
18 uint32 inpBufferLineSize, inpBufferPace, inpRecSize, ib = 0;
19
20 FSEEK(inpf, 0L, SEEK_SET);
21 inpRecSize = 8*avp->nm+4*avp->nTopDims;
22 inpBufferLineSize = inpRecSize;
23 if (inpBufferLineSize%8) inpBufferLineSize += 4;
24 inpBufferPace = inpBufferLineSize/4;
25
26 while(fread(&avp->inpDataBuffer[ib], inpRecSize, 1, inpf)){
27 iRec++;
28 ib += inpBufferPace;
29 }
30 avp->nRowsToRead = iRec;
31 FSEEK(inpf, 0L, SEEK_SET);
32
33 if(avp->nInputRecs != iRec){
34 fprintf(stderr, " ReadWholeInputData(): wrong input data reading.\n");
35 return ADC_INTERNAL_ERROR;
36 }
37 return ADC_OK;
38}
39int32 ComputeMemoryFittedView (ADC_VIEW_CNTL *avp){
40 uint32 iRec = 0;
41 uint32 viewBuf[MAX_VIEW_ROW_SIZE_IN_INTS];
42 uint32 inpBufferLineSize, inpBufferPace, inpRecSize,ib;
43 uint64 ordern=0;
44#ifdef VIEW_FILE_OUTPUT
45 uint32 retCode;
46#endif
47
48 FSEEK(avp->viewFile, 0L, SEEK_END);
49 inpRecSize = 8*avp->nm+4*avp->nTopDims;
50 inpBufferLineSize = inpRecSize;
51 if (inpBufferLineSize%8) inpBufferLineSize += 4;
52 inpBufferPace = inpBufferLineSize/4;
53
54 InitializeTree(avp->tree, avp->nv, avp->nm);
55
56 ib=0;
57 for ( iRec = 1; iRec <= avp->nRowsToRead; iRec++ ){
58 SelectToView( &avp->inpDataBuffer[ib], avp->selection, viewBuf,
59 avp->nd, avp->nm, avp->nv );
60 ib += inpBufferPace;
61 TreeInsert(avp->tree, viewBuf);
62 if(avp->tree->memoryIsFull){
63 fprintf(stderr, "ComputeMemoryFittedView(): Not enough memory.\n");
64 return 1;
65 }
66 }
67
68#ifdef VIEW_FILE_OUTPUT
69 if( retCode = WriteViewToDiskCS(avp, avp->tree->root.left,&ordern) ){
70 fprintf(stderr, "ComputeMemoryFittedView() Write error is occured.\n");
71 return retCode;
72 }
73#else
74 computeChecksum(avp,avp->tree->root.left,&ordern);
75#endif
76
77 avp->nViewRows = avp->tree->count;
78 avp->totalOfViewRows += avp->nViewRows;
79 InitializeTree(avp->tree, avp->nv, avp->nm);
80 return ADC_OK;
81}
82
83int32 SharedSortAggregate(ADC_VIEW_CNTL *avp){
84 int32 retCode;
85 uint32 iRec = 0;
86 uint32 attrs[MAX_VIEW_ROW_SIZE_IN_INTS];
87 uint32 currBuf[MAX_VIEW_ROW_SIZE_IN_INTS];
88 int64 chunkOffset = 0;
89 int64 inpfOffset;
90 uint32 nPart = 0;
91 uint32 prevV;
92 uint32 currV;
93 uint32 total = 0;
94 unsigned char *ib;
95 uint32 ibsize = SSA_BUFFER_SIZE;
96 uint32 nib;
97 uint32 iib;
98 uint32 nreg;
99 uint32 nlst;
100 uint32 nsgs;
101 uint32 ncur;
102 uint32 ibOffset = 0;
103 uint64 ordern=0;
104
105 ib = (unsigned char*) malloc(ibsize);
106 if (!ib){
107 fprintf(stderr,"SharedSortAggregate: memory allocation failed\n");
108 return ADC_MEMORY_ALLOCATION_FAILURE;
109 }
110
111 nib = ibsize/avp->inpRecSize;
112 nsgs = avp->nRowsToRead/nib;
113
114 if (nsgs == 0){
115 nreg = avp->nRowsToRead;
116 nlst = nreg;
117 nsgs = 1;
118 }else{
119 nreg = nib;
120 if (avp->nRowsToRead%nib) {
121 nsgs++;
122 nlst = avp->nRowsToRead%nib;
123 }else{
124 nlst = nreg;
125 }
126 }
127
128 avp->nViewRows = 0;
129 for( iib = 1; iib <= nsgs; iib++ ){
130 if(iib > 1) FSEEK(avp->viewFile, inpfOffset, SEEK_SET);
131 if( iib == nsgs ) ncur = nlst; else ncur = nreg;
132
133 fread(ib, ncur*avp->inpRecSize, 1, avp->viewFile);
134 inpfOffset = ftell(avp->viewFile);
135
136 for( ibOffset = 0, iRec = 1; iRec <= ncur; iRec++ ){
137 memcpy(attrs, &ib[ibOffset], avp->inpRecSize);
138 ibOffset += avp->inpRecSize;
139 SelectToView(attrs, avp->selection, currBuf, avp->nd, avp->nm, avp->nv);
140 currV = currBuf[2*avp->nm];
141
142 if(iib == 1 && iRec == 1){
143 prevV = currV;
144 nPart = 1;
145 InitializeTree(avp->tree, avp->nv, avp->nm);
146 TreeInsert(avp->tree, currBuf);
147 }else{
148 if (currV == prevV){
149 nPart++;
150 TreeInsert (avp->tree, currBuf);
151 if (avp->tree->memoryIsFull){
152 avp->chunksParams[avp->numberOfChunks].curChunkNum =
153 avp->tree->count;
154 avp->chunksParams[avp->numberOfChunks].chunkOffset = chunkOffset;
155 (avp->numberOfChunks)++;
156 if(avp->numberOfChunks >= MAX_NUM_OF_CHUNKS){
157 fprintf(stderr,"Too many chunks were created.\n");
158 exit(1);
159 }
160 chunkOffset += (uint64)(avp->tree->count*avp->outRecSize);
161 retCode=WriteChunkToDisk(avp->outRecSize, avp->fileOfChunks,
162 avp->tree->root.left, avp->logf);
163 if(retCode!=ADC_OK){
164 fprintf(stderr,"SharedSortAggregate: Write error occured.\n");
165 return retCode;
166 }
167 InitializeTree(avp->tree, avp->nv, avp->nm);
168 } /* memoryIsFull */
169 }else{
170 if(avp->numberOfChunks && avp->tree->count!=0){
171 avp->chunksParams[avp->numberOfChunks].curChunkNum =
172 avp->tree->count;
173 avp->chunksParams[avp->numberOfChunks].chunkOffset = chunkOffset;
174 (avp->numberOfChunks)++;
175 chunkOffset +=
176 (uint64)(avp->tree->count*(4*avp->nv + 8*avp->nm));
177 retCode=WriteChunkToDisk( avp->outRecSize, avp->fileOfChunks,
178 avp->tree->root.left, avp->logf);
179 if(retCode!=ADC_OK){
180 fprintf(stderr,"SharedSortAggregate: Write error occured.\n");
181 return retCode;
182 }
183 }
184 FSEEK(avp->viewFile, 0L, SEEK_END);
185 if(!avp->numberOfChunks){
186 avp->nViewRows += avp->tree->count;
187 retCode = WriteViewToDiskCS(avp, avp->tree->root.left,&ordern);
188 if(retCode!=ADC_OK){
189 fprintf(stderr,
190 "SharedSortAggregate: Write error occured.\n");
191 return retCode;
192 }
193 }else{
194 retCode=MultiWayMerge(avp);
195 if(retCode!=ADC_OK) {
196 fprintf(stderr,"SharedSortAggregate.MultiWayMerge: failed.\n");
197 return retCode;
198 }
199 }
200 InitializeTree(avp->tree, avp->nv, avp->nm);
201 TreeInsert(avp->tree, currBuf);
202 total += nPart;
203 nPart = 1;
204 }
205 }
206 prevV = currV;
207 } /* iRec */
208 } /* iib */
209
210 if(avp->numberOfChunks && avp->tree->count!=0) {
211 avp->chunksParams[avp->numberOfChunks].curChunkNum = avp->tree->count;
212 avp->chunksParams[avp->numberOfChunks].chunkOffset = chunkOffset;
213 (avp->numberOfChunks)++;
214 chunkOffset += (uint64)(avp->tree->count*(4*avp->nv + 8*avp->nm));
215 retCode=WriteChunkToDisk(avp->outRecSize, avp->fileOfChunks,
216 avp->tree->root.left, avp->logf);
217 if(retCode!=ADC_OK){
218 fprintf(stderr,"SharedSortAggregate: Write error occured.\n");
219 return retCode;
220 }
221 }
222 FSEEK(avp->viewFile, 0L, SEEK_END);
223 if(!avp->numberOfChunks){
224 avp->nViewRows += avp->tree->count;
225 if( retCode = WriteViewToDiskCS(avp, avp->tree->root.left,&ordern)){
226 fprintf(stderr, "SharedSortAggregate: Write error occured.\n");
227 return retCode;
228 }
229 }else{
230 retCode=MultiWayMerge(avp);
231 if(retCode!=ADC_OK) {
232 fprintf(stderr,"SharedSortAggregate.MultiWayMerge failed.\n");
233 return retCode;
234 }
235 }
236 FSEEK(avp->fileOfChunks, 0L, SEEK_SET);
237
238 total += nPart;
239 avp->totalOfViewRows += avp->nViewRows;
240 if(ib) free(ib);
241 return ADC_OK;
242}
243int32 PrefixedAggregate(ADC_VIEW_CNTL *avp, FILE *iof){
244 uint32 i;
245 uint32 iRec = 0;
246 uint32 attrs[MAX_VIEW_ROW_SIZE_IN_INTS];
247 uint32 aggrBuf[MAX_VIEW_ROW_SIZE_IN_INTS];
248 uint32 currBuf[MAX_VIEW_ROW_SIZE_IN_INTS];
249 uint32 prevBuf[MAX_VIEW_ROW_SIZE_IN_INTS];
250 int64 *aggrmp;
251 int64 *currmp;
252 int32 compRes;
253 uint32 nOut = 0;
254 uint32 mpOffset = 0;
255 uint32 nOutBufRecs;
256 uint32 nViewRows = 0;
257 int64 inpfOffset;
258
259 aggrmp = (int64*) &aggrBuf[0];
260 currmp = (int64*) &currBuf[0];
261
262 for(i = 0; i < 2*avp->nm+avp->nv; i++){prevBuf[i] = 0; aggrBuf[i] = 0;}
263 nOutBufRecs = avp->memoryLimit/avp->outRecSize;
264
265 for(iRec = 1; iRec <= avp->nRowsToRead; iRec++ ){
266 fread(attrs, avp->inpRecSize, 1, iof);
267 SelectToView(attrs, avp->selection, currBuf, avp->nd, avp->nm, avp->nv);
268 if (iRec == 1) memcpy(aggrBuf, currBuf, avp->outRecSize);
269 else{
270 compRes = KeyComp( &currBuf[2*avp->nm], &prevBuf[2*avp->nm], avp->nv);
271
272 switch(compRes){
273 case 1:
274 memcpy(&avp->memPool[mpOffset], aggrBuf, avp->outRecSize);
275 mpOffset += avp->outRecSize;
276 nOut++;
277 for ( i = 0; i < avp->nm; i++ ){
278 avp->mSums[i] += aggrmp[i];
279 avp->checksums[i] += nOut*aggrmp[i]%measbound;
280 }
281 memcpy(aggrBuf, currBuf, avp->outRecSize);
282 break;
283 case 0:
284 for ( i = 0; i < avp->nm; i++ ) aggrmp[i] += currmp[i];
285 break;
286 case -1:
287 fprintf(stderr,"PrefixedAggregate: wrong parent view order.\n");
288 exit(1);
289 break;
290 default:
291 fprintf(stderr,"PrefixedAggregate: wrong KeyComp() result.\n");
292 exit(1);
293 break;
294 }
295
296 if (nOut == nOutBufRecs){
297 inpfOffset = ftell(iof);
298 FSEEK(iof, 0L, SEEK_END);
299 WriteToFile(avp->memPool, nOut*avp->outRecSize, 1, iof, stderr);
300 FSEEK(iof, inpfOffset, SEEK_SET);
301 mpOffset = 0;
302 nViewRows += nOut;
303 nOut = 0;
304 }
305 }
306 memcpy(prevBuf, currBuf, avp->outRecSize);
307 }
308 memcpy(&avp->memPool[mpOffset], aggrBuf, avp->outRecSize);
309 nOut++;
310 for ( i = 0; i < avp->nm; i++ ){
311 avp->mSums[i] += aggrmp[i];
312 avp->checksums[i] += nOut*aggrmp[i]%measbound;
313 }
314 FSEEK(iof, 0L, SEEK_END);
315 WriteToFile(avp->memPool, nOut*avp->outRecSize, 1, iof, stderr);
316 avp->nViewRows = nViewRows+nOut;
317 avp->totalOfViewRows += avp->nViewRows;
318 return ADC_OK;
319}
320int32 RunFormation (ADC_VIEW_CNTL *avp, FILE *inpf){
321 uint32 iRec = 0;
322 uint32 viewBuf[MAX_VIEW_ROW_SIZE_IN_INTS];
323 uint32 attrs[MAX_VIEW_ROW_SIZE_IN_INTS];
324 int64 chunkOffset = 0;
325
326 InitializeTree(avp->tree, avp->nv, avp->nm);
327
328 for(iRec = 1; iRec <= avp->nRowsToRead; iRec++ ){
329 fread(attrs, avp->inpRecSize, 1, inpf);
330 SelectToView(attrs, avp->selection, viewBuf, avp->nd, avp->nm, avp->nv);
331 TreeInsert(avp->tree, viewBuf);
332
333 if(avp->tree->memoryIsFull) {
334 avp->chunksParams[avp->numberOfChunks].curChunkNum = avp->tree->count;
335 avp->chunksParams[avp->numberOfChunks].chunkOffset = chunkOffset;
336 (avp->numberOfChunks)++;
337 if (avp->numberOfChunks >= MAX_NUM_OF_CHUNKS) {
338 fprintf(stderr, "RunFormation: Too many chunks were created.\n");
339 return ADC_INTERNAL_ERROR;
340 }
341 chunkOffset += (uint64)(avp->tree->count*avp->outRecSize);
342 if(WriteChunkToDisk( avp->outRecSize, avp->fileOfChunks,
343 avp->tree->root.left, avp->logf )){
344 fprintf(stderr,
345 "RunFormation.WriteChunkToDisk: Write error is occured.\n");
346 return ADC_WRITE_FAILED;
347 }
348 InitializeTree(avp->tree, avp->nv, avp->nm);
349 }
350 } /* Insertion ... */
351 if(avp->numberOfChunks && avp->tree->count!=0) {
352 avp->chunksParams[avp->numberOfChunks].curChunkNum = avp->tree->count;
353 avp->chunksParams[avp->numberOfChunks].chunkOffset = chunkOffset;
354 (avp->numberOfChunks)++;
355 chunkOffset += (uint64)(avp->tree->count*(4*avp->nv + 8*avp->nm));
356 if(WriteChunkToDisk(avp->outRecSize, avp->fileOfChunks,
357 avp->tree->root.left, avp->logf)){
358 fprintf(stderr,
359 "RunFormation(.WriteChunkToDisk: Write error is occured.\n");
360 return ADC_WRITE_FAILED;
361 }
362 }
363 FSEEK(avp->viewFile, 0L, SEEK_END);
364 return ADC_OK;
365}
366void SeekAndReadNextSubChunk( uint32 multiChunkBuffer[],
367 uint32 k,
368 FILE *inFile,
369 uint32 chunkRecSize,
370 uint64 inFileOffs,
371 uint32 subChunkNum){
372 int64 ret;
373
374 ret = FSEEK(inFile, inFileOffs, SEEK_SET);
375 if (ret < 0){
376 fprintf(stderr,"SeekAndReadNextSubChunk.fseek() < 0 ");
377 exit(1);
378 }
379 fread(&multiChunkBuffer[k], chunkRecSize*subChunkNum, 1, inFile);
380}
381void ReadSubChunk(
382 uint32 chunkRecSize,
383 uint32 *multiChunkBuffer,
384 uint32 mwBufRecSizeInInt,
385 uint32 iChunk,
386 uint32 regSubChunkSize,
387 CHUNKS *chunks,
388 FILE *fileOfChunks
389 ){
390 if (chunks[iChunk].curChunkNum > 0){
391 if(chunks[iChunk].curChunkNum < regSubChunkSize){
392 SeekAndReadNextSubChunk(multiChunkBuffer,
393 (iChunk*regSubChunkSize +
394 (regSubChunkSize-chunks[iChunk].curChunkNum))*
395 mwBufRecSizeInInt,
396 fileOfChunks,
397 chunkRecSize,
398 chunks[iChunk].chunkOffset,
399 chunks[iChunk].curChunkNum);
400 chunks[iChunk].posSubChunk=regSubChunkSize-chunks[iChunk].curChunkNum;
401 chunks[iChunk].curSubChunk=chunks[iChunk].curChunkNum;
402 chunks[iChunk].curChunkNum=0;
403 chunks[iChunk].chunkOffset=-1;
404 }else{
405 SeekAndReadNextSubChunk(multiChunkBuffer,
406 iChunk*regSubChunkSize*mwBufRecSizeInInt,
407 fileOfChunks,
408 chunkRecSize,
409 chunks[iChunk].chunkOffset,
410 regSubChunkSize);
411 chunks[iChunk].posSubChunk = 0;
412 chunks[iChunk].curSubChunk = regSubChunkSize;
413 chunks[iChunk].curChunkNum -= regSubChunkSize;
414 chunks[iChunk].chunkOffset += regSubChunkSize * chunkRecSize;
415 }
416 }
417}
418int32 MultiWayMerge(ADC_VIEW_CNTL *avp){
419 uint32 outputBuffer[OUTPUT_BUFFER_SIZE];
420 uint32 r_buf [OUTPUT_BUFFER_SIZE];
421 uint32 min_r_buf [OUTPUT_BUFFER_SIZE];
422 uint32 first_one;
423 uint32 i;
424 uint32 iChunk;
425 uint32 min_r_chunk;
426 uint32 sPos;
427 uint32 iPos;
428 uint32 numEmptyBufs;
429 uint32 numEmptyRuns;
430 uint32 mwBufRecSizeInInt;
431 uint32 chunkRecSize;
432 uint32 *multiChunkBuffer;
433 uint32 regSubChunkSize;
434 int32 compRes;
435 int64 *m_min_r_buf;
436 int64 *m_outputBuffer;
437
438 FSEEK(avp->fileOfChunks, 0L, SEEK_SET);
439
440 multiChunkBuffer = (uint32*) &avp->memPool[0];
441 first_one = 1;
442 avp->nViewRows = 0;
443
444 chunkRecSize = avp->outRecSize;
445 mwBufRecSizeInInt = chunkRecSize/4;
446 m_min_r_buf = (int64*)&min_r_buf[0];
447 m_outputBuffer = (int64*)&outputBuffer[0];
448
449 mwBufRecSizeInInt = chunkRecSize/4;
450 regSubChunkSize = (avp->memoryLimit/avp->numberOfChunks)/chunkRecSize;
451
452 if (regSubChunkSize==0) {
453 fprintf(stderr,
454 "MultiWayMerge: Not enough memory to run the external sort\n");
455 return ADC_INTERNAL_ERROR;
456 }
457 multiChunkBuffer = (uint32*) &avp->memPool[0];
458
459 for(i = 0; i < avp->numberOfChunks; i++ ){
460 ReadSubChunk(
461 chunkRecSize,
462 multiChunkBuffer,
463 mwBufRecSizeInInt,
464 i,
465 regSubChunkSize,
466 avp->chunksParams,
467 avp->fileOfChunks
468 );
469 }
470 while(1){
471 for(iChunk = 0;iChunk<avp->numberOfChunks;iChunk++){
472 if (avp->chunksParams[iChunk].curSubChunk > 0){
473 sPos = iChunk*regSubChunkSize*mwBufRecSizeInInt;
474 iPos = sPos+mwBufRecSizeInInt*avp->chunksParams[iChunk].posSubChunk;
475 memcpy(&min_r_buf[0], &multiChunkBuffer[iPos], avp->outRecSize);
476 min_r_chunk = iChunk;
477 break;
478 }
479 }
480 for ( iChunk = min_r_chunk; iChunk < avp->numberOfChunks; iChunk++ ){
481 uint32 iPos;
482
483 if (avp->chunksParams[iChunk].curSubChunk > 0){
484 iPos = mwBufRecSizeInInt*(iChunk*regSubChunkSize+
485 avp->chunksParams[iChunk].posSubChunk);
486 memcpy(&r_buf[0],&multiChunkBuffer[iPos],avp->outRecSize);
487
488 compRes=KeyComp(&r_buf[2*avp->nm],&min_r_buf[2*avp->nm],avp->nv);
489 if(compRes < 0) {
490 memcpy(&min_r_buf[0], &r_buf[0], avp->outRecSize);
491 min_r_chunk = iChunk;
492 }
493 }
494 }
495 /* Step forward */
496 if(avp->chunksParams[min_r_chunk].curSubChunk != 0){
497 avp->chunksParams[min_r_chunk].curSubChunk--;
498 avp->chunksParams[min_r_chunk].posSubChunk++;
499 }
500
501 /* Aggreagation if a duplicate is encountered */
502 if(first_one){
503 memcpy( &outputBuffer[0], &min_r_buf[0], avp->outRecSize);
504 first_one = 0;
505 }else{
506 compRes = KeyComp( &outputBuffer[2*avp->nm],
507 &min_r_buf[2*avp->nm], avp->nv );
508 if(!compRes){
509 for(i = 0; i < avp->nm; i++ ){
510 m_outputBuffer[i] += m_min_r_buf[i];
511 }
512 }else{
513 WriteToFile(outputBuffer,avp->outRecSize,1,avp->viewFile,stderr);
514 avp->nViewRows++;
515 for(i=0;i<avp->nm;i++){
516 avp->mSums[i]+=m_outputBuffer[i];
517 avp->checksums[i] += avp->nViewRows*m_outputBuffer[i]%measbound;
518 }
519 memcpy( &outputBuffer[0], &min_r_buf[0], avp->outRecSize );
520 }
521 }
522
523 for(numEmptyBufs = 0,
524 numEmptyRuns = 0, i = 0; i < avp->numberOfChunks; i++ ){
525 if (avp->chunksParams[i].curSubChunk == 0) numEmptyBufs++;
526 if (avp->chunksParams[i].curChunkNum == 0) numEmptyRuns++;
527 }
528 if( numEmptyBufs == avp->numberOfChunks
529 &&numEmptyRuns == avp->numberOfChunks) break;
530
531 if(avp->chunksParams[min_r_chunk].curSubChunk == 0) {
532 ReadSubChunk(
533 chunkRecSize,
534 multiChunkBuffer,
535 mwBufRecSizeInInt,
536 min_r_chunk,
537 regSubChunkSize,
538 avp->chunksParams,
539 avp->fileOfChunks);
540 }
541 } /* while(1) */
542
543 WriteToFile( outputBuffer, avp->outRecSize, 1, avp->viewFile, stderr);
544 avp->nViewRows++;
545 for(i = 0; i < avp->nm; i++ ){
546 avp->mSums[i] += m_outputBuffer[i];
547 avp->checksums[i] += avp->nViewRows*m_outputBuffer[i]%measbound;
548 }
549
550 avp->totalOfViewRows += avp->nViewRows;
551 return ADC_OK;
552}
553void SelectToView( uint32 * ib, uint32 *ix, uint32 *viewBuf,
554 uint32 nd, uint32 nm, uint32 nv ){
555 uint32 i, j;
556 for ( j = 0, i = 0; i < nv; i++ ) viewBuf[2*nm+j++] = ib[2*nm+ix[i]-1];
557 memcpy(&viewBuf[0], &ib[0], MSR_FSZ*nm);
558}
559FILE * AdcFileOpen(const char *fileName, const char *mode){
560 FILE *fr;
561 if ((fr = (FILE*) fopen(fileName, mode))==NULL)
562 fprintf(stderr, "AdcFileOpen: Cannot open the file %s errno = %d\n",
563 fileName, errno);
564 return fr;
565}
566void AdcFileName(char *adcFileName, const char *adcName,
567 const char *fileName, uint32 taskNumber){
568 sprintf(adcFileName, "%s.%s.%d",adcName,fileName,taskNumber);
569}
570ADC_VIEW_CNTL * NewAdcViewCntl(ADC_VIEW_PARS *adcpp, uint32 pnum){
571 ADC_VIEW_CNTL *adccntl;
572 uint32 i, j, k;
573#ifdef IN_CORE
574 uint32 ux;
575#endif
576 char id[8+1];
577
578 adccntl = (ADC_VIEW_CNTL *) malloc(sizeof(ADC_VIEW_CNTL));
579 if (adccntl==NULL) return NULL;
580
581 adccntl->ndid = adcpp->ndid;
582 adccntl->taskNumber = pnum;
583 adccntl->retCode = 0;
584 adccntl->swapIt = 0;
585 strcpy(adccntl->adcName, adcpp->adcName);
586 adccntl->nTopDims = adcpp->nd;
587 adccntl->nd = adcpp->nd;
588 adccntl->nm = adcpp->nm;
589 adccntl->nInputRecs = adcpp->nInputRecs;
590 adccntl->inpRecSize = GetRecSize(adccntl->nd,adccntl->nm);
591 adccntl->outRecSize = GetRecSize(adccntl->nv,adccntl->nm);
592 adccntl->accViewFileOffset = 0;
593 adccntl->totalViewFileSize = 0;
594 adccntl->numberOfMadeViews = 0;
595 adccntl->numberOfViewsMadeFromInput = 0;
596 adccntl->numberOfPrefixedGroupbys = 0;
597 adccntl->numberOfSharedSortGroupbys = 0;
598 adccntl->totalOfViewRows = 0;
599 adccntl->memoryLimit = adcpp->memoryLimit;
600 adccntl->nTasks = adcpp->nTasks;
601 strcpy(adccntl->inpFileName, adcpp->adcInpFileName);
602 sprintf(id, ".%d", adcpp->ndid);
603
604 AdcFileName(adccntl->adcLogFileName,
605 adccntl->adcName, "logf", adccntl->taskNumber);
606 strcat(adccntl->adcLogFileName, id);
607 adccntl->logf = AdcFileOpen(adccntl->adcLogFileName, "w");
608
609 AdcFileName(adccntl->inpFileName, adccntl->adcName, "dat", adcpp->ndid);
610 adccntl->inpf = AdcFileOpen(adccntl->inpFileName, "rb");
611 if(!adccntl->inpf){
612 adccntl->retCode = ADC_FILE_OPEN_FAILURE;
613 return(adccntl);
614 }
615
616 AdcFileName(adccntl->viewFileName, adccntl->adcName,
617 "view.dat", adccntl->taskNumber);
618 strcat(adccntl->viewFileName, id);
619 adccntl->viewFile = AdcFileOpen(adccntl->viewFileName, "wb+");
620
621 AdcFileName(adccntl->chunksFileName, adccntl->adcName,
622 "chunks.dat", adccntl->taskNumber);
623 strcat(adccntl->chunksFileName, id);
624 adccntl->fileOfChunks = AdcFileOpen(adccntl->chunksFileName,"wb+");
625
626 AdcFileName(adccntl->groupbyFileName, adccntl->adcName,
627 "groupby.dat", adccntl->taskNumber);
628 strcat(adccntl->groupbyFileName, id);
629 adccntl->groupbyFile = AdcFileOpen(adccntl->groupbyFileName,"wb+");
630
631 AdcFileName(adccntl->adcViewSizesFileName, adccntl->adcName,
632 "view.sz", adcpp->ndid);
633 adccntl->adcViewSizesFile = AdcFileOpen(adccntl->adcViewSizesFileName,"r");
634 if(!adccntl->adcViewSizesFile){
635 adccntl->retCode = ADC_FILE_OPEN_FAILURE;
636 return(adccntl);
637 }
638
639 AdcFileName(adccntl->viewSizesFileName, adccntl->adcName,
640 "viewsz.dat", adccntl->taskNumber);
641 strcat(adccntl->viewSizesFileName, id);
642 adccntl->viewSizesFile = AdcFileOpen(adccntl->viewSizesFileName, "wb+");
643
644 adccntl->chunksParams = (CHUNKS*) malloc(MAX_NUM_OF_CHUNKS*sizeof(CHUNKS));
645 if(adccntl->chunksParams==NULL){
646 fprintf(adccntl->logf,"NewAdcViewCntl: Cannot allocate 'chunksParsms'\n");
647 adccntl->retCode = ADC_MEMORY_ALLOCATION_FAILURE;
648 return(adccntl);
649 }
650 adccntl->memPool = (unsigned char*) malloc(adccntl->memoryLimit);
651 if(adccntl->memPool == NULL ){
652 fprintf(adccntl->logf,
653 "NewAdcViewCntl: Cannot allocate 'main memory pool'\n");
654 adccntl->retCode = ADC_MEMORY_ALLOCATION_FAILURE;
655 return(adccntl);
656 }
657
658#ifdef IN_CORE
659 /* add a condition to allocate this memory buffer, THIS is IMPORTANT */
660 ux = 4*adccntl->nTopDims + 8*adccntl->nm;
661 if (adccntl->nTopDims%8) ux += 4;
662 adccntl->inpDataBuffer = (uint32*) malloc(adccntl->nInputRecs*ux);
663 if(adccntl->inpDataBuffer == NULL ){
664 fprintf(adccntl->logf,
665 "NewAdcViewCntl: Cannot allocate 'input data buffer'\n");
666 adccntl->retCode = ADC_MEMORY_ALLOCATION_FAILURE;
667 return(adccntl);
668 }
669#endif
670 adccntl->numberOfChunks = 0;
671
672 for ( i = 0; i < adccntl->nm; i++ ){
673 adccntl->mSums[i] = 0;
674 adccntl->checksums[i] = 0;
675 adccntl->totchs[i] = 0;
676 }
677 adccntl->tree = CreateEmptyTree(adccntl->nd, adccntl->nm,
678 adccntl->memoryLimit, adccntl->memPool);
679 if(!adccntl->tree){
680 fprintf(adccntl->logf,"\nNewAdcViewCntl.CreateEmptyTree failed.\n");
681 adccntl->retCode = ADC_MEMORY_ALLOCATION_FAILURE;
682 return(adccntl);
683 }
684
685 adccntl->nv = adcpp->nd; /* default */
686 for ( i = 0; i < adccntl->nv; i++ ) adccntl->selection[i]=i+1;
687
688 adccntl->nViewLimit = (1<<adcpp->nd)-1;
689 adccntl->jpp=(JOB_POOL *) malloc((adccntl->nViewLimit+1)*sizeof(JOB_POOL));
690 if ( adccntl->jpp == NULL){
691 fprintf(adccntl->logf,
692 "\n Not enough space to allocate %ld byte for a job pool.",
693 (long)(adccntl->nViewLimit+1)*sizeof(JOB_POOL));
694 adccntl->retCode = ADC_MEMORY_ALLOCATION_FAILURE;
695 return(adccntl);
696 }
697 adccntl->lpp = (LAYER * ) malloc( (adcpp->nd+1)*sizeof(LAYER));
698 if ( adccntl->lpp == NULL){
699 fprintf(adccntl->logf,
700 "\n Not enough space to allocate %ld byte for a layer reference array.",
701 (long)(adcpp->nd+1)*sizeof(LAYER));
702 adccntl->retCode = ADC_MEMORY_ALLOCATION_FAILURE;
703 return(adccntl);
704 }
705
706 for ( j = 1, i = 1; i <= adcpp->nd; i++ ) {
707 k = NumOfCombsFromNbyK ( adcpp->nd, i );
708 adccntl->lpp[i].layerIndex = j;
709 j += k;
710 adccntl->lpp[i].layerQuantityLimit = k;
711 adccntl->lpp[i].layerCurrentPopulation = 0;
712 }
713
714 JobPoolInit ( adccntl->jpp, (adccntl->nViewLimit+1), adcpp->nd );
715
716 fprintf(adccntl->logf,"\nMeaning of the log file colums is as follows:\n");
717 fprintf(adccntl->logf,
718 "Row Number | Groupby | View Size | Measure Sums | Number of Chunks\n");
719
720 adccntl->verificationFailed = 1;
721 return adccntl;
722}
723void InitAdcViewCntl(ADC_VIEW_CNTL *adccntl,
724 uint32 nSelectedDims,
725 uint32 *selection,
726 uint32 fromParent ){
727 uint32 i;
728
729 adccntl->nv = nSelectedDims;
730
731 for (i = 0; i < adccntl->nm; i++ ) adccntl->mSums[i] = 0;
732 for (i = 0; i < adccntl->nv; i++ ) adccntl->selection[i] = selection[i];
733
734 adccntl->outRecSize = GetRecSize(adccntl->nv,adccntl->nm);
735 adccntl->numberOfChunks = 0;
736 adccntl->fromParent = fromParent;
737 adccntl->nViewRows = 0;
738
739 if(fromParent){
740 adccntl->nd = adccntl->smallestParentLevel;
741 FSEEK(adccntl->viewFile, adccntl->viewOffset, SEEK_SET);
742 adccntl->nRowsToRead = adccntl->nParentViewRows;
743 }else{
744 adccntl->nd = adccntl->nTopDims;
745 adccntl->nRowsToRead = adccntl->nInputRecs;
746 }
747 adccntl->inpRecSize = GetRecSize(adccntl->nd,adccntl->nm);
748 adccntl->outRecSize = GetRecSize(adccntl->nv,adccntl->nm);
749}
750int32 CloseAdcView(ADC_VIEW_CNTL *adccntl){
751 if (adccntl->inpf) fclose(adccntl->inpf);
752 if (adccntl->viewFile) fclose(adccntl->viewFile);
753 if (adccntl->fileOfChunks) fclose(adccntl->fileOfChunks);
754 if (adccntl->groupbyFile) fclose(adccntl->groupbyFile);
755 if (adccntl->adcViewSizesFile) fclose(adccntl->adcViewSizesFile);
756 if (adccntl->viewSizesFile) fclose(adccntl->viewSizesFile);
757
758 if (DeleteOneFile(adccntl->chunksFileName))
759 return ADC_FILE_DELETE_FAILURE;
760 if (DeleteOneFile(adccntl->viewSizesFileName))
761 return ADC_FILE_DELETE_FAILURE;
762
763 if (DeleteOneFile(adccntl->groupbyFileName))
764 return ADC_FILE_DELETE_FAILURE;
765
766 if (adccntl->chunksParams){
767 free(adccntl->chunksParams);
768 adccntl->chunksParams=NULL;
769 }
770 if (adccntl->memPool){ free(adccntl->memPool); adccntl->memPool=NULL;}
771 if (adccntl->jpp){ free(adccntl->jpp); adccntl->jpp=NULL; }
772 if (adccntl->lpp){ free(adccntl->lpp); adccntl->lpp=NULL; }
773
774 if (adccntl->logf) fclose(adccntl->logf);
775 free(adccntl);
776 return ADC_OK;
777}
778void AdcCntlLog(ADC_VIEW_CNTL *adccntlp){
779 fprintf(adccntlp->logf," memoryLimit = %20d\n",
780 adccntlp->memoryLimit);
781 fprintf(adccntlp->logf," treeNodeSize = %20d\n",
782 adccntlp->tree->treeNodeSize);
783 fprintf(adccntlp->logf," treeMemoryLimit = %20d\n",
784 adccntlp->tree->memoryLimit);
785 fprintf(adccntlp->logf," nNodesLimit = %20d\n",
786 adccntlp->tree->nNodesLimit);
787 fprintf(adccntlp->logf,"freeNodeCounter = %20d\n",
788 adccntlp->tree->freeNodeCounter);
789 fprintf(adccntlp->logf," nViewRows = %20d\n",
790 adccntlp->nViewRows);
791}
792int32 ViewSizesVerification(ADC_VIEW_CNTL *adccntlp){
793 char inps[MAX_PARAM_LINE_SIZE];
794 char msg[64];
795 uint32 *viewCounts;
796 uint32 selection_viewSize[2];
797 uint32 sz;
798 uint32 sel[64];
799 uint32 i;
800 uint32 k;
801 uint64 tx;
802 uint32 iTx;
803
804 viewCounts = (uint32 *) &adccntlp->memPool[0];
805 for ( i = 0; i <= adccntlp->nViewLimit; i++) viewCounts[i] = 0;
806
807 FSEEK(adccntlp->viewSizesFile, 0L, SEEK_SET);
808 FSEEK(adccntlp->adcViewSizesFile, 0L, SEEK_SET);
809
810 while(fread(selection_viewSize, 8, 1, adccntlp->viewSizesFile)){
811 viewCounts[selection_viewSize[0]] = selection_viewSize[1];
812 }
813 k = 0;
814 while ( fscanf(adccntlp->adcViewSizesFile, "%s", inps) != EOF ){
815 if ( strcmp(inps, "Selection:") == 0 ) {
816 while ( fscanf(adccntlp->adcViewSizesFile, "%s", inps)) {
817 if ( strcmp(inps, "View") == 0 ) break;
818 sel[k++] = atoi(inps);
819 }
820 }
821
822 if ( strcmp(inps, "Size:") == 0 ) {
823 fscanf(adccntlp->adcViewSizesFile, "%s", inps);
824 sz = atoi(inps);
825 CreateBinTuple(&tx, sel, k);
826 iTx = (int32)(tx>>(64-adccntlp->nTopDims));
827 adccntlp->verificationFailed = 0;
828 if (!adccntlp->numberOfMadeViews) adccntlp->verificationFailed = 1;
829
830 if ( viewCounts[iTx] != 0){
831 if (viewCounts[iTx] != sz) {
832 if (viewCounts[iTx] != adccntlp->nInputRecs){
833 fprintf(adccntlp->logf,
834 "A view size is wrong: genSz=%d calcSz=%d\n",
835 sz, viewCounts[iTx]);
836 adccntlp->verificationFailed = 1;
837 return ADC_VERIFICATION_FAILED;
838 }
839 }
840 }
841 k = 0;
842 }
843 } /* of while() */
844
845 fprintf(adccntlp->logf,
846 "\n\nMeaning of the log file colums is as follows:\n");
847 fprintf(adccntlp->logf,
848 "Row Number | Groupby | View Size | Measure Sums | Number of Chunks\n");
849
850 if (!adccntlp->verificationFailed)
851 strcpy(msg, "Verification=passed");
852 else strcpy(msg, "Verification=failed");
853 FSEEK(adccntlp->logf, 0L, SEEK_SET);
854 fprintf(adccntlp->logf, msg);
855 FSEEK(adccntlp->logf, 0L, SEEK_END);
856 FSEEK(adccntlp->viewSizesFile, 0L, SEEK_SET);
857 return ADC_OK;
858}
859int32 ComputeGivenGroupbys(ADC_VIEW_CNTL *adccntlp){
860 int32 retCode;
861 uint32 i;
862 uint64 binRepTuple;
863 uint32 ut32;
864 uint32 nViews = 0;
865 uint32 nSelectedDims;
866 uint32 smp;
867#ifdef IN_CORE
868 uint32 firstView = 1;
869#endif
870 uint32 selection_viewsize[2];
871 char ttout[16];
872
873 while (fread(&binRepTuple, 8, 1, adccntlp->groupbyFile )){
874 for(i = 0; i < adccntlp->nm; i++) adccntlp->checksums[i]=0;
875 nViews++;
876 swap8(&binRepTuple);
877
878 GetRegTupleFromBin64(binRepTuple, adccntlp->selection,
879 adccntlp->nTopDims, &nSelectedDims);
880 ut32 = (uint32)(binRepTuple>>(64-adccntlp->nTopDims));
881 selection_viewsize[0] = ut32;
882 ut32 <<= (32-adccntlp->nTopDims);
883 adccntlp->groupby = ut32;
884#ifndef IN_CORE
885 smp = GetParent(adccntlp, ut32);
886#endif
887#ifdef IN_CORE
888 if (firstView) {
889 firstView = 0;
890 if(ReadWholeInputData(adccntlp, adccntlp->inpf)) {
891 fprintf(stderr, "ReadWholeInputData failed.\n");
892 return ADC_INTERNAL_ERROR;
893 }
894 }
895 smp = noneParent;
896#endif
897
898 if (smp != noneParent)
899 GetRegTupleFromParent(binRepTuple,
900 adccntlp->parBinRepTuple,
901 adccntlp->selection,
902 adccntlp->nTopDims);
903 InitAdcViewCntl(adccntlp, nSelectedDims,
904 adccntlp->selection, (smp == noneParent)?0:1);
905#ifdef IN_CORE
906 if(retCode = ComputeMemoryFittedView(adccntlp)) {
907 fprintf(stderr, "ComputeMemoryFittedView failed.\n");
908 return retCode;
909 }
910#else
911#ifdef OPTIMIZATION
912 if (smp == prefixedParent){
913 if (retCode = PrefixedAggregate(adccntlp, adccntlp->viewFile)) {
914 fprintf(stderr,
915 "ComputeGivenGroupbys.PrefixedAggregate failed.\n");
916 return retCode;
917 }
918 adccntlp->numberOfPrefixedGroupbys++;
919 }else if (smp == sharedSortParent) {
920 if (retCode = SharedSortAggregate(adccntlp)) {
921 fprintf(stderr,
922 "ComputeGivenGroupbys.SharedSortAggregate failed.\n");
923 return retCode;
924 }
925 adccntlp->numberOfSharedSortGroupbys++;
926 }else
927#endif /* OPTIMIZATION */
928 {
929 if( smp != noneParent ) {
930 retCode = RunFormation(adccntlp, adccntlp->viewFile);
931 if(retCode!=ADC_OK){
932 fprintf(stderr,
933 "ComputrGivenGroupbys.RunFormation failed.\n");
934 return retCode;
935 }
936 }else{
937 if ((retCode=RunFormation (adccntlp, adccntlp->inpf)) != ADC_OK){
938 fprintf(stderr,
939 "ComputrGivenGroupbys.RunFormation failed.\n");
940 return retCode;
941 }
942 adccntlp->numberOfViewsMadeFromInput++;
943 }
944 if(!adccntlp->numberOfChunks){
945 uint64 ordern=0;
946 adccntlp->nViewRows = adccntlp->tree->count;
947 adccntlp->totalOfViewRows += adccntlp->nViewRows;
948 retCode=WriteViewToDiskCS(adccntlp,adccntlp->tree->root.left,&ordern);
949 if(retCode!=ADC_OK){
950 fprintf(stderr,
951 "ComputeGivenGroupbys.WriteViewToDisk: Write error.\n");
952 return ADC_WRITE_FAILED;
953 }
954 }else {
955 retCode=MultiWayMerge(adccntlp);
956 if(retCode!=ADC_OK) {
957 fprintf(stderr,"ComputeGivenGroupbys.MultiWayMerge failed.\n");
958 return retCode;
959 }
960 }
961 }
962
963 JobPoolUpdate(adccntlp);
964
965 adccntlp->accViewFileOffset +=
966 (int64)(adccntlp->nViewRows*adccntlp->outRecSize);
967 FSEEK(adccntlp->fileOfChunks, 0L, SEEK_SET);
968 FSEEK(adccntlp->inpf, 0L, SEEK_SET);
969#endif /* IN_CORE */
970 for( i = 0; i < adccntlp->nm; i++)
971 adccntlp->totchs[i]+=adccntlp->checksums[i];
972 selection_viewsize[1] = adccntlp->nViewRows;
973 fwrite(selection_viewsize, 8, 1, adccntlp->viewSizesFile);
974 adccntlp->totalViewFileSize +=
975 adccntlp->outRecSize*adccntlp->nViewRows;
976 sprintf(ttout, "%7d ", nViews);
977 WriteOne32Tuple(ttout, adccntlp->groupby,
978 adccntlp->nTopDims, adccntlp->logf);
979 fprintf(adccntlp->logf, " | %15d | ", adccntlp->nViewRows);
980 for ( i = 0; i < adccntlp->nm; i++ ){
981 fprintf(adccntlp->logf, " %20lld", adccntlp->checksums[i]);
982 }
983 fprintf(adccntlp->logf, " | %5d", adccntlp->numberOfChunks);
984 }
985 adccntlp->numberOfMadeViews = nViews;
986 if(ViewSizesVerification(adccntlp)) return ADC_VERIFICATION_FAILED;
987 return ADC_OK;
988}
Note: See TracBrowser for help on using the repository browser.