Determinazione dell’elemento minimo e della sua posizione in ciascuna colonna della matrice con CUDA Thrust

Ho un problema abbastanza semplice, ma non riesco a capire una soluzione elegante ad esso.

Ho un codice Thrust che produce c vettori di stesse dimensioni contenenti valori. Diciamo che ognuno di questi vettori ha un indice. Mi piacerebbe che ogni posizione del vettore ottenga l’indice del vettore c per il quale il valore è il più basso:

Esempio:

 C0 = (0,10,20,3,40) C1 = (1,2 ,3 ,5,10) 

Avrei ottenuto come risultato un vettore contenente l’indice del vettore C che ha il valore più basso:

 result = (0,1 ,1 ,0,1) 

Ho pensato di farlo usando gli iteratori thrust zip, ma ho incontrato problemi: potevo comprimere tutti i vettori c e implementare una trasformazione arbitraria che prende una tupla e restituisce l’indice del suo valore più basso, ma:

  1. Come scorrere il contenuto di una tupla?
  2. Come ho capito, le tuple possono solo memorizzare fino a 10 elementi e possono esserci molti più di 10 vettori c .

Ho quindi pensato di farlo in questo modo: invece di avere c vettori separati, aggiungili tutti in un singolo vettore C , quindi generi chiavi che fanno riferimento alle posizioni ed esegui un ordinamento stabile per chiave che raggrupperà le voci vettoriali da una stessa posizione insieme . Nell’esempio che darebbe:

 C = (0,10,20,3,40,1,2,3,5,10) keys = (0,1 ,2 ,3,4 ,0,1,2,3,4 ) after stable sort by key: output = (0,1,10,2,20,3,3,5,40,10) keys = (0,0,1 ,1,2 ,2,3,3,4 ,4 ) 

Quindi generare le chiavi con le posizioni nel vettore, comprimere l’output con l’indice dei vettori c e quindi eseguire una riduzione per tasto con un functor personalizzato che per ciascuna riduzione emette l’indice con il valore più basso. Nell’esempio:

 input = (0,1,10,2,20,3,3,5,40,10) indexes= (0,1,0 ,1,0 ,1,0,1,0 ,1) keys = (0,0,1 ,1,2 ,2,3,3,4 ,4) after reduce by keys on zipped input and indexes: output = (0,1,1,0,1) 

Tuttavia, come scrivere tale functor per ridurre l’operazione con i tasti?

Poiché la lunghezza dei tuoi vettori deve essere la stessa. È meglio concatenarli insieme e trattarli come una matrice C.

Quindi il tuo problema diventa trovare gli indici dell’elemento min di ogni colonna in una matrice a righe principali. Può essere risolto come segue.

  1. cambia la riga-maggiore in col-maggiore;
  2. trova gli indici per ogni colonna.

Nel passo 1, hai proposto di usare stable_sort_by_key per riorganizzare l’ordine degli elementi, che non è un metodo efficace. Poiché il riarrangiamento può essere calcolato direttamente dato il #row e #col della matrice. In sintesi, può essere fatto con iteratori di permutazione come:

 thrust::make_permutation_iterator( c.begin(), thrust::make_transform_iterator( thrust::make_counting_iterator((int) 0), (_1 % row) * col + _1 / row) ) 

Nel passaggio 2, reduce_by_key può fare esattamente quello che vuoi. Nel tuo caso il funtore binario-riduzione è facile, poiché il confronto su tupla (elemento del tuo vettore zippato) è già stato definito per confrontare il 1 ° elemento della tupla, ed è supportato dalla spinta come

 thrust::minimum< thrust::tuple >() 

L’intero programma è mostrato come segue. È richiesto Thrust 1.6.0+ poiché utilizzo i segnaposti in fantasiosi iteratori.

 #include  #include  #include  #include  #include  #include  #include  #include  #include  #include  using namespace thrust::placeholders; int main() { const int row = 2; const int col = 5; float initc[] = { 0, 10, 20, 3, 40, 1, 2, 3, 5, 10 }; thrust::device_vector c(initc, initc + row * col); thrust::device_vector minval(col); thrust::device_vector minidx(col); thrust::reduce_by_key( thrust::make_transform_iterator( thrust::make_counting_iterator((int) 0), _1 / row), thrust::make_transform_iterator( thrust::make_counting_iterator((int) 0), _1 / row) + row * col, thrust::make_zip_iterator( thrust::make_tuple( thrust::make_permutation_iterator( c.begin(), thrust::make_transform_iterator( thrust::make_counting_iterator((int) 0), (_1 % row) * col + _1 / row)), thrust::make_transform_iterator( thrust::make_counting_iterator((int) 0), _1 % row))), thrust::make_discard_iterator(), thrust::make_zip_iterator( thrust::make_tuple( minval.begin(), minidx.begin())), thrust::equal_to(), thrust::minimum >() ); std::copy(minidx.begin(), minidx.end(), std::ostream_iterator(std::cout, " ")); std::cout << std::endl; return 0; } 

Due problemi rimanenti possono influire sulle prestazioni.

  1. i valori minimi devono essere emessi, che non è richiesto;
  2. reduce_by_key è progettato per segmenti con lunghezze varianti, potrebbe non essere l'algoritmo più veloce per la riduzione su segmenti con stessa lunghezza.

Scrivere il proprio kernel potrebbe essere la soluzione migliore per le massime prestazioni.

Una ansible idea, basandoci sull’idea del tipo vettoriale

  1. Supponiamo di avere vettori come questo:

     values: C = ( 0,10,20, 3,40, 1, 2, 3, 5,10) keys: K = ( 0, 1, 2, 3, 4, 0, 1, 2, 3, 4) segments: S = ( 0, 0, 0, 0, 0, 1, 1, 1, 1, 1) 
  2. zip insieme K e S per creare KS

  3. stable_sort_by_key che utilizza C come chiavi e KS come valori:

     stable_sort_by_key(C.begin(), C.end(), KS_begin); 
  4. zip insieme i vettori C e K riordinati, per creare CK

  5. stable_sort_by_key utilizzando la S riordinata come chiavi e CK come i valori:

     stable_sort_by_key(S.begin(), S.end(), CK_begin); 
  6. utilizzare un iteratore di permutazione o un iteratore di intervallo per accedere ad ogni elemento Nth (0, N, 2N, …) del vettore K appena riordinato, per recuperare un vettore degli indici dell’elemento min in ogni segmento, dove N è la lunghezza dei segmenti.

Non l’ho ancora implementato, adesso è solo un’idea. Forse non funzionerà per qualche motivo che non ho ancora osservato.

segments ( S ) e i keys ( K ) sono effettivamente indici di righe e colonne.

E la tua domanda mi sembra strana, perché il tuo titolo menziona “trova indice di valore massimo” ma la maggior parte della tua domanda sembra riferirsi al “valore più basso”. Indipendentemente da ciò, con una modifica al passaggio 6 del mio algoritmo, è ansible trovare entrambi i valori.

Ho avuto la curiosità di testare quale degli approcci precedenti fosse più veloce. Quindi, ho implementato l’idea di Robert Crovella nel codice sottostante che riporta, per completezza, anche l’approccio di Eric.

 #include  #include  #include  #include  #include  #include  #include  #include  #include  #include  #include  #include  #include "TimingGPU.cuh" using namespace thrust::placeholders; template  class strided_range { public: typedef typename thrust::iterator_difference::type difference_type; struct stride_functor : public thrust::unary_function { difference_type stride; stride_functor(difference_type stride) : stride(stride) {} __host__ __device__ difference_type operator()(const difference_type& i) const { return stride * i; } }; typedef typename thrust::counting_iterator CountingIterator; typedef typename thrust::transform_iterator TransformIterator; typedef typename thrust::permutation_iterator PermutationIterator; // type of the strided_range iterator typedef PermutationIterator iterator; // construct strided_range for the range [first,last) strided_range(Iterator first, Iterator last, difference_type stride) : first(first), last(last), stride(stride) {} iterator begin(void) const { return PermutationIterator(first, TransformIterator(CountingIterator(0), stride_functor(stride))); } iterator end(void) const { return begin() + ((last - first) + (stride - 1)) / stride; } protected: Iterator first; Iterator last; difference_type stride; }; /**************************************************************/ /* CONVERT LINEAR INDEX TO ROW INDEX - NEEDED FOR APPROACH #1 */ /**************************************************************/ template< typename T > struct mod_functor { __host__ __device__ T operator()(T a, T b) { return a % b; } }; /********/ /* MAIN */ /********/ int main() { /***********************/ /* SETTING THE PROBLEM */ /***********************/ const int Nrows = 200; const int Ncols = 200; // --- Random uniform integer distribution between 10 and 99 thrust::default_random_engine rng; thrust::uniform_int_distribution dist(10, 99); // --- Matrix allocation and initialization thrust::device_vector d_matrix(Nrows * Ncols); for (size_t i = 0; i < d_matrix.size(); i++) d_matrix[i] = (float)dist(rng); TimingGPU timerGPU; /******************/ /* APPROACH NR. 1 */ /******************/ timerGPU.StartCounter(); thrust::device_vector d_min_values(Ncols); thrust::device_vector d_min_indices_1(Ncols); thrust::reduce_by_key( thrust::make_transform_iterator( thrust::make_counting_iterator((int) 0), _1 / Nrows), thrust::make_transform_iterator( thrust::make_counting_iterator((int) 0), _1 / Nrows) + Nrows * Ncols, thrust::make_zip_iterator( thrust::make_tuple( thrust::make_permutation_iterator( d_matrix.begin(), thrust::make_transform_iterator( thrust::make_counting_iterator((int) 0), (_1 % Nrows) * Ncols + _1 / Nrows)), thrust::make_transform_iterator( thrust::make_counting_iterator((int) 0), _1 % Nrows))), thrust::make_discard_iterator(), thrust::make_zip_iterator( thrust::make_tuple( d_min_values.begin(), d_min_indices_1.begin())), thrust::equal_to(), thrust::minimum >() ); printf("Timing for approach #1 = %f\n", timerGPU.GetCounter()); /******************/ /* APPROACH NR. 2 */ /******************/ timerGPU.StartCounter(); // --- Computing row indices vector thrust::device_vector d_row_indices(Nrows * Ncols); thrust::transform(thrust::make_counting_iterator(0), thrust::make_counting_iterator(Nrows * Ncols), thrust::make_constant_iterator(Ncols), d_row_indices.begin(), thrust::divides() ); // --- Computing column indices vector thrust::device_vector d_column_indices(Nrows * Ncols); thrust::transform(thrust::make_counting_iterator(0), thrust::make_counting_iterator(Nrows * Ncols), thrust::make_constant_iterator(Ncols), d_column_indices.begin(), mod_functor()); // --- int and float iterators typedef thrust::device_vector::iterator IntIterator; typedef thrust::device_vector::iterator FloatIterator; // --- Relevant tuples of int and float iterators typedef thrust::tuple IteratorTuple1; typedef thrust::tuple IteratorTuple2; // --- zip_iterator of the relevant tuples typedef thrust::zip_iterator ZipIterator1; typedef thrust::zip_iterator ZipIterator2; // --- zip_iterator creation ZipIterator1 iter1(thrust::make_tuple(d_column_indices.begin(), d_row_indices.begin())); thrust::stable_sort_by_key(d_matrix.begin(), d_matrix.end(), iter1); ZipIterator2 iter2(thrust::make_tuple(d_matrix.begin(), d_row_indices.begin())); thrust::stable_sort_by_key(d_column_indices.begin(), d_column_indices.end(), iter2); typedef thrust::device_vector::iterator Iterator; // --- Strided access to the sorted array strided_range d_min_indices_2(d_row_indices.begin(), d_row_indices.end(), Nrows); printf("Timing for approach #2 = %f\n", timerGPU.GetCounter()); printf("\n\n"); std::copy(d_min_indices_2.begin(), d_min_indices_2.end(), std::ostream_iterator(std::cout, " ")); std::cout << std::endl; return 0; } 

Testando i due approcci per il caso di matrici di dimensioni 2000x2000 , questo è stato il risultato su una scheda Kepler K20c:

 Eric's : 8.4s Robert Crovella's : 33.4s