1 package eu.scape_project.watch.monitor;
2
3 import java.util.ArrayList;
4 import java.util.Collection;
5 import java.util.Collections;
6 import java.util.List;
7
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10
11 import thewebsemantic.binding.RdfBean;
12 import eu.scape_project.watch.dao.AsyncRequestDAO;
13 import eu.scape_project.watch.dao.DOListener;
14 import eu.scape_project.watch.dao.PropertyValueDAO;
15 import eu.scape_project.watch.domain.AsyncRequest;
16 import eu.scape_project.watch.domain.Notification;
17 import eu.scape_project.watch.domain.PropertyValue;
18 import eu.scape_project.watch.domain.Question;
19 import eu.scape_project.watch.domain.Trigger;
20 import eu.scape_project.watch.interfaces.MonitorInterface;
21 import eu.scape_project.watch.notification.NotificationService;
22
23 public class CentralMonitor implements DOListener {
24
25 private static final Logger LOG = LoggerFactory.getLogger(CentralMonitor.class);
26
27 private List<MonitorInterface> monitors;
28
29 private List<AsyncRequest> aRequests;
30
31 private AsyncRequestDAO asyncRequestDAO;
32
33 private NotificationService nService;
34
35 public CentralMonitor() {
36 monitors = new ArrayList<MonitorInterface>();
37 aRequests = new ArrayList<AsyncRequest>();
38 LOG.info("CentralMonitor initialized");
39 }
40
41 public void addMonitor(MonitorInterface monitor) {
42 monitors.add(monitor);
43 monitor.registerCentralMonitor(this);
44 }
45
46 public void registerToAsyncRequest(AsyncRequestDAO ar) {
47 asyncRequestDAO = ar;
48 asyncRequestDAO.addDOListener(this);
49 LOG.debug("CentralMonitor listening AsyncRequestDAO");
50 }
51
52 public void setNotificationService(NotificationService ns){
53 nService = ns;
54 }
55
56 public NotificationService getNotificationService() {
57 return nService;
58 }
59
60 public void notifyAsyncRequests(List<String> ids) {
61
62 for (String uuid : ids) {
63 AsyncRequest tmp = findAsyncRequest(uuid);
64 assessRequest(tmp);
65 }
66
67 }
68
69 public Collection<AsyncRequest> getAllRequests() {
70 return Collections.unmodifiableCollection(aRequests);
71 }
72
73 public Collection<MonitorInterface> getAllMonitors() {
74 return Collections.unmodifiableCollection(monitors);
75 }
76
77
78 @Override
79 public void onUpdated(RdfBean object) {
80 AsyncRequest req = (AsyncRequest) object;
81 LOG.debug("adding Request to monitors " + req.getId());
82 if (!aRequests.contains(req)) {
83 aRequests.add(req);
84 for (MonitorInterface monitor : monitors) {
85 monitor.addWatchRequest(req);
86 }
87 }
88 }
89
90 @Override
91 public void onRemoved(RdfBean object) {
92 aRequests.remove(object);
93
94 }
95
96
97
98
99
100 private AsyncRequest findAsyncRequest(String uuid) {
101
102 for (AsyncRequest i : aRequests) {
103 if (i.getId().equals(uuid))
104 return i;
105 }
106
107 return null;
108
109 }
110
111 private void assessRequest(AsyncRequest aRequest) {
112
113 LOG.info("Assessing AsyncRequest " + aRequest.getId());
114
115 for (Trigger trigger : aRequest.getTriggers()) {
116 Question question = trigger.getQuestion();
117 List<PropertyValue> result = PropertyValueDAO.getInstance().query(question.getSparql(), 0, 10);
118 if (result.size()>0){
119 notify(trigger);
120 }else {
121 LOG.info("Condition is not satisfied");
122 }
123 }
124
125 }
126
127 private void notify(Trigger trigger) {
128 if (nService!=null) {
129 for (Notification notification: trigger.getNotifications()) {
130 nService.send(notification);
131 }
132 }else {
133 LOG.warn("No NotificationService specified");
134 }
135 }
136 }