LeSpotted44 - 24 days ago
Java Question

Multithreaded foreach loop over a HashMap

I am storing data retrieved with Jsoup in a map and then submitting it to my own HTTP API.

The problem: how can I loop over my HashMap with multiple threads without every thread processing the same entry?

This is what happens currently:

Thread1 : a
Thread2 : a
Thread3 : a
Thread4 : a

Thread1 : b
Thread2 : b
Thread3 : b
Thread4 : b

I want something like this:

Thread1 : a
Thread2 : b
Thread3 : c
Thread4 : d
...

package ygg.org;

import java.io.IOException;
import java.net.URLEncoder;
import java.util.concurrent.ConcurrentHashMap;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

public class Filmstreaming1 {

    final static int NB_PAGE = 2;
    final static int THREADS = 1;
    static ConcurrentHashMap<String, String> movies_list = new ConcurrentHashMap<>();
    static int count = 0;

    public static void main(String[] args) {
        Initialize();
    }

    static void Initialize() {

        System.out.println("----------------------------------");
        System.out.println("Homer is starting...");
        System.out.println("------------------------------");

        for (int i = 1; i <= NB_PAGE; i++) {
            try {
                Document page = Jsoup.connect("http://xxxxxxx.com/page/" + i + "/")
                        .userAgent("Mozilla")
                        .timeout(3000)
                        .post();
                Elements movies = page.getElementsByClass("margin-b40").get(0)
                        .getElementsByClass("short-link").select("a");
                for (Element movie : movies) {
                    String href = movie.attr("href");
                    String movie_title = movie.text().replaceAll("\\(.*\\)", "");
                    // putIfAbsent stores the movie only if its href is not already in the map
                    if (movies_list.putIfAbsent(href, movie_title) == null) {
                        System.out.println("Added movie " + movie_title);
                    }
                }
                System.out.println("Total retrieved " + movies_list.size() + ", page: " + i);
            } catch (IOException ioe) {
                System.out.println("Exception: " + ioe);
            }
        }

        try {
            // Start THREADS worker threads
            for (int i = 0; i < THREADS; i++) {
                Thread api = new ThreadApi();
                api.start();
            }
        } catch (Exception e) {
            System.out.println("Exception: " + e.getMessage());
        }
    }
}

class ThreadApi extends Thread {

    @Override
    public void run() {
        // Keep looping until every movie has been submitted (entries are removed on success)
        while (!Filmstreaming1.movies_list.isEmpty()) {

            Filmstreaming1.movies_list.forEach((key, value) -> {
                try {
                    String code = key.substring(key.indexOf("com/") + 4, key.indexOf("-"));

                    Document page = Jsoup.connect("http://xxxxxxx.com/" + code + "--.html")
                            .userAgent("Mozilla")
                            .timeout(3000)
                            .post();

                    String director = page.getElementsByClass("finfo-text").get(5).text();

                    Document page1 = Jsoup.connect("http://xxxxxxx.com/play.php?newsid=" + code + "&vt=ol&sr=3")
                            .referrer("http://xxxxxxx.com/" + code + "--.html")
                            .userAgent("Mozilla")
                            .timeout(3000)
                            .post();

                    String link = page1.getElementsByTag("iframe").first().attr("src");

                    String encoded_title = URLEncoder.encode(value, "UTF-8");
                    String encoded_director = URLEncoder.encode(director, "UTF-8");

                    String url = "http://xxxxxxx.com/api/movie?movie=" + encoded_title + "&director=" + encoded_director;

                    // Print the URL
                    System.out.println(url);

                    Document api = Jsoup.connect(url)
                            .userAgent("Mozilla")
                            .timeout(3000)
                            .get();

                    String response = api.text();
                    System.out.println(response);
                    // Compare string contents with equals(); == only compares references
                    if ("-1".equals(response)) {
                        System.out.println("Error");
                    } else {
                        // The iframe link can contain '?' and '&', so encode it as a query value
                        url = "http://xxxxxxx.com/api/video?link=" + URLEncoder.encode(link, "UTF-8")
                                + "&ref=" + response + "&version=vf";

                        Document submit = Jsoup.connect(url)
                                .userAgent("Mozilla")
                                .timeout(3000)
                                .get();

                        response = submit.text();

                        Filmstreaming1.movies_list.remove(key);
                        System.out.println(response);
                    }
                } catch (Exception e) {
                    System.out.println("Exception " + e.getMessage());
                }
            });
        }
    }
}

Answer

As your map is already a ConcurrentHashMap, you could just use ConcurrentHashMap.forEach(long parallelismThreshold, BiConsumer<? super K, ? super V> action). Its parallelismThreshold argument lets the map run the action on the entries in parallel once the map's estimated size reaches the threshold.
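A minimal sketch of that call, assuming a small sample map in place of the scraped movies_list (the class ParallelForEachDemo and the method countVisits are made up for illustration). It counts how often each key reaches the lambda, to show that every entry is handed to exactly one invocation; with a threshold of 1 those invocations are spread over the common pool's worker threads, which is roughly the split the question asks for, although which thread gets which entry is up to the pool.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ParallelForEachDemo {

    // Runs map.forEach with the given threshold and counts how often each key was visited
    static Map<String, Integer> countVisits(ConcurrentHashMap<String, String> map, long threshold) {
        ConcurrentHashMap<String, Integer> visits = new ConcurrentHashMap<>();
        map.forEach(threshold, (key, value) -> {
            // merge is atomic on ConcurrentHashMap, so concurrent updates are not lost
            visits.merge(key, 1, Integer::sum);
            System.out.println(Thread.currentThread().getName() + " : " + key);
        });
        return visits;
    }

    public static void main(String[] args) {
        // Sample data standing in for movies_list (href -> title)
        ConcurrentHashMap<String, String> movies = new ConcurrentHashMap<>();
        for (String key : new String[] {"a", "b", "c", "d"}) {
            movies.put(key, "title " + key);
        }
        // Threshold 1 asks for maximal parallelism on ForkJoinPool.commonPool()
        System.out.println(countVisits(movies, 1));
    }
}
```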

The Javadoc says the following about the effect of the threshold parameter:

These bulk operations accept a parallelismThreshold argument. Methods proceed sequentially if the current map size is estimated to be less than the given threshold. Using a value of Long.MAX_VALUE suppresses all parallelism. Using a value of 1 results in maximal parallelism by partitioning into enough subtasks to fully utilize the ForkJoinPool.commonPool() that is used for all parallel computations. Normally, you would initially choose one of these extreme values, and then measure performance of using in-between values that trade off overhead versus throughput.
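To compare the two extreme values from the quote, here is a small sketch (ThresholdDemo and sum are illustrative names, and the map of numbers is sample data). It sums the same map once with Long.MAX_VALUE and once with 1, recording which threads ran the action. The result is the same either way; only the amount of parallelism changes. On a machine with few cores the parallel run may still use only one or two threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

class ThresholdDemo {

    // Sums every value using the given parallelismThreshold and records the threads that did the work
    static long sum(ConcurrentHashMap<String, Long> map, long threshold, Set<String> threads) {
        LongAdder total = new LongAdder(); // thread-safe counter for concurrent adds
        map.forEach(threshold, (key, value) -> {
            threads.add(Thread.currentThread().getName());
            total.add(value);
        });
        return total.sum();
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
        for (long i = 1; i <= 100_000; i++) {
            map.put("key" + i, i);
        }
        Set<String> sequentialThreads = ConcurrentHashMap.newKeySet();
        Set<String> parallelThreads = ConcurrentHashMap.newKeySet();

        // Long.MAX_VALUE: the size estimate is always below the threshold, so the walk is sequential
        long sequential = sum(map, Long.MAX_VALUE, sequentialThreads);
        // 1: split into enough subtasks to keep ForkJoinPool.commonPool() busy
        long parallel = sum(map, 1, parallelThreads);

        System.out.println("sequential sum " + sequential + " on " + sequentialThreads);
        System.out.println("parallel sum   " + parallel + " on " + parallelThreads);
    }
}
```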

So there is no need to create your own Thread or even a Runnable implementation: any lambda or method reference that fits BiConsumer<? super K, ? super V> will do.
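Applied to the question's structure, a sketch could look like the following. MovieUploader, submit and process are hypothetical names; submit is only a placeholder for the Jsoup scraping and API calls in ThreadApi.run() and simply reports success. The static method process takes (String, String) and returns nothing, so MovieUploader::process can be passed directly as the BiConsumer, and it removes each entry once it has been handled.

```java
import java.util.concurrent.ConcurrentHashMap;

class MovieUploader {

    // Same shape as movies_list in the question: href -> title
    static final ConcurrentHashMap<String, String> moviesList = new ConcurrentHashMap<>();

    // Hypothetical placeholder for the scraping and API submission; it only logs and reports success
    static boolean submit(String href, String title) {
        System.out.println(Thread.currentThread().getName() + " : " + title);
        return true;
    }

    // (String, String) -> void, so MovieUploader::process fits BiConsumer<? super String, ? super String>
    static void process(String href, String title) {
        if (submit(href, title)) {
            // ConcurrentHashMap tolerates removals while a bulk forEach is running
            moviesList.remove(href);
        }
    }

    public static void main(String[] args) {
        for (String id : new String[] {"1-a", "2-b", "3-c", "4-d"}) {
            moviesList.put("http://example.com/" + id + ".html", "movie " + id);
        }
        // Threshold 1: entries are spread over ForkJoinPool.commonPool() workers
        moviesList.forEach(1, MovieUploader::process);
        System.out.println("Remaining: " + moviesList.size());
    }
}
```

One design consequence to keep in mind: these bulk operations run on ForkJoinPool.commonPool(), whose default parallelism is one less than the number of available processors, so only about that many blocking HTTP calls (plus the calling thread) will usually be in flight at once.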
