View Javadoc

1   package eu.scape_project.watch.monitor;
2   
3   import java.util.ArrayList;
4   import java.util.Collection;
5   import java.util.Collections;
6   import java.util.List;
7   
8   import org.slf4j.Logger;
9   import org.slf4j.LoggerFactory;
10  
11  import thewebsemantic.binding.RdfBean;
12  import eu.scape_project.watch.dao.AsyncRequestDAO;
13  import eu.scape_project.watch.dao.DOListener;
14  import eu.scape_project.watch.dao.PropertyValueDAO;
15  import eu.scape_project.watch.domain.AsyncRequest;
16  import eu.scape_project.watch.domain.Notification;
17  import eu.scape_project.watch.domain.PropertyValue;
18  import eu.scape_project.watch.domain.Question;
19  import eu.scape_project.watch.domain.Trigger;
20  import eu.scape_project.watch.interfaces.MonitorInterface;
21  import eu.scape_project.watch.notification.NotificationService;
22  
23  public class CentralMonitor implements DOListener {
24  
25    private static final Logger LOG = LoggerFactory.getLogger(CentralMonitor.class);
26  
27    private List<MonitorInterface> monitors;
28  
29    private List<AsyncRequest> aRequests;
30  
31    private AsyncRequestDAO asyncRequestDAO;
32    
33    private NotificationService nService; 
34  
35    public CentralMonitor() {
36      monitors = new ArrayList<MonitorInterface>();
37      aRequests = new ArrayList<AsyncRequest>();
38      LOG.info("CentralMonitor initialized");
39    }
40  
41    public void addMonitor(MonitorInterface monitor) {
42      monitors.add(monitor);
43      monitor.registerCentralMonitor(this);
44    }
45  
46    public void registerToAsyncRequest(AsyncRequestDAO ar) {
47      asyncRequestDAO = ar;
48      asyncRequestDAO.addDOListener(this);
49      LOG.debug("CentralMonitor listening AsyncRequestDAO");
50    }
51  
52    public void setNotificationService(NotificationService ns){
53      nService = ns;
54    }
55    
56    public NotificationService getNotificationService() {
57      return nService;
58    }
59    
60    public void notifyAsyncRequests(List<String> ids) {
61  
62      for (String uuid : ids) {
63        AsyncRequest tmp = findAsyncRequest(uuid);
64        assessRequest(tmp);
65      }
66      
67    }
68  
69    public Collection<AsyncRequest> getAllRequests() {
70      return Collections.unmodifiableCollection(aRequests);
71    }
72    
73    public Collection<MonitorInterface> getAllMonitors() {
74      return Collections.unmodifiableCollection(monitors);
75    }
76    
77    
78    @Override
79    public void onUpdated(RdfBean object) {
80      AsyncRequest req = (AsyncRequest) object;
81      LOG.debug("adding Request to monitors " + req.getId());
82      if (!aRequests.contains(req)) {
83        aRequests.add(req);
84        for (MonitorInterface monitor : monitors) {
85          monitor.addWatchRequest(req);
86        }
87      }
88    }
89  
90    @Override
91    public void onRemoved(RdfBean object) {
92      aRequests.remove(object);
93      //TODO notify monitors about removal
94    }
95  
96    
97    
98    
99    
100   private AsyncRequest findAsyncRequest(String uuid) {
101 
102     for (AsyncRequest i : aRequests) {
103       if (i.getId().equals(uuid))
104         return i;
105     }
106 
107     return null;
108 
109   }
110 
111   private void assessRequest(AsyncRequest aRequest) {
112 
113     LOG.info("Assessing AsyncRequest " + aRequest.getId());
114     
115     for (Trigger trigger : aRequest.getTriggers()) {
116       Question question = trigger.getQuestion();
117       List<PropertyValue> result = PropertyValueDAO.getInstance().query(question.getSparql(), 0, 10);
118       if (result.size()>0){
119         notify(trigger);
120       }else {
121         LOG.info("Condition is not satisfied");
122       }
123     }
124       
125   }
126 
127   private void notify(Trigger trigger) {
128     if (nService!=null) {
129       for (Notification notification: trigger.getNotifications()) {
130         nService.send(notification);
131       }
132     }else {
133       LOG.warn("No NotificationService specified");
134     }
135   }
136 }